source: trunk/ccl/level-1/l1-processes.lisp @ 6942

Last change on this file since 6942 was 6942, checked in by gb, 14 years ago

PROCESS-DEBUG-CONDITION, so that it can be overridden.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 22.2 KB
Line 
1;;;-*- Mode: Lisp; Package: CCL -*-
2;;;
3;;;   Copyright (C) 1994-2001 Digitool, Inc
4;;;   This file is part of OpenMCL. 
5;;;
6;;;   OpenMCL is licensed under the terms of the Lisp Lesser GNU Public
7;;;   License , known as the LLGPL and distributed with OpenMCL as the
8;;;   file "LICENSE".  The LLGPL consists of a preamble and the LGPL,
9;;;   which is distributed with OpenMCL as the file "LGPL".  Where these
10;;;   conflict, the preamble takes precedence. 
11;;;
12;;;   OpenMCL is referenced in the preamble as the "LIBRARY."
13;;;
14;;;   The LLGPL is also available online at
15;;;   http://opensource.franz.com/preamble.html
16
17
18;; L1-processes.lisp
19
20(cl:in-package "CCL")
21
22
;;; Registry of processes.  REGISTRY holds every process added with
;;; ADD-TO-ALL-PROCESSES; PENDING-SHUTDOWN holds the processes recorded
;;; with ADD-TO-SHUTDOWN-PROCESSES.  Both lists are private to the
;;; functions below and are accessed under REGISTRY-LOCK (with the one
;;; exception noted on %CLEAR-SHUTDOWN-PROCESES).
(let* ((registry ())
       (pending-shutdown ())
       (registry-lock (make-lock)))
  (defun add-to-all-processes (p)
    "Add P to the list of all processes unless it is already there (EQ).
Returns P."
    (with-lock-grabbed (registry-lock)
      (setq registry (adjoin p registry :test #'eq))
      p))
  (defun remove-from-all-processes (p)
    "Destructively remove P from the list of all processes.  Returns T."
    (with-lock-grabbed (registry-lock)
      (setq registry (delete p registry))
      t))
  (defun all-processes ()
    "Obtain a fresh list of all known Lisp threads."
    (with-lock-grabbed (registry-lock)
      (copy-list registry)))
  (defun shutdown-processes ()
    "Obtain a fresh list of the processes recorded by ADD-TO-SHUTDOWN-PROCESSES."
    (with-lock-grabbed (registry-lock)
      (copy-list pending-shutdown)))
  (defun %clear-shutdown-proceses ()
    ;; NOTE(review): unlike its siblings this does not grab the lock;
    ;; confirm whether that is deliberate before changing it.
    (setq pending-shutdown nil))
  (defun add-to-shutdown-processes (p)
    "Record P on the shutdown list unless it is already there (EQ).
Returns T."
    (with-lock-grabbed (registry-lock)
      (setq pending-shutdown (adjoin p pending-shutdown :test #'eq)))
    t)
  (defun pop-shutdown-processes ()
    "Remove and return the first process on the shutdown list, or NIL
if the list is empty."
    (with-lock-grabbed (registry-lock)
      (let ((next (car pending-shutdown)))
        (setq pending-shutdown (cdr pending-shutdown))
        next)))
  (defun find-process (id)
    "Return the process designated by ID: a process designates itself,
an integer is matched against serial numbers and a string against
names (EQUAL).  Returns NIL if no registered process matches."
    (etypecase id
      (process id)
      (integer (with-lock-grabbed (registry-lock)
                 (find id registry :key #'process-serial-number)))
      (string (with-lock-grabbed (registry-lock)
                (find id registry :key #'process-name :test #'equal))))))
62
63
64
(defun not-in-current-process (p operation)
  "Signal an error if P is the current process.  OPERATION names the
operation being attempted and is included in the error message."
  (when (eq p *current-process*)
    (error "The current process (~s) can't perform the ~a operation on itself."
           p operation)))
69
(defun startup-shutdown-processes ()
  "Drain the shutdown list: give each popped process's thread a new
TCR, preset the process again from its stored initial form, and
enable it.  Returns NIL."
  (do ((p (pop-shutdown-processes) (pop-shutdown-processes)))
      ((null p))
    (new-tcr-for-thread (process-thread p))
    (%process-preset-internal p)
    (process-enable p)))
78
;;; Done with a queue-fixup so that it will be the last thing
;;; that happens on startup.
;;; Adds the symbol STARTUP-SHUTDOWN-PROCESSES to
;;; *LISP-SYSTEM-POINTER-FUNCTIONS*; PUSHNEW keeps it from being added
;;; more than once.
(queue-fixup
 (pushnew 'startup-shutdown-processes *lisp-system-pointer-functions*))
83
84
85
86
87
88
89
(defun wrap-initial-bindings (alist)
  "Convert ALIST, a list of (SYMBOL . VALFORM) pairs, into a list of
(SYMBOL . FUNCTION) pairs where calling FUNCTION with no arguments
yields the value for SYMBOL.  VALFORM is handled as follows:
  symbol        - its SYMBOL-VALUE, read now
  function      - used as is
  (QUOTE x)     - the constant x
  (fn . args)   - FN applied to ARGS (unevaluated) when called
  anything else - the object itself."
  (flet ((value-function (valform)
           (cond ((symbolp valform)
                  (constantly (symbol-value valform)))
                 ((functionp valform)
                  valform)
                 ((consp valform)
                  (if (eq (car valform) 'quote)
                    (constantly (cadr valform))
                    #'(lambda () (apply (car valform) (cdr valform)))))
                 (t
                  (constantly valform)))))
    (mapcar #'(lambda (pair)
                (destructuring-bind (symbol . valform) pair
                  ;; Check the symbol before looking at the value form.
                  (cons (require-type symbol 'symbol)
                        (value-function valform))))
            alist)))
104
105
(defun valid-allocation-quantum-p (x)
  "Return true if X is an integer between *HOST-PAGE-SIZE* and
(DEFAULT-ALLOCATION-QUANTUM) inclusive with exactly one bit set;
otherwise return NIL.
This function is used as a SATISFIES type predicate, so it must
accept any object: non-integers yield NIL instead of signalling a
type error from the arithmetic below."
  (and (integerp x)
       (>= x *host-page-size*)
       (<= x (default-allocation-quantum))
       ;; Exactly one bit set.  Assumes *HOST-PAGE-SIZE* is positive, so
       ;; X is positive here and this means "power of two" -- TODO confirm.
       (= (logcount x) 1)))
110
111 
;;; Process serial numbers come from a counter private to %NEW-PSN;
;;; the first call returns 0.
(let* ((last-psn -1))
  (defun %new-psn ()
    "Return the next process serial number."
    (setq last-psn (1+ last-psn))))
114
;;; PROCESS: the Lisp-level object describing a thread.
;;; Slots, as used elsewhere in this file:
;;;   name, priority, persistent - supplied by MAKE-PROCESS.
;;;   thread            - the underlying lisp thread object.
;;;   initial-form      - a cons: car is the initial function, cdr its
;;;                       arguments (filled in by PROCESS-PRESET).
;;;   whostate          - status string; starts as "Reset".
;;;   splice            - a cons: MAKE-PROCESS stores the process in the
;;;                       car, the cdr is the PROCESS-KILL-ISSUED flag,
;;;                       and the cons is what PROCESS-RESET-TAG returns.
;;;   initial-bindings  - alist of (symbol . function) consumed by
;;;                       INITIAL-BINDINGS when the process starts.
;;;   serial-number     - allocated by %NEW-PSN.
;;;   creation-time     - (GET-TICK-COUNT) at instance creation.
;;;   total-run-time    - when non-NIL, overrides the thread's run time
;;;                       in PROCESS-TOTAL-RUN-TIME.
;;;   termination-semaphore - NIL or a semaphore.
;;;   allocation-quantum    - must satisfy VALID-ALLOCATION-QUANTUM-P.
;;;   dribble-stream, dribble-saved-terminal-io - state for
;;;                       PROCESS-DRIBBLE / PROCESS-STOP-DRIBBLING.
;;; NOTE(review): the class is declared :PRIMARY-P, so presumably slot
;;; order matters to other code -- do not reorder without checking.
(defclass process ()
    ((name :initform nil :initarg :name :accessor process-name)
     (thread :initarg :thread :accessor process-thread)
     (initial-form :initform (cons nil nil) :reader process-initial-form)
     (priority :initform 0 :initarg :priority :accessor process-priority)
     (persistent :initform nil :initarg :persistent :reader process-persistent)
     (whostate :initform "Reset" :accessor %process-whostate)
     (splice :initform (cons nil nil) :accessor process-splice)
     (initial-bindings :initform nil :initarg :initial-bindings
                       :accessor process-initial-bindings)
     (serial-number :initform (%new-psn) :accessor process-serial-number)
     (creation-time :initform (get-tick-count) :reader process-creation-time)
     (total-run-time :initform nil :accessor %process-total-run-time)
     (ui-object :initform (application-ui-object *application*)
                :accessor process-ui-object)
     (termination-semaphore :initform nil
                            :initarg :termination-semaphore
                            :accessor process-termination-semaphore
                            :type (or null semaphore))
     (allocation-quantum :initform (default-allocation-quantum)
                         :initarg :allocation-quantum
                         :reader process-allocation-quantum
                         :type (satisfies valid-allocation-quantum-p))
     (dribble-stream :initform nil)
     (dribble-saved-terminal-io :initform nil))
  (:primary-p t))
141
;;; Processes print unreadably as: name(serial-number) [whostate]
(defmethod print-object ((p process) s)
  (print-unreadable-object (p s :type t :identity t)
    (let ((name (process-name p))
          (serial (process-serial-number p))
          (state (process-whostate p)))
      (format s "~a(~d) [~a]" name serial state))))
146
;;; The PROCESS class object, looked up once so that PROCESSP need not
;;; call FIND-CLASS each time.
(defvar *process-class* (find-class 'process))

(defun processp (p)
  "Return true if the class of P has PROCESS in its class precedence list."
  (let ((precedence-list (class-precedence-list (class-of p))))
    (memq *process-class* precedence-list)))

;;; Register PROCESSP as the type predicate for the type PROCESS.
(set-type-predicate 'process 'processp)
153
(defun make-process (name &key 
                          thread
                          persistent
                          (priority 0)
                          (stack-size *default-control-stack-size*)
                          (vstack-size *default-value-stack-size*)
                          (tstack-size *default-temp-stack-size*)
                          (initial-bindings ())
                          (use-standard-initial-bindings t)
                          (class (find-class 'process))
                          (termination-semaphore ())
                          (allocation-quantum (default-allocation-quantum)))
  "Create and return a new process named NAME.
THREAD, if supplied, is used as the underlying thread; otherwise a new
thread is created with STACK-SIZE, VSTACK-SIZE and TSTACK-SIZE.
INITIAL-BINDINGS is an alist accepted by WRAP-INITIAL-BINDINGS; when
USE-STANDARD-INITIAL-BINDINGS is true the standard initial bindings
precede it.  CLASS is the class to instantiate.  If
TERMINATION-SEMAPHORE is NIL a new semaphore is created.
The new process is added to the list of all processes."
  (let* ((p (make-instance
             class
             :name name
             :thread (or thread
                         (new-thread name stack-size  vstack-size  tstack-size))
             :priority priority
             :persistent persistent
             :initial-bindings (append (if use-standard-initial-bindings
                                         (standard-initial-bindings))
                                       (wrap-initial-bindings
                                        initial-bindings))
             :termination-semaphore (or termination-semaphore
                                        (make-semaphore))
             :allocation-quantum allocation-quantum)))
    (add-to-all-processes p)
    ;; The splice cons points back at its process.
    (setf (car (process-splice p)) p)
    p))
185
186
;;; The process wrapped around *INITIAL-LISP-THREAD*; it is marked
;;; "Active" as soon as it is created.
(defstatic *initial-process*
    (let ((initial (make-process "Initial"
                                 :thread *initial-lisp-thread*
                                 :priority 0)))
      (setf (%process-whostate initial) "Active")
      initial))
194
195
;;; Globally the initial process; %PROCESS-PRESET-INTERNAL rebinds it in
;;; each process before running that process's initial form.
(defvar *current-process* *initial-process*
  "Bound in each process, to that process itself.")

;;; Initially the initial process.
;;; NOTE(review): presumably the process that receives interactive
;;; aborts -- its use is not visible in this file; confirm.
(defstatic *interactive-abort-process* *initial-process*)
200
201
202
203
(defun process-tcr (p)
  "Return the TCR recorded in the thread object of process P."
  (let ((thread (process-thread p)))
    (lisp-thread.tcr thread)))
206
207
208
(defun process-exhausted-p (p)
  "Return true if process P has no thread, or if its thread is exhausted."
  (let ((thread (process-thread p)))
    (if thread
      (thread-exhausted-p thread)
      t)))
213 
214
(defun process-whostate (p)
  "Return a string which describes the status of a specified process."
  ;; An exhausted process always reports "Exhausted", whatever its
  ;; whostate slot says.
  (cond ((process-exhausted-p p) "Exhausted")
        (t (%process-whostate p))))
220
221
222
223
224
(defun process-total-run-time (p)
  "Return the run time stored in process P if there is one; otherwise
ask P's thread."
  (let ((recorded (%process-total-run-time p)))
    (if recorded
      recorded
      (thread-total-run-time (process-thread p)))))
228
229
230
231
(defun initial-bindings (alist)
  "ALIST is a list of (SYMBOL . FUNCTION) pairs.  Return two values,
both fresh lists in ALIST order: the symbols, and the results of
calling each function with no arguments.  The results are suitable as
the two arguments of PROGV."
  (values (mapcar #'car alist)
          (mapcar #'(lambda (entry) (funcall (cdr entry))) alist)))
238
239
240                           
(defun symbol-value-in-process (sym process)
  "Return the value of SYM as seen by PROCESS, via SYMBOL-VALUE-IN-TCR."
  (let ((tcr (process-tcr process)))
    (symbol-value-in-tcr sym tcr)))

(defun (setf symbol-value-in-process) (value sym process)
  "Set the value of SYM as seen by PROCESS to VALUE."
  (let ((tcr (process-tcr process)))
    (setf (symbol-value-in-tcr sym tcr) value)))
246
247
(defun process-enable (p &optional (wait 1))
  "Begin executing the initial function of a specified process."
  (setq p (require-type p 'process))
  (not-in-current-process p 'process-enable)
  (unless (car (process-initial-form p))
    (error "Process ~s has not been preset.  Use PROCESS-PRESET to preset the process." p))
  (let ((thread (process-thread p))
        (total-wait wait))
    ;; Each THREAD-ENABLE attempt is given WAIT as its last argument.
    ;; After a failed attempt a continuable error is signalled;
    ;; continuing from it retries, with TOTAL-WAIT grown by WAIT.
    (loop
      (when (thread-enable thread
                           (process-termination-semaphore p)
                           ;; log2 of the allocation quantum
                           (1- (integer-length (process-allocation-quantum p)))
                           wait)
        (setf (%process-whostate p) "Active")
        (return p))
      (cerror "Keep trying."
              "Unable to enable process ~s; have been trying for ~s seconds."
              p total-wait)
      (setq total-wait (+ total-wait wait)))))
262
263
;;; After the slot has been updated, mirror the new value into the
;;; process's TCR so that the two stay in step.
(defmethod (setf process-termination-semaphore) :after (new (p process))
  (with-macptrs (tcrp)
    (%setf-macptr-to-object tcrp (process-tcr p))
    ;; Nothing to update if the process has no TCR (null pointer).
    (unless (%null-ptr-p tcrp)
      ;; Store the semaphore's value, or a null pointer when NEW is NIL.
      (setf (%get-ptr tcrp target::tcr.termination-semaphore)
            (if new
              (semaphore-value new)
              (%null-ptr))))
    new))
273
(defun process-resume (p)
  "Resume a specified process which had previously been suspended
by process-suspend."
  (let ((target (require-type p 'process)))
    (%resume-tcr (process-tcr target))))
279
(defun process-suspend (p)
  "Suspend a specified process."
  (let ((target (require-type p 'process)))
    ;; A process may not suspend itself.
    (when (eq target *current-process*)
      (error "Suspending the current process can't work.  ~&(If the documentation claims otherwise, it's incorrect.)"))
    (%suspend-tcr (process-tcr target))))
286
(defun process-suspend-count (p)
  "Return the number of currently-pending suspensions applicable to
a given process."
  ;; Returns NIL for a process that has no thread.
  (let ((thread (process-thread (require-type p 'process))))
    (and thread
         (lisp-thread-suspend-count thread))))
294
(defun process-active-p (p)
  "Return true if process P has a suspend count of 0 and is not exhausted."
  (let ((proc (require-type p 'process)))
    (and (eql 0 (process-suspend-count proc))
         (not (process-exhausted-p proc)))))
299 
;;; Used by process-run-function
(defun process-preset (process function &rest args)
  "Set the initial function and arguments of a specified process."
  (let* ((p (require-type process 'process))
         (f (require-type function 'function))
         (form (process-initial-form p)))
    (declare (type cons form))
    (not-in-current-process p 'process-preset)
    ;; The initial-form cons is updated in place: the function goes in
    ;; the car, the argument list in the cdr.
    (setf (car form) f
          (cdr form) args)
    (%process-preset-internal process)))
312
(defun %process-preset-internal (process)
  "Preset PROCESS's thread with a startup function that receives
PROCESS and its initial-form cons.  Returns PROCESS."
  (let ((form (process-initial-form process)))
    (declare (type cons form))
    (thread-preset
     (process-thread process)
     ;; The startup function binds *CURRENT-PROCESS*, makes sure the
     ;; process is registered, establishes the process's initial
     ;; bindings with PROGV and then runs the initial form.
     #'(lambda (proc startup-form)
         (let ((*current-process* proc))
           (add-to-all-processes proc)
           (multiple-value-bind (symbols vals)
               (initial-bindings (process-initial-bindings proc))
             (progv symbols vals
               (run-process-initial-form proc startup-form)))))
     process
     form)
    process))
329
330
;;; Apply the car of INITIAL-FORM to its cdr inside the process's
;;; outermost restarts and PROCESS-RESET handler, then act on the way
;;; the form finished.  Always returns NIL.
;;;
;;; KILL and EXITED end up as follows:
;;;   normal return             : KILL = NIL, EXITED = T
;;;   ABORT-BREAK restart taken : KILL = NIL, EXITED = NIL  (reset)
;;;   ABORT restart taken       : KILL = T,   EXITED = T
;;;   PROCESS-RESET condition   : KILL = (PROCESS-RESET-KILL condition)
(defun run-process-initial-form (process initial-form)
  (let* ((exited nil)
         (kill (handler-case
                   (restart-case
                    (progn
                      (apply (car initial-form) (cdr (the list initial-form)))
                      (setq exited t)
                      nil)
                    (abort-break () :report "Reset this process")
                    (abort () :report "Kill this process" (setq exited t)))
                 (process-reset (condition)
                   (process-reset-kill condition)))))
    ;; We either exited from the initial form normally, were told to
    ;; exit prematurely, or are being reset and should enter the
    ;; "awaiting preset" state.
    (if (or kill exited) 
      ;; A KILL value of :TOPLEVEL skips the exit processing.
      (unless (eq kill :toplevel)
        (process-initial-form-exited process (or kill t)))
      (progn
        (thread-change-state (process-thread process) :run :reset)
        (tcr-set-preset-state (process-tcr process))
        (setf (%process-whostate process) "Reset")))
    nil))
354
;;; Separated from run-process-initial-form just so I can change it easily.
(defun process-initial-form-exited (process kill)
  "Called when PROCESS's initial form has finished.  When KILL is
:SHUTDOWN the process is marked \"Shutdown\" and put on the shutdown
list; MAYBE-FINISH-PROCESS-KILL is then called in either case.
Interrupts are disabled for the duration."
  (without-interrupts
   (when (eq kill :shutdown)
     (setf (%process-whostate process) "Shutdown")
     (add-to-shutdown-processes process))
   (maybe-finish-process-kill process kill)))
364
(defun maybe-finish-process-kill (process kill)
  "If KILL is neither NIL nor :SHUTDOWN, mark PROCESS \"Dead\", remove
it from the list of all processes and kill its thread, unless that
thread is the current one or is already exhausted.  Returns NIL."
  (unless (or (null kill) (eq kill :shutdown))
    (setf (%process-whostate process) "Dead")
    (remove-from-all-processes process)
    (let ((thread (process-thread process)))
      (when (and (not (eq thread *current-lisp-thread*))
                 (not (thread-exhausted-p thread)))
        (kill-lisp-thread thread))))
  nil)
374
375
376 
377
(defun require-global-symbol (s &optional env)
  "Return S if it is a symbol whose global bit is set, or which is
recorded as :GLOBAL among the specials of ENV's definition
environment.  Otherwise signal an error."
  (let* ((sym (require-type s 'symbol))
         (bits (%symbol-bits sym)))
    (flet ((global-in-env-p ()
             (let ((defenv (and env (definition-environment env))))
               (and defenv
                    (eq :global
                        (%cdr (assq sym (defenv.specials defenv))))))))
      (if (or (logbitp $sym_vbit_global bits)
              (global-in-env-p))
        sym
        (error "~s not defined with ~s" sym 'defstatic)))))
387
388
;;; Locks print unreadably with their name (if any), followed by the
;;; address of their value cell when that cell holds a MACPTR.
(defmethod print-object ((s lock) stream)
  (print-unreadable-object (s stream :type t :identity t)
    (let ((val (uvref s target::lock._value-cell))
          (name (uvref s target::lock.name-cell)))
      (if name
        (format stream "~s " name))
      (when (typep val 'macptr)
        (format stream "[ptr @ #x~x]" (%ptr-to-int val))))))
398
(defun lockp (l)
  "Return true if the typecode of L is the lock subtag."
  (let ((code (typecode l)))
    (eq target::subtag-lock code)))

;;; Register LOCKP as the type predicate for the type LOCK.
(set-type-predicate 'lock 'lockp)
403
(defun recursive-lock-p (l)
  "Return true if L is a lock whose kind cell holds RECURSIVE-LOCK."
  (if (eq target::subtag-lock (typecode l))
    (eq 'recursive-lock (%svref l target::lock.kind-cell))))

(defun read-write-lock-p (l)
  "Return true if L is a lock whose kind cell holds READ-WRITE-LOCK."
  (if (eq target::subtag-lock (typecode l))
    (eq 'read-write-lock (%svref l target::lock.kind-cell))))

;;; Register the two predicates for their respective types.
(setf (type-predicate 'recursive-lock) 'recursive-lock-p)
(setf (type-predicate 'read-write-lock) 'read-write-lock-p)
414
415
;;; The three functions below operate on the pointer obtained from
;;; RECURSIVE-LOCK-PTR.  FLAG, where accepted, is passed through
;;; unchanged to the underlying primitive.

(defun grab-lock (lock &optional flag)
  "Wait until a given lock can be obtained, then obtain it."
  (let ((ptr (recursive-lock-ptr lock)))
    (%lock-recursive-lock ptr flag)))

(defun release-lock (lock)
  "Relinquish ownership of a given lock."
  (let ((ptr (recursive-lock-ptr lock)))
    (%unlock-recursive-lock ptr)))

(defun try-lock (lock &optional flag)
  "Obtain the given lock, but only if it is not necessary to wait for it."
  (let ((ptr (recursive-lock-ptr lock)))
    (%try-recursive-lock ptr flag)))
427
(defun lock-acquisition-status (thing)
  "Return the status of the LOCK-ACQUISITION object THING; any other
argument is reported as a bad argument."
  (cond ((istruct-typep thing 'lock-acquisition)
         (lock-acquisition.status thing))
        (t
         (report-bad-arg thing 'lock-acquisition))))

(defun clear-lock-acquisition-status (thing)
  "Set the status of the LOCK-ACQUISITION object THING to NIL; any
other argument is reported as a bad argument."
  (cond ((istruct-typep thing 'lock-acquisition)
         (setf (lock-acquisition.status thing) nil))
        (t
         (report-bad-arg thing 'lock-acquisition))))

;;; Printed unreadably, showing the current status.
(defmethod print-object ((l lock-acquisition) stream)
  (print-unreadable-object (l stream :type t :identity t)
    (let ((status (lock-acquisition-status l)))
      (format stream "[status = ~s]" status))))
441
(defun semaphore-notification-status (thing)
  "Return the status of the SEMAPHORE-NOTIFICATION object THING; any
other argument is reported as a bad argument."
  (cond ((istruct-typep thing 'semaphore-notification)
         (semaphore-notification.status thing))
        (t
         (report-bad-arg thing 'semaphore-notification))))

(defun clear-semaphore-notification-status (thing)
  "Set the status of the SEMAPHORE-NOTIFICATION object THING to NIL;
any other argument is reported as a bad argument."
  (cond ((istruct-typep thing 'semaphore-notification)
         (setf (semaphore-notification.status thing) nil))
        (t
         (report-bad-arg thing 'semaphore-notification))))

;;; Printed unreadably, showing the current status.
(defmethod print-object ((l semaphore-notification) stream)
  (print-unreadable-object (l stream :type t :identity t)
    (let ((status (semaphore-notification-status l)))
      (format stream "[status = ~s]" status))))
455
(defun process-wait (whostate function &rest args)
  "Causes the current lisp process (thread) to wait for a given
predicate to return true."
  (declare (dynamic-extent args))
  ;; Fast path: if the predicate is already true, return its value
  ;; without touching the whostate.
  (let ((first-try (apply function args)))
    (if first-try
      first-try
      (with-process-whostate (whostate)
        ;; Poll, sleeping for a tick between attempts.
        (do ()
            ((apply function args))
          (%nanosleep 0 *ns-per-tick*))))))
467
468
469
(defun process-wait-with-timeout (whostate time function &rest args)
  "Cause the current thread to wait for a given predicate to return true,
or for a timeout to expire."
  (declare (dynamic-extent args))
  (if (null time)
    ;; No timeout: wait indefinitely, then report success.
    (progn
      (apply #'process-wait whostate function args)
      t)
    ;; TIME is added to the tick count to form the deadline.  The wait
    ;; ends when the predicate returns true (that value is returned) or
    ;; when the tick count passes the deadline (NIL is returned).
    (let* ((result nil)
           (deadline (+ (get-tick-count) time))
           (poll #'(lambda ()
                     (let ((val (apply function args)))
                       (if val
                         (setq result val)
                         (> (get-tick-count) deadline))))))
      (declare (dynamic-extent poll))
      (process-wait whostate poll)
      result)))
484
485
(defmethod process-interrupt ((process process) function &rest args)
  "Arrange for the target process to invoke a specified function at
some point in the near future, and then return to what it was doing."
  (let ((p (require-type process 'process)))
    (cond ((eq p *current-process*)
           ;; Interrupting ourselves: just make the call and return T.
           (apply function args)
           t)
          (t
           ;; Otherwise have the target's thread apply FUNCTION to ARGS.
           (thread-interrupt (process-thread p)
                             process
                             #'apply
                             function args)))))
499
;;; Default method: ignores CONDITION and FRAME-POINTER, does nothing
;;; and returns NIL.  It is a generic function so that it can be
;;; overridden for subclasses of PROCESS.
(defmethod process-debug-condition ((p process) condition frame-pointer)
  (declare (ignore condition frame-pointer)))
502
503
504
505
;;; This one is in the Symbolics documentation
;;; It simply calls YIELD.
(defun process-allow-schedule ()
  "Used for cooperative multitasking; probably never necessary."
  (yield))
510
511
;;; something unique that users won't get their hands on
;;; The tag is the process's splice cons, which is created per
;;; instance by the SPLICE slot's initform.
(defun process-reset-tag (process)
  (process-splice process))
515
(defun process-run-function (name-or-keywords function &rest args)
  "Create a process, preset it, and enable it."
  ;; NAME-OR-KEYWORDS is either a list of keyword arguments (see
  ;; %PROCESS-RUN-FUNCTION) or a name, which is treated as (:NAME name).
  (cond ((listp name-or-keywords)
         (%process-run-function name-or-keywords function args))
        (t
         (let ((keywords (list :name name-or-keywords)))
           (declare (dynamic-extent keywords))
           (%process-run-function keywords function args)))))
523
(defun %process-run-function (keywords function args)
  "Worker for PROCESS-RUN-FUNCTION.  KEYWORDS is a list of keyword
arguments, defaulted as below and passed on to MAKE-PROCESS.  The new
process is preset to apply FUNCTION to ARGS, enabled and returned."
  (destructuring-bind (&key (name "Anonymous")
                            (priority  0)
                            (stack-size *default-control-stack-size*)
                            (vstack-size *default-value-stack-size*)
                            (tstack-size *default-temp-stack-size*)
                            (initial-bindings ())
                            (persistent nil)
                            (use-standard-initial-bindings t)
                            (termination-semaphore nil)
                            (allocation-quantum (default-allocation-quantum)))
                      keywords
    (let ((process (make-process
                    name
                    ;; PRIORITY must be a fixnum.
                    :priority (require-type priority 'fixnum)
                    :stack-size stack-size
                    :vstack-size vstack-size
                    :tstack-size tstack-size
                    :persistent persistent
                    :use-standard-initial-bindings use-standard-initial-bindings
                    :initial-bindings initial-bindings
                    :termination-semaphore termination-semaphore
                    :allocation-quantum allocation-quantum)))
      ;; Wrap the call in a closure: PROCESS-PRESET requires an actual
      ;; function object as its second argument.
      (flet ((initial-function ()
               (apply function args)))
        (process-preset process #'initial-function))
      (process-enable process)
      process)))
550
(defmethod process-reset ((process process) &optional kill)
  "Cause a specified process to cleanly exit from any ongoing computation."
  (setq process (require-type process 'process))
  ;; KILL must be one of NIL, :KILL or :SHUTDOWN.
  (unless (memq kill '(nil :kill :shutdown))
    (setq kill (require-type kill '(member nil :kill :shutdown))))
  (cond ((eq process *current-process*)
         ;; Resetting ourselves: signal the reset directly.
         (%process-reset kill))
        ((process-exhausted-p process)
         ;; Nothing is running in the target; just finish any kill.
         (maybe-finish-process-kill process kill))
        (t
         ;; Otherwise ask the target to reset itself.
         (process-interrupt process '%process-reset kill))))
562
563
;;; Runs in the process being reset: signals a PROCESS-RESET condition
;;; carrying KILL (RUN-PROCESS-INITIAL-FORM establishes a handler for
;;; it).  If SIGNAL returns, finish the kill on the current process.
(defun %process-reset (kill)
  (signal 'process-reset :kill kill)
  (maybe-finish-process-kill *current-process* kill))
567
;;; By default, it's just fine with the current process
;;; if the application/user wants to quit.
;;; The default method returns T unconditionally.
(defmethod process-verify-quit ((process process))
  t)
572
;;; Default method: acts only when PROCESS is the initial process, and
;;; otherwise returns NIL.  In the initial process it calls
;;; PREPARE-TO-QUIT, installs THUNK with %SET-TOPLEVEL, flushes
;;; *STDOUT* and then calls TOPLEVEL.
(defmethod process-exit-application ((process process) thunk)
  (when (eq process *initial-process*)
    (prepare-to-quit)
    (%set-toplevel thunk)
    (fresh-line *stdout*)
    (finish-output *stdout*)
    (toplevel)))
580
581
582
(defmethod process-kill ((process process))
  "Cause a specified process to cleanly exit from any ongoing
computation, and then exit."
  ;; The kill-issued flag is set only if the interrupt returned true.
  (when (process-interrupt process #'%process-reset :kill)
    (setf (process-kill-issued process) t)))
588
(defun process-abort (process &optional condition)
  "Cause a specified process to process an abort condition, as if it
had invoked abort."
  (flet ((do-abort ()
           (abort condition)))
    (process-interrupt process #'do-abort)))
595
;;; Reset PROCESS (with no kill option) and then enable it again.
;;; Signals an error if PROCESS is the current process.
(defmethod process-reset-and-enable ((process process))
  (not-in-current-process process 'process-reset-and-enable)
  (process-reset process)
  (process-enable process))
600
;;; The kill-issued flag is kept in the cdr of the process's splice cons.
(defmethod process-kill-issued ((process process))
  (let ((splice (process-splice process)))
    (cdr splice)))

(defmethod (setf process-kill-issued) (val (process process))
  (let ((splice (process-splice process)))
    (setf (cdr splice) val)))
606
(defun tcr->process (tcr)
  "Return the first known process whose TCR is EQ to TCR, or NIL if
there is none."
  (find tcr (all-processes) :key #'process-tcr :test #'eq))
611
(defun current-process-allocation-quantum ()
  "Return the allocation quantum of the current process."
  (let ((me *current-process*))
    (process-allocation-quantum me)))

(defun (setf current-process-allocation-quantum) (new)
  "Set the allocation quantum of the current process to NEW, which
must satisfy VALID-ALLOCATION-QUANTUM-P.  Returns NEW."
  (cond ((valid-allocation-quantum-p new)
         (with-macptrs (tcrp)
           (%setf-macptr-to-object tcrp (%current-tcr))
           ;; Update the process slot first, then the TCR, which holds
           ;; the quantum as its base-2 logarithm.
           (setf (slot-value *current-process* 'allocation-quantum) new)
           (setf (%get-natural tcrp target::tcr.log2-allocation-quantum)
                 (1- (integer-length new)))
           new))
        (t
         (report-bad-arg new '(satisfies valid-allocation-quantum-p)))))
624
625
;;; Declares *BACKTRACE-CONTEXTS* as a standard initial binding with
;;; the value form NIL.
;;; NOTE(review): presumably this makes it part of
;;; (STANDARD-INITIAL-BINDINGS), so each new process starts with its
;;; own NIL binding -- confirm against DEF-STANDARD-INITIAL-BINDING.
(def-standard-initial-binding *backtrace-contexts* nil)
627
;;; Default method: a process other than the initial process kills
;;; itself, but only when it is the one making the call.
(defmethod exit-interactive-process ((p process))
  (when (and (not (eq p *initial-process*))
             (eq p *current-process*))
    (process-kill p)))
632
;;; A TTY-LISTENER is a PROCESS with no slots of its own; it differs
;;; only in how it exits.
(defclass tty-listener (process)
    ())

;;; When a TTY-LISTENER exits itself, the whole lisp quits.
(defmethod exit-interactive-process ((p tty-listener))
  (if (eq p *current-process*)
    (quit)))
639
;;; Undo PROCESS-DRIBBLE: close the dribble stream if there is one and
;;; restore the saved *TERMINAL-IO* if there is one.  Both slots are
;;; left NIL.
(defmethod process-stop-dribbling ((p process))
  (let ((stream (slot-value p 'dribble-stream)))
    (when stream
      (close stream)
      (setf (slot-value p 'dribble-stream) nil)))
  (let ((saved (slot-value p 'dribble-saved-terminal-io)))
    (when saved
      (setq *terminal-io* saved)
      (setf (slot-value p 'dribble-saved-terminal-io) nil))))
648
;;; Stop any dribbling already in progress and, if PATH is non-NIL,
;;; start copying terminal input and output to the file PATH (opened
;;; for appending, created if missing).  Returns PATH, or NIL when
;;; PATH is NIL.
(defmethod process-dribble ((p process) path)
  (with-slots (dribble-stream dribble-saved-terminal-io) p
    (process-stop-dribbling p)
    (when path
      (let* ((term-in (two-way-stream-input-stream *terminal-io*))
             (term-out (two-way-stream-output-stream *terminal-io*))
             (file (open path
                         :direction :output
                         :if-exists :append
                         :if-does-not-exist :create)))
        ;; The slot updates and the *TERMINAL-IO* switch happen
        ;; together, with interrupts disabled.
        (without-interrupts
         (setf dribble-stream file)
         (setf dribble-saved-terminal-io *terminal-io*)
         (setf *terminal-io*
               (make-echoing-two-way-stream
                (make-echo-stream term-in file)
                (make-broadcast-stream term-out file)))))
      path)))
Note: See TracBrowser for help on using the repository browser.