source: trunk/ccl/level-1/l1-processes.lisp @ 7330

Last change on this file since 7330 was 7330, checked in by wws, 13 years ago

(join-process (p process) &key default)

Waits for process p to quit. Returns the values returned by it initial
function if it quits normally. If killed, returns default if
specified, or signals an error otherwise.
Patterned after SBCL's join-thread function.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 22.9 KB
Line 
1;;;-*- Mode: Lisp; Package: CCL -*-
2;;;
3;;;   Copyright (C) 1994-2001 Digitool, Inc
4;;;   This file is part of OpenMCL. 
5;;;
6;;;   OpenMCL is licensed under the terms of the Lisp Lesser GNU Public
7;;;   License , known as the LLGPL and distributed with OpenMCL as the
8;;;   file "LICENSE".  The LLGPL consists of a preamble and the LGPL,
9;;;   which is distributed with OpenMCL as the file "LGPL".  Where these
10;;;   conflict, the preamble takes precedence. 
11;;;
12;;;   OpenMCL is referenced in the preamble as the "LIBRARY."
13;;;
14;;;   The LLGPL is also available online at
15;;;   http://opensource.franz.com/preamble.html
16
17
18;; L1-processes.lisp
19
20(cl:in-package "CCL")
21
22
23(let* ((all-processes ())
24       (shutdown-processes ())
25       (all-processes-lock (make-lock)))
26  (defun add-to-all-processes (p)
27    (with-lock-grabbed (all-processes-lock)
28      (pushnew p all-processes :test #'eq)
29      p))
30  (defun remove-from-all-processes (p)
31    (with-lock-grabbed (all-processes-lock)
32      (setq all-processes (delete p all-processes))
33      t))
34  (defun all-processes ()
35    "Obtain a fresh list of all known Lisp threads."
36    (with-lock-grabbed (all-processes-lock)
37      (copy-list all-processes)))
38  (defun shutdown-processes ()
39    (with-lock-grabbed (all-processes-lock)
40      (copy-list shutdown-processes)))
41  (defun %clear-shutdown-proceses ()
42    (setq shutdown-processes nil))
43  (defun add-to-shutdown-processes (p)
44    (with-lock-grabbed (all-processes-lock)
45      (pushnew p shutdown-processes :test #'eq))
46    t)
47  (defun pop-shutdown-processes ()
48    (with-lock-grabbed (all-processes-lock)
49      (pop shutdown-processes)))
50  (defun find-process (id)
51    (etypecase id
52      (process id)
53      (integer (with-lock-grabbed (all-processes-lock)
54                 (find id all-processes
55                       :key #'(lambda (p)
56                                (process-serial-number p)))))
57      (string (with-lock-grabbed (all-processes-lock)
58                (find id all-processes
59                      :key #'(lambda (p)
60                               (process-name p))
61                      :test #'equal))))))
62
63
64
65(defun not-in-current-process (p operation)
66  (if (eq p *current-process*)
67    (error "The current process (~s) can't perform the ~a operation on itself."
68           p operation)))
69
70(defun startup-shutdown-processes ()
71  (let* ((p))
72    (loop
73      (unless (setq p (pop-shutdown-processes)) (return))
74      (new-tcr-for-thread (process-thread p))
75      (%process-preset-internal p)
76      (process-enable p)
77      )))
78
79;;; Done with a queue-fixup so that it will be the last thing
80;;; that happens on startup.
81(queue-fixup
82 (pushnew 'startup-shutdown-processes *lisp-system-pointer-functions*))
83
84
85
86
87
88
89
90(defun wrap-initial-bindings (alist)
91  (mapcar #'(lambda (pair)
92              (destructuring-bind (symbol . valform) pair
93                (cons (require-type symbol 'symbol)
94                      (cond ((symbolp valform)
95                             (constantly (symbol-value valform)))
96                            ((typep valform 'function) valform)
97                            ((consp valform)
98                             (if (eq (car valform) 'quote)
99                               (constantly (cadr valform))
100                               #'(lambda () (apply (car valform) (cdr valform)))))
101                            (t
102                             (constantly valform))))))
103          alist))
104
105
106(defun valid-allocation-quantum-p (x)
107  (and (>= x *host-page-size*)
108       (<= x (default-allocation-quantum))
109       (= (logcount x) 1)))
110
111 
112(let* ((psn -1))
113  (defun %new-psn () (incf psn)))
114
115(defclass process ()
116    ((name :initform nil :initarg :name :accessor process-name)
117     (thread :initarg :thread :accessor process-thread)
118     (initial-form :initform (cons nil nil) :reader process-initial-form)
119     (priority :initform 0 :initarg :priority :accessor process-priority)
120     (persistent :initform nil :initarg :persistent :reader process-persistent)
121     (whostate :initform "Reset" :accessor %process-whostate)
122     (splice :initform (cons nil nil) :accessor process-splice)
123     (initial-bindings :initform nil :initarg :initial-bindings
124                       :accessor process-initial-bindings)
125     (serial-number :initform (%new-psn) :accessor process-serial-number)
126     (creation-time :initform (get-tick-count) :reader process-creation-time)
127     (total-run-time :initform nil :accessor %process-total-run-time)
128     (ui-object :initform (application-ui-object *application*)
129                :accessor process-ui-object)
130     (termination-semaphore :initform nil
131                            :initarg :termination-semaphore
132                            :accessor process-termination-semaphore
133                            :type (or null semaphore))
134     (allocation-quantum :initform (default-allocation-quantum)
135                         :initarg :allocation-quantum
136                         :reader process-allocation-quantum
137                         :type (satisfies valid-allocation-quantum-p))
138     (dribble-stream :initform nil)
139     (dribble-saved-terminal-io :initform nil)
140     (result :initform (cons nil nil)
141             :reader process-result))
142  (:primary-p t))
143
144(defmethod print-object ((p process) s)
145  (print-unreadable-object (p s :type t :identity t)
146    (format s "~a(~d) [~a]" (process-name p)
147            (process-serial-number p)(process-whostate p))))
148
149(defvar *process-class* (find-class 'process))
150
151(defun processp (p)
152  (memq *process-class* (class-precedence-list (class-of p))))
153
154(set-type-predicate 'process 'processp)
155
156(defun make-process (name &key 
157                          thread
158                          persistent
159                          (priority 0)
160                          (stack-size *default-control-stack-size*)
161                          (vstack-size *default-value-stack-size*)
162                          (tstack-size *default-temp-stack-size*)
163                          (initial-bindings ())
164                          (use-standard-initial-bindings t)
165                          (class (find-class 'process))
166                          (termination-semaphore ())
167                          (allocation-quantum (default-allocation-quantum)))
168  "Create and return a new process."
169  (declare (ignore flavor))
170  (let* ((p (make-instance
171             class
172             :name name
173             :thread (or thread
174                         (new-thread name stack-size  vstack-size  tstack-size))
175             :priority priority
176             :persistent persistent
177             :initial-bindings (append (if use-standard-initial-bindings
178                                         (standard-initial-bindings))
179                                       (wrap-initial-bindings
180                                        initial-bindings))
181             :termination-semaphore (or termination-semaphore
182                                        (make-semaphore))
183             :allocation-quantum allocation-quantum)))
184    (add-to-all-processes p)
185    (setf (car (process-splice p)) p)
186    p))
187
188
189(defstatic *initial-process*
190    (let* ((p (make-process
191               "Initial"
192               :thread *initial-lisp-thread*
193               :priority 0)))
194      (setf (%process-whostate p) "Active")
195      p))
196
197
198(defvar *current-process* *initial-process*
199  "Bound in each process, to that process itself.")
200
201(defstatic *interactive-abort-process* *initial-process*)
202
203
204
205
206(defun process-tcr (p)
207  (lisp-thread.tcr (process-thread p)))
208
209
210
211(defun process-exhausted-p (p)
212  (let* ((thread (process-thread p)))
213    (or (null thread)
214        (thread-exhausted-p thread))))
215 
216
217(defun process-whostate (p)
218  "Return a string which describes the status of a specified process."
219  (if (process-exhausted-p p)
220    "Exhausted"
221    (%process-whostate p)))
222
223
224
225
226
227(defun process-total-run-time (p)
228  (or (%process-total-run-time p)
229      (thread-total-run-time (process-thread p))))
230
231
232
233
234(defun initial-bindings (alist)
235  (let* ((symbols ())
236         (values ()))
237    (dolist (a alist (values (nreverse symbols) (nreverse values)))
238      (push (car a) symbols)
239      (push (funcall (cdr a)) values))))
240
241
242                           
243(defun symbol-value-in-process (sym process)
244  (symbol-value-in-tcr sym (process-tcr process)))
245
246(defun (setf symbol-value-in-process) (value sym process)
247  (setf (symbol-value-in-tcr sym (process-tcr process)) value))
248
249
250(defun process-enable (p &optional (wait 1))
251  "Begin executing the initial function of a specified process."
252  (setq p (require-type p 'process))
253  (not-in-current-process p 'process-enable)
254  (unless (car (process-initial-form p))
255    (error "Process ~s has not been preset.  Use PROCESS-PRESET to preset the process." p))
256  (let* ((thread (process-thread p)))
257    (do* ((total-wait wait (+ total-wait wait)))
258         ((thread-enable thread (process-termination-semaphore p) (1- (integer-length (process-allocation-quantum p)))  wait)
259          (setf (%process-whostate p) "Active")
260          p)
261      (cerror "Keep trying."
262              "Unable to enable process ~s; have been trying for ~s seconds."
263              p total-wait))))
264
265
266(defmethod (setf process-termination-semaphore) :after (new (p process))
267  (with-macptrs (tcrp)
268    (%setf-macptr-to-object tcrp (process-tcr p))
269    (unless (%null-ptr-p tcrp)
270      (setf (%get-ptr tcrp target::tcr.termination-semaphore)
271            (if new
272              (semaphore-value new)
273              (%null-ptr))))
274    new))
275
276(defun process-resume (p)
277  "Resume a specified process which had previously been suspended
278by process-suspend."
279  (setq p (require-type p 'process))
280  (%resume-tcr (process-tcr p)))
281
282(defun process-suspend (p)
283  "Suspend a specified process."
284  (setq p (require-type p 'process))
285  (if (eq p *current-process*)
286    (error "Suspending the current process can't work.  ~&(If the documentation claims otherwise, it's incorrect.)")
287    (%suspend-tcr (process-tcr p))))
288
289(defun process-suspend-count (p)
290  "Return the number of currently-pending suspensions applicable to
291a given process."
292  (setq p (require-type p 'process))
293  (let* ((thread (process-thread p)))
294    (if thread
295      (lisp-thread-suspend-count thread))))
296
297(defun process-active-p (p)
298  (setq p (require-type p 'process))
299  (and (eql 0 (process-suspend-count p))
300       (not (process-exhausted-p p))))
301 
302;;; Used by process-run-function
303(defun process-preset (process function &rest args)
304  "Set the initial function and arguments of a specified process."
305  (let* ((p (require-type process 'process))
306         (f (require-type function 'function))
307         (initial-form (process-initial-form p)))
308    (declare (type cons initial-form))
309    (not-in-current-process p 'process-preset)
310    ; Not quite right ...
311    (rplaca initial-form f)
312    (rplacd initial-form args)
313    (%process-preset-internal process)))
314
315(defun %process-preset-internal (process)
316   (let* ((initial-form (process-initial-form process))
317         (thread (process-thread process)))
318     (declare (type cons initial-form))
319     (thread-preset
320      thread
321      #'(lambda (process initial-form)
322          (let* ((*current-process* process))
323            (add-to-all-processes process)
324            (multiple-value-bind (syms values)
325                (initial-bindings (process-initial-bindings process))
326              (progv syms values
327                (run-process-initial-form process initial-form)))))
328      process
329      initial-form)
330     process))
331
332
333(defun run-process-initial-form (process initial-form)
334  (let* ((exited nil)
335         (kill (handler-case
336                   (restart-case
337                    (let ((values
338                           (multiple-value-list
339                            (apply (car initial-form)
340                                   (cdr (the list initial-form)))))
341                          (result (process-result process)))
342                      (setf (cdr result) values
343                            (car result) t)
344                      (setq exited t)
345                      nil)
346                    (abort-break () :report "Reset this process")
347                    (abort () :report "Kill this process" (setq exited t)))
348                 (process-reset (condition)
349                   (process-reset-kill condition)))))
350    ;; We either exited from the initial form normally, were told to
351    ;; exit prematurely, or are being reset and should enter the
352    ;; "awaiting preset" state.
353    (if (or kill exited) 
354      (unless (eq kill :toplevel)
355        (process-initial-form-exited process (or kill t)))
356      (progn
357        (thread-change-state (process-thread process) :run :reset)
358        (tcr-set-preset-state (process-tcr process))
359        (setf (%process-whostate process) "Reset")))
360    nil))
361
362;;; Separated from run-process-initial-form just so I can change it easily.
363(defun process-initial-form-exited (process kill)
364  ;; Enter the *initial-process* and have it finish us up
365  (without-interrupts
366   (if (eq kill :shutdown)
367     (progn
368       (setf (%process-whostate process) "Shutdown")
369       (add-to-shutdown-processes process)))
370   (maybe-finish-process-kill process kill)))
371
372(defun maybe-finish-process-kill (process kill)
373  (when (and kill (neq kill :shutdown))
374    (setf (%process-whostate process) "Dead")
375    (remove-from-all-processes process)
376    (let ((thread (process-thread process)))
377      (unless (or (eq thread *current-lisp-thread*)
378                  (thread-exhausted-p thread))
379        (kill-lisp-thread thread))))
380  nil)
381
382
383 
384
385(defun require-global-symbol (s &optional env)
386  (let* ((s (require-type s 'symbol))
387         (bits (%symbol-bits s)))
388    (unless (or (logbitp $sym_vbit_global bits)
389                (let* ((defenv (if env (definition-environment env))))
390                  (if defenv
391                    (eq :global (%cdr (assq s (defenv.specials defenv)))))))
392      (error "~s not defined with ~s" s 'defstatic))
393    s))
394
395
396(defmethod print-object ((s lock) stream)
397  (print-unreadable-object (s stream :type t :identity t)
398    (let* ((val (uvref s target::lock._value-cell))
399           (name (uvref s target::lock.name-cell)))
400      (when name
401        (format stream "~s " name))
402      (if (typep val 'macptr)
403        (format stream "[ptr @ #x~x]"
404                (%ptr-to-int val))))))
405
406(defun lockp (l)
407  (eq target::subtag-lock (typecode l)))
408
409(set-type-predicate 'lock 'lockp)
410
411(defun recursive-lock-p (l)
412  (and (eq target::subtag-lock (typecode l))
413       (eq 'recursive-lock (%svref l target::lock.kind-cell))))
414
415(defun read-write-lock-p (l)
416  (and (eq target::subtag-lock (typecode l))
417       (eq 'read-write-lock (%svref l target::lock.kind-cell))))
418
419(setf (type-predicate 'recursive-lock) 'recursive-lock-p
420      (type-predicate 'read-write-lock) 'read-write-lock-p)
421
422
423(defun grab-lock (lock &optional flag)
424  "Wait until a given lock can be obtained, then obtain it."
425  (%lock-recursive-lock (recursive-lock-ptr lock) flag))
426
427(defun release-lock (lock)
428  "Relinquish ownership of a given lock."
429  (%unlock-recursive-lock (recursive-lock-ptr lock)))
430
431(defun try-lock (lock &optional flag)
432  "Obtain the given lock, but only if it is not necessary to wait for it."
433  (%try-recursive-lock (recursive-lock-ptr lock) flag))
434
435(defun lock-acquisition-status (thing)
436  (if (istruct-typep thing 'lock-acquisition)
437    (lock-acquisition.status thing)
438    (report-bad-arg thing 'lock-acquisition)))
439
440(defun clear-lock-acquisition-status (thing)
441  (if (istruct-typep thing 'lock-acquisition)
442    (setf (lock-acquisition.status thing) nil)
443    (report-bad-arg thing 'lock-acquisition)))
444
445(defmethod print-object ((l lock-acquisition) stream)
446  (print-unreadable-object (l stream :type t :identity t)
447    (format stream "[status = ~s]" (lock-acquisition-status l))))
448
449(defun semaphore-notification-status (thing)
450  (if (istruct-typep thing 'semaphore-notification)
451    (semaphore-notification.status thing)
452    (report-bad-arg thing 'semaphore-notification)))
453
454(defun clear-semaphore-notification-status (thing)
455  (if (istruct-typep thing 'semaphore-notification)
456    (setf (semaphore-notification.status thing) nil)
457    (report-bad-arg thing 'semaphore-notification)))
458
459(defmethod print-object ((l semaphore-notification) stream)
460  (print-unreadable-object (l stream :type t :identity t)
461    (format stream "[status = ~s]" (semaphore-notification-status l))))
462
463(defun process-wait (whostate function &rest args)
464  "Causes the current lisp process (thread) to wait for a given
465predicate to return true."
466  (declare (dynamic-extent args))
467  (or (apply function args)
468      (with-process-whostate (whostate)
469        (loop
470          (when (apply function args)
471            (return))
472          ;; Sleep for a tick
473          (%nanosleep 0 *ns-per-tick*)))))
474
475
476
477(defun process-wait-with-timeout (whostate time function &rest args)
478  "Cause the current thread to wait for a given predicate to return true,
479or for a timeout to expire."
480  (declare (dynamic-extent args))
481  (cond ((null time)  (apply #'process-wait whostate function args) t)
482        (t (let* ((win nil)
483                  (when (+ (get-tick-count) time))
484                  (f #'(lambda () (let ((val (apply function args)))
485                                    (if val
486                                      (setq win val)
487                                      (> (get-tick-count) when))))))
488             (declare (dynamic-extent f))
489             (process-wait whostate f)
490             win))))
491
492
493(defmethod process-interrupt ((process process) function &rest args)
494  "Arrange for the target process to invoke a specified function at
495some point in the near future, and then return to what it was doing."
496  (let* ((p (require-type process 'process)))
497    (if (eq p *current-process*)
498      (progn
499        (apply function args)
500        t)
501      (thread-interrupt
502       (process-thread p)
503       process
504       #'apply
505       function args))))
506
507(defmethod process-debug-condition ((p process) condition frame-pointer)
508  (declare (ignore condition frame-pointer)))
509
510
511
512
513;;; This one is in the Symbolics documentation
514(defun process-allow-schedule ()
515  "Used for cooperative multitasking; probably never necessary."
516  (yield))
517
518
519;;; something unique that users won't get their hands on
520(defun process-reset-tag (process)
521  (process-splice process))
522
523(defun process-run-function (name-or-keywords function &rest args)
524  "Create a process, preset it, and enable it."
525  (if (listp name-or-keywords)
526    (%process-run-function name-or-keywords function args)
527    (let ((keywords (list :name name-or-keywords)))
528      (declare (dynamic-extent keywords))
529      (%process-run-function keywords function args))))
530
531(defun %process-run-function (keywords function args)
532  (destructuring-bind (&key (name "Anonymous")
533                            (priority  0)
534                            (stack-size *default-control-stack-size*)
535                            (vstack-size *default-value-stack-size*)
536                            (tstack-size *default-temp-stack-size*)
537                            (initial-bindings ())
538                            (persistent nil)
539                            (use-standard-initial-bindings t)
540                            (termination-semaphore nil)
541                            (allocation-quantum (default-allocation-quantum)))
542                      keywords
543    (setq priority (require-type priority 'fixnum))
544    (let* ((process (make-process name
545                                  :priority priority
546                                  :stack-size stack-size
547                                  :vstack-size vstack-size
548                                  :tstack-size tstack-size
549                                  :persistent persistent
550                                  :use-standard-initial-bindings use-standard-initial-bindings
551                                  :initial-bindings initial-bindings
552                                  :termination-semaphore termination-semaphore
553                                  :allocation-quantum allocation-quantum)))
554      (process-preset process #'(lambda () (apply function args)))
555      (process-enable process)
556      process)))
557
558(defmethod process-reset ((process process) &optional kill)
559  "Cause a specified process to cleanly exit from any ongoing computation."
560  (setq process (require-type process 'process))
561  (unless (memq kill '(nil :kill :shutdown))
562    (setq kill (require-type kill '(member nil :kill :shutdown))))
563  (if (eq process *current-process*)
564    (%process-reset kill)
565    (if (process-exhausted-p process)
566      (maybe-finish-process-kill process kill)
567      (progn
568        (process-interrupt process '%process-reset kill)))))
569
570
571(defun %process-reset (kill)
572  (signal 'process-reset :kill kill)
573  (maybe-finish-process-kill *current-process* kill))
574
575;;; By default, it's just fine with the current process
576;;; if the application/user wants to quit.
577(defmethod process-verify-quit ((process process))
578  t)
579
580(defmethod process-exit-application ((process process) thunk)
581  (when (eq process *initial-process*)
582    (prepare-to-quit)
583    (%set-toplevel thunk)
584    (fresh-line *stdout*)
585    (finish-output *stdout*)
586    (toplevel)))
587
588
589
590(defmethod process-kill ((process process))
591  "Cause a specified process to cleanly exit from any ongoing
592computation, and then exit."
593  (and (process-interrupt process #'%process-reset :kill)
594       (setf (process-kill-issued process) t)))
595
596(defun process-abort (process &optional condition)
597  "Cause a specified process to process an abort condition, as if it
598had invoked abort."
599  (process-interrupt process
600                     #'(lambda ()
601                         (abort condition))))
602
603(defmethod process-reset-and-enable ((process process))
604  (not-in-current-process process 'process-reset-and-enable)
605  (process-reset process)
606  (process-enable process))
607
608(defmethod process-kill-issued ((process process))
609  (cdr (process-splice process)))
610
611(defmethod (setf process-kill-issued) (val (process process))
612  (setf (cdr (process-splice process)) val))
613
614(defun tcr->process (tcr)
615  (dolist (p (all-processes))
616    (when (eq tcr (process-tcr p))
617      (return p))))
618
619(defun current-process-allocation-quantum ()
620  (process-allocation-quantum *current-process*))
621
622(defun (setf current-process-allocation-quantum) (new)
623  (if (valid-allocation-quantum-p new)
624    (with-macptrs (tcrp)
625      (%setf-macptr-to-object tcrp (%current-tcr))
626      (setf (slot-value *current-process* 'allocation-quantum) new
627            (%get-natural tcrp target::tcr.log2-allocation-quantum)
628            (1- (integer-length new)))
629      new)
630    (report-bad-arg new '(satisfies valid-allocation-quantum-p))))
631
632
633(def-standard-initial-binding *backtrace-contexts* nil)
634
635(defmethod exit-interactive-process ((p process))
636  (unless (eq p *initial-process*)
637    (when (eq p *current-process*)
638      (process-kill p))))
639
640(defclass tty-listener (process)
641    ())
642
643(defmethod exit-interactive-process ((p tty-listener))
644  (when (eq p *current-process*)
645    (quit)))
646
647(defmethod process-stop-dribbling ((p process))
648  (with-slots (dribble-stream dribble-saved-terminal-io) p
649    (when dribble-stream
650      (close dribble-stream)
651      (setq dribble-stream nil))
652    (when dribble-saved-terminal-io
653      (setq *terminal-io* dribble-saved-terminal-io
654            dribble-saved-terminal-io nil))))
655
656(defmethod process-dribble ((p process) path)
657  (with-slots (dribble-stream dribble-saved-terminal-io) p
658    (process-stop-dribbling p)
659    (when path
660      (let* ((in (two-way-stream-input-stream *terminal-io*))
661             (out (two-way-stream-output-stream *terminal-io*))
662             (f (open path :direction :output :if-exists :append 
663                      :if-does-not-exist :create)))
664        (without-interrupts
665         (setq dribble-stream f
666               dribble-saved-terminal-io *terminal-io*
667               *terminal-io* (make-echoing-two-way-stream
668                              (make-echo-stream in f)
669                              (make-broadcast-stream out f)))))
670      path)))
671
672(defmethod join-process ((p process) &key (default nil defaultp))
673  (wait-on-semaphore (process-termination-semaphore p) nil "join-process")
674  (let ((result (process-result p)))
675    (cond ((car result) (values-list (cdr result)))
676          (defaultp default)
677          (t (error "Failed to join ~s" p)))))
Note: See TracBrowser for help on using the repository browser.