Question

The problem I am trying to find a Lispy, working solution for:

A job queue provides a set of threads, all running the same code, with tasks they have to take care of. If the queue is empty, the threads should wait until a new entry has been made, but I also want to provide a clean shutdown. Therefore, even while the threads are waiting on the queue, the parent thread must be able to set some variable or signal the threads to tell them to shut down. The only reason for a thread not to comply immediately should be that it is currently evaluating a task and therefore cannot shut down cleanly until the task is done.

I currently have two solutions, and I am not really convinced by either:

(defparameter *kill-yourself* nil)

(defparameter *mutex* (sb-thread:make-mutex))

(defparameter *notify* (sb-thread:make-waitqueue))

#|the queue is thread safe|#
(defparameter *job-queue* (make-instance 'queue))


(defun fill-queue (with-data)
   (fill-stuff-in-queue)
   (sb-thread:with-mutex (*mutex*)
     (sb-thread:condition-notify *notify*)))


#|solution A|#
(with-mutex (*mutex*)
  (do ((curr-job nil))
      (*kill-yourself* nil)
    (if (is-empty *job-queue*)
    (sb-thread:condition-wait *notify* *mutex*)
    (progn
      (setf curr-job (dequeue *job-queue*))
      (do-stuff-with-job)))))


#|solution B|#
(defun helper-kill-yourself-p ()
  (sb-thread:with-mutex (*mutex*)
     *kill-yourself*))

(do ((job (dequeue-* *job-queue* :timeout 0) 
      (dequeue-* *job-queue* :timeout 0)))
        ((if (helper-kill-yourself-p)
         t
                 (sb-thread:with-mutex (*mutex*)
                     (sb-thread:condition-wait *notify* *mutex*)
                     (if (helper-kill-yourself-p)
                          t
                          nil)))
         (progn
           nil))
     (do-stuff-with-job))

Both do loops could be used to start threads. But A would not really work if there is more than one thread (the mutex would prevent any work from happening in parallel), and solution B is quite dirty, since there are edge cases where the extracted job is nil. Furthermore, I am not really convinced by the halt condition, as it is too long and seems too complicated.

What would be the proper way to implement a (do) loop which works on data provided by a queue for as long as it is supposed to, and which can also sleep while there is no new data and it has not been told to shut down? Last but not least, it has to be possible to run this (do) loop in any number of parallel threads.

Solution

Solution A

Yes, you are right about Solution A: the mutex won't let jobs be executed in parallel.

Solution B

I think that a do loop is not the right tool for the job. In particular, your code can extract a job from the queue and then quit without executing it, because you dequeue before checking whether the thread should terminate. Also, because you bind job in do's variable list, you ignore the second value returned by dequeue, so you cannot reliably tell whether the queue was empty. Finally, if you check whether the thread should stop in do's end-test form, you have to acquire *mutex* twice: once to check whether the thread should stop and once to dequeue (or you can invent a strange end-test form that does the work of the loop body).

Having said that, you would have to put all the code inside do's body and leave the variable list and the end test empty. That is why I think loop is better in this case.

If you have to use a do loop, you can easily wrap the loop body in it, e.g. (do nil (nil nil) *loop-body*).
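
To illustrate the point about multiple values, here is a small sketch that assumes the queue from sb-concurrency (the question's own queue class may behave differently). The function name is made up for the example. It enqueues nil as a job and dequeues twice, collecting both return values each time:

```lisp
(require :sb-concurrency)

;; Hypothetical helper, not part of the original code.
(defun dequeue-twice-after-enqueuing-nil ()
  "Enqueue NIL as a job, then dequeue twice, keeping both return values."
  (let ((queue (sb-concurrency:make-queue)))
    (sb-concurrency:enqueue nil queue)
    (list
     ;; First call: the queue holds one item, which happens to be NIL.
     (multiple-value-list (sb-concurrency:dequeue queue))
     ;; Second call: the queue is now empty.
     (multiple-value-list (sb-concurrency:dequeue queue)))))
```

The primary value is nil in both calls, so only the second value tells "a nil job" apart from "no job at all".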
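
As a rough illustration of that equivalence, the two functions below (names invented for this sketch) do the same thing, once with a plain loop and once with a do form whose variable list is empty and whose end test never fires. In both cases the exit happens through return inside the body:

```lisp
;; Hypothetical example functions; only LOOP, DO and RETURN are standard.
(defun count-with-loop (limit)
  "Count from 0 up to LIMIT using a simple LOOP."
  (let ((n 0))
    (loop
      (when (>= n limit)
        (return n))
      (incf n))))

(defun count-with-do (limit)
  "Same logic wrapped in DO with no variables and an end test that is always false."
  (let ((n 0))
    (do () (nil)
      (when (>= n limit)
        (return n))
      (incf n))))
```

Both forms establish a nil block, so return works the same way in each.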

My solution

(require :sb-concurrency)
(use-package :sb-concurrency)
(use-package :sb-thread)

(defparameter *kill-yourself* nil)
(defparameter *mutex* (make-mutex))
(defparameter *notify* (make-waitqueue))
#|the queue is thread safe|#
(defparameter *job-queue* (make-queue :name "job-queue"))
(defparameter *timeout* 10)
(defparameter *output-lock* (make-mutex))

(defun output (line)
  (with-mutex (*output-lock*)
    (write-line line)))

(defun fill-queue (with-data)
  (enqueue with-data *job-queue*)
  (with-mutex (*mutex*)
    (condition-notify *notify*)))

(defun process-job (thread-name job)
  (funcall job thread-name))

(defun run-worker (name)
  (make-thread
    (lambda ()
      (output (format nil "starting thread ~a" name))
      (loop (with-mutex (*mutex*)
              (condition-wait *notify* *mutex* :timeout *timeout*)
              (when *kill-yourself*
                (output (format nil "~a thread quitting" name))
                (return-from-thread nil)))
            ;; *mutex* is released before the job starts; otherwise
            ;; other threads could not wait for new jobs

            ;; don't make two separate calls (queue-empty-p, dequeue),
            ;; since the queue can become empty in between
            (multiple-value-bind (job has-job) (dequeue *job-queue*)
              (if has-job
                (process-job name job)))))
    :name name))

(defun stop-work ()
  (with-mutex (*mutex*)
    (setf *kill-yourself* t)
    (condition-broadcast *notify*)))

(defun add-job (job)
  ;; no need to put enqueue in critical section
  (enqueue job *job-queue*)
  (with-mutex (*mutex*)
    (condition-notify *notify*)))

(defun make-job (n)
  (lambda (thread-name)
    (loop for i upto 1000 collecting i)
    (output (format nil "~a thread executes ~a job" thread-name n))))

(defun try-me ()
  (run-worker "worker1")
  (run-worker "worker2")
  (loop for i upto 1000 do
        (add-job (make-job i)))
  (loop for i upto 2000 collecting i)
  (stop-work))

Calling try-me in the REPL should give you something like the following output:

starting thread worker1
worker1 thread executes 0 job
worker1 thread executes 1 job
worker1 thread executes 2 job
worker1 thread executes 3 job
starting thread worker2
worker2 thread executes 4 job
worker1 thread executes 5 job
worker2 thread executes 6 job
worker1 thread executes 7 job
worker1 thread executes 8 job
...
worker2 thread executes 33 job
worker1 thread executes 34 job
worker2 thread executes 35 job
worker1 thread executes 36 job
worker1 thread executes 37 job
worker2 thread executes 38 job
0
worker1 thread executes 39 job
worker2 thread quitting
worker1 thread quitting

P.S. I was not able to find documentation for old SBCL versions, so I leave the translation to the older API up to you. I hope this helps.
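
One possible variation, given here only as a sketch: the worker above waits on the condition before every dequeue, so it depends on the timeout when a notification arrives while no thread is waiting. The version below checks the stop flag and the queue first and waits only when there is nothing to do. All sketch-* names are made up for this example, and it assumes that a job is never nil:

```lisp
(require :sb-concurrency)

;; All names starting with SKETCH- are hypothetical.
(defvar *sketch-lock* (sb-thread:make-mutex :name "sketch-lock"))
(defvar *sketch-wakeup* (sb-thread:make-waitqueue))
(defvar *sketch-jobs* (sb-concurrency:make-queue :name "sketch-jobs"))
(defvar *sketch-stop* nil)

(defun sketch-next-job ()
  "Block until a job is available or shutdown was requested.
Returns the job, or NIL on shutdown (assumes jobs are never NIL)."
  (sb-thread:with-mutex (*sketch-lock*)
    (loop
      (when *sketch-stop*
        (return nil))
      (multiple-value-bind (job has-job) (sb-concurrency:dequeue *sketch-jobs*)
        (when has-job
          (return job)))
      ;; Nothing to do: sleep until a producer or SKETCH-STOP wakes us.
      ;; The loop re-checks everything after each wakeup.
      (sb-thread:condition-wait *sketch-wakeup* *sketch-lock*))))

(defun sketch-worker ()
  "Run jobs until shutdown; each job runs outside the lock."
  (loop for job = (sketch-next-job)
        while job
        do (funcall job)))

(defun sketch-add-job (job)
  (sb-concurrency:enqueue job *sketch-jobs*)
  (sb-thread:with-mutex (*sketch-lock*)
    (sb-thread:condition-notify *sketch-wakeup*)))

(defun sketch-stop ()
  (sb-thread:with-mutex (*sketch-lock*)
    (setf *sketch-stop* t)
    (sb-thread:condition-broadcast *sketch-wakeup*)))
```

Workers would be started with (sb-thread:make-thread #'sketch-worker). As in the code above, a stop request takes priority over jobs still sitting in the queue; a worker only finishes the job it is currently running.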

Edit: Class solution

In the comments on your (deleted) answer we found out that you want a class for the event loop. I came up with the following:

(defclass event-loop ()
  ((lock
     :initform (make-mutex))
   (queue
     :initform (make-waitqueue))
   (jobs
     :initform (make-queue))
   (stopped
     :initform nil)
   (timeout
     :initarg :wait-timeout
     :initform 0)
   (process-job
     :initarg :process-job
     :initform #'identity)
   (worker-count
     :initarg :worker-count
     :initform (error "Must supply worker count"))))

(defmethod initialize-instance :after ((eloop event-loop) &key)
  (with-slots (worker-count timeout lock queue jobs process-job stopped) eloop
    (dotimes (i worker-count)
      (make-thread
        (lambda ()
          (loop (with-mutex (lock)
                  (condition-wait queue lock :timeout timeout)
                  (when stopped
                    (return-from-thread nil)))
                ;; the lock is released before the job starts; otherwise
                ;; other threads could not wait for new jobs

                ;; don't make two separate calls (queue-empty-p, dequeue),
                ;; since the queue can become empty in between
                (multiple-value-bind (job has-job) (dequeue jobs)
                  (if has-job
                    (funcall process-job job)))))))))

(defun push-job (job event-loop )
  (with-slots (lock queue jobs) event-loop
    (enqueue job jobs)
    (with-mutex (lock)
      (condition-notify queue))))

(defun stop-loop (event-loop)
  (with-slots (lock queue stopped) event-loop
    (with-mutex (lock)
      (setf stopped t)
      (condition-broadcast queue))))

You can use it like this:

> (defparameter *el* (make-instance 'event-loop :worker-count 10 :process-job #'funcall))
> (defparameter *oq* (make-queue))
> (dotimes (i 100)
    (push-job (let ((n i)) (lambda ()
                             (sleep 1)
                             (enqueue (format nil "~a job done" n) *oq*))) *el*))

It uses an sb-concurrency queue for output to avoid interleaved results. While the jobs are running, you can examine *oq* in your REPL.

> *oq*
#S(QUEUE
:HEAD (SB-CONCURRENCY::.DUMMY. "7 job done" "1 job done" "9 job done"
       "6 job done" "2 job done" "11 job done" "10 job done" "16 job done"
       "12 job done" "4 job done" "3 job done" "17 job done" "5 job done"
       "0 job done" "8 job done" "14 job done" "25 job done" "15 job done"
       "21 job done" "28 job done" "13 job done" "23 job done" "22 job done"
       "19 job done" "27 job done" "18 job done")
:TAIL ("18 job done")
:NAME NIL)

Other tips

I have used the library chanl, which provides a message queue mechanism. When I wanted the threads to shut down, I simply sent the keyword :stop to the queue. Of course, a thread does not stop until everything that comes before :stop in the queue has been processed. If you want to stop earlier, you could create another queue (a control queue) that is checked before the data queue.
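
A rough sketch of the same idea, written with sb-concurrency's mailbox instead of chanl so that it only relies on SBCL's bundled contribs. The function names are invented for the example, and jobs are assumed to be functions of no arguments:

```lisp
(require :sb-concurrency)

;; Hypothetical helpers; a mailbox stands in for the chanl channel here.
(defun sentinel-worker (mailbox)
  "Run jobs received from MAILBOX until the keyword :STOP arrives."
  (loop for message = (sb-concurrency:receive-message mailbox)
        until (eq message :stop)
        do (funcall message)))

(defun start-sentinel-workers (mailbox count)
  "Start COUNT worker threads reading from MAILBOX and return them as a list."
  (loop repeat count
        collect (sb-thread:make-thread #'sentinel-worker
                                       :arguments (list mailbox))))

(defun stop-sentinel-workers (mailbox workers)
  "Send one :STOP per worker, then wait for all workers to finish."
  ;; Each worker consumes exactly one :STOP, so one is needed per thread.
  (loop repeat (length workers)
        do (sb-concurrency:send-message mailbox :stop))
  (mapc #'sb-thread:join-thread workers))
```

Since receive-message blocks while the mailbox is empty, the workers sleep without any extra mutex or condition variable. Every job sent before the :stop messages is still processed first.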

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow