Solution A
Yes, you are right about Solution A, mutex won't let jobs to be executed in parallel.
Solution B
I think that do
loop is not right tool for the job. Particularly, in your
code there is a possibility that job will be extracted from the queue and thread
will quit without executing it. This situation is possible because you dequeue
before should-terminate-check. Also because you define job
in do
's variables
block you ignore multiple values returned from dequeue
, which is also bad
since you can not effectively check if the queue is empty. Also in the scenario
where you check if thread should stop in do
end-test-form you'll have to
acquire *mutex*
twice, to check if thread should stop and to dequeue (or
you can invent strange end-test-form which will do the job of loop body).
So, having said that, you'll have to put all the code inside do
's body and
leave vars and end-test empty. That is why I think loop
is better in this
case.
If you have to use do
loop you can easily wrap loop
body into it, e.g.
(do nil (nil nil) *loop-body*)
.
My solution
(require :sb-concurrency)
(use-package :sb-concurrency)
(use-package :sb-thread)
(defparameter *kill-yourself* nil)
(defparameter *mutex* (make-mutex))
(defparameter *notify* (make-waitqueue))
#|the queue is thread safe|#
(defparameter *job-queue* (make-queue :name "job-queue"))
(defparameter *timeout* 10)
(defparameter *output-lock* (make-mutex))
(defun output (line)
(with-mutex (*output-lock*)
(write-line line)))
(defun fill-queue (with-data)
(enqueue with-data *job-queue*)
(with-mutex (*mutex*)
(condition-notify *notify*)))
(defun process-job (thread-name job)
(funcall job thread-name))
(defun run-worker (name)
(make-thread
(lambda ()
(output (format nil "starting thread ~a" name))
(loop (with-mutex (*mutex*)
(condition-wait *notify* *mutex* :timeout *timeout*)
(when *kill-yourself*
(output (format nil "~a thread quitting" name))
(return-from-thread nil)))
;; release *mutex* before starting the job,
;; otherwise it won't allow other threads wait for new jobs
;; you don't want to make 2 separate calls (queue-empty-p, dequeue)
;; since inbetween queue can become empty
(multiple-value-bind (job has-job) (dequeue *job-queue*)
(if has-job
(process-job name job)))))
:name name))
(defun stop-work ()
(with-mutex (*mutex*)
(setf *kill-yourself* t)
(condition-broadcast *notify*)))
(defun add-job (job)
;; no need to put enqueue in critical section
(enqueue job *job-queue*)
(with-mutex (*mutex*)
(condition-notify *notify*)))
(defun make-job (n)
(lambda (thread-name)
(loop for i upto 1000 collecting i)
(output (format nil "~a thread executes ~a job" thread-name n))))
(defun try-me ()
(run-worker "worker1")
(run-worker "worker2")
(loop for i upto 1000 do
(add-job (make-job i)))
(loop for i upto 2000 collecting i)
(stop-work))
calling try-me
in REPL should give you something like the following output
starting thread worker1
worker1 thread executes 0 job
worker1 thread executes 1 job
worker1 thread executes 2 job
worker1 thread executes 3 job
starting thread worker2
worker2 thread executes 4 job
worker1 thread executes 5 job
worker2 thread executes 6 job
worker1 thread executes 7 job
worker1 thread executes 8 job
...
worker2 thread executes 33 job
worker1 thread executes 34 job
worker2 thread executes 35 job
worker1 thread executes 36 job
worker1 thread executes 37 job
worker2 thread executes 38 job
0
worker1 thread executes 39 job
worker2 thread quitting
worker1 thread quitting
P.S. I was not able to find documentation for old SBCL, so I leave translation to the older API up to you. Hope it will help.
Edit Class Solution
In comments to your (deleted) answer we found out that you want a class for the event loop. I come up with the following
(defclass event-loop ()
((lock
:initform (make-mutex))
(queue
:initform (make-waitqueue))
(jobs
:initform (make-queue))
(stopped
:initform nil)
(timeout
:initarg :wait-timeout
:initform 0)
(process-job
:initarg :process-job
:initform #'identity)
(worker-count
:initarg :worker-count
:initform (error "Must supply worker count"))))
(defmethod initialize-instance :after ((eloop event-loop) &key)
(with-slots (worker-count timeout lock queue jobs process-job stopped) eloop
(dotimes (i worker-count)
(make-thread
(lambda ()
(loop (with-mutex (lock)
(condition-wait queue lock :timeout timeout)
(when stopped
(return-from-thread nil)))
;; release *mutex* before starting the job,
;; otherwise it won't allow other threads wait for new jobs
;; you don't want to make 2 separate calls (queue-empty-p, dequeue)
;; since inbetween queue can become empty
(multiple-value-bind (job has-job) (dequeue jobs)
(if has-job
(funcall process-job job)))))))))
(defun push-job (job event-loop )
(with-slots (lock queue jobs) event-loop
(enqueue job jobs)
(with-mutex (lock)
(condition-notify queue))))
(defun stop-loop (event-loop)
(with-slots (lock queue stopped) event-loop
(with-mutex (lock)
(setf stopped t)
(condition-broadcast queue))))
You can use it like this
> (defparameter *el* (make-instance 'event-loop :worker-count 10 :process-job #'funcall))
> (defparameter *oq* (make-queue))
> (dotimes (i 100)
(push-job (let ((n i)) (lambda ()
(sleep 1)
(enqueue (format nil "~a job done" n) *oq*))) *el*))
It uses sb-thread:queue
as output to avoid strange results. While this is working you can examine *oq*
in your REPL.
> *oq*
#S(QUEUE
:HEAD (SB-CONCURRENCY::.DUMMY. "7 job done" "1 job done" "9 job done"
"6 job done" "2 job done" "11 job done" "10 job done" "16 job done"
"12 job done" "4 job done" "3 job done" "17 job done" "5 job done"
"0 job done" "8 job done" "14 job done" "25 job done" "15 job done"
"21 job done" "28 job done" "13 job done" "23 job done" "22 job done"
"19 job done" "27 job done" "18 job done")
:TAIL ("18 job done")
:NAME NIL)