Question

At the moment I'm trying to construct a web service with a RESTful API that handles some long running tasks (jobs).

The idea is that a user submits a job with a POST, which returns a URL for checking the job status; that status resource also contains a URL for the results. Once the job is complete (i.e. some value has been written to a database), the results URL returns the appropriate information (instead of no results) and the job URL indicates a completed status.

Unfortunately, the calculations are quite intensive, so only one can run at a time and the jobs need to be queued.

In pseudocode, something like this would be needed:

(def job-queue (atom queue)) ;; some queue 
(def jobs (atom {}))

(defn schedule-job [params] 
  ;; schedules the job into the queue and 
  ;; adds the job to a jobs map for checking status via GET
  ;; note that the job should not  be evaluated until popped from the queue
)

(POST "/analyze" [{params :params}] 
 (schedule-job params))

(GET "/job/:id" [id] 
 (get @jobs id))

;; Some function that pops the next item from the queue 
;; and evaluates it when the previous item is complete
;; Note: should not terminate when queue is empty! 

I've looked into Lamina, which allows asynchronous processing, but it didn't seem to suit my needs.

My question is how to dequeue the job-queue and execute each task after the previous one has finished, without terminating when the queue is empty, i.e. perpetually processing the incoming jobs.

Solution

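A java.util.concurrent.ExecutorService may be what you want. It lets you submit a job for later execution and returns a Future that you can query to discover whether the job has completed. A single-threaded executor runs the submitted jobs one at a time, in submission order, and simply waits for more work when its queue is empty.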

(import '[java.util.concurrent Callable Executors])

(def job-executor
  (Executors/newSingleThreadExecutor))

(def jobs (atom {}))

(defn submit-job [func]
  (let [job-id   (str (java.util.UUID/randomUUID))
        callable (reify Callable (call [_] (func)))]
    (swap! jobs assoc job-id (.submit job-executor callable))
    job-id))

(use 'compojure.core)

(defroutes app
  (POST "/jobs" [& params]
    (let [id (submit-job #(analyze params))]
      {:status 201 :headers {"Location" (str "/jobs/" id)}}))
  (GET "/jobs/:id" [id]
    ;; Unknown ids and unfinished jobs both yield 404
    (let [job-future (@jobs id)]
      (if (and job-future (.isDone job-future))
        (.get job-future)
        {:status 404}))))
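
The Future returned by the executor can also tell pending, finished, and failed jobs apart, which is closer to the status URL described in the question. The following is a minimal sketch, not part of the answer above: the names status-executor, job-futures, submit-job! and job-status are made up for illustration, and the status maps are only an assumed shape.

```clojure
(import '[java.util.concurrent Callable Executors Future ExecutionException])

;; Single worker thread, so jobs run one at a time in submission order.
(def status-executor (Executors/newSingleThreadExecutor))

;; job id -> Future (hypothetical registry, analogous to `jobs` above)
(def job-futures (atom {}))

(defn submit-job!
  "Queues the zero-argument function f and returns a fresh job id."
  [f]
  (let [id       (str (java.util.UUID/randomUUID))
        callable (reify Callable (call [_] (f)))]
    (swap! job-futures assoc id (.submit status-executor callable))
    id))

(defn job-status
  "Returns a status map for the job id, or nil when the id is unknown."
  [id]
  (when-let [^Future fut (get @job-futures id)]
    (if-not (.isDone fut)
      {:status :pending}
      (try
        {:status :done :result (.get fut)}
        ;; .get rethrows a job's exception wrapped in ExecutionException
        (catch ExecutionException e
          {:status :failed :error (.getMessage (.getCause e))})))))
```

A route handler could then map nil to a 404 and the other cases to whatever response body the API needs. The executor's thread is not a daemon thread, so a standalone program would call (.shutdown status-executor) before exiting.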

OTHER TIPS

This seems to be doing what I expected, but it does seem rather un-idiomatic. Anyone have thoughts on how to improve this?

;; Create a unique identifier
(defn uuid [] (str (java.util.UUID/randomUUID)))

;; Create a job-queue and a map for keeping track of the status
(def job-queue (ref clojure.lang.PersistentQueue/EMPTY))
(def jobs (atom {}))

(defn dequeue! [queue-ref]
  ;; Pops the first element off the queue-ref
  (dosync 
    (let [item (peek @queue-ref)]
      (alter queue-ref pop)
      item)))

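(defn schedule-job! [task] 
  ;; Schedule a task to be executed, expects a function (task) to be evaluated.
  ;; Returns the id under which the job is stored in jobs.
  (let [id (uuid)
        job (delay task)]
    ;; swap! stays outside dosync: a retried transaction would repeat it
    (swap! jobs assoc id job) 
    (dosync 
      (alter job-queue conj job))
    id))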

(defn run-jobs []
  ;; Runs the jobs 
  (while true
    (Thread/sleep 10)
    (let [curr (dequeue! job-queue)] 
      (if-not (nil? curr) (@curr)))))

(.start (Thread. run-jobs))
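
One possible refinement, swapping the ref plus polling loop for a different technique: a java.util.concurrent.LinkedBlockingQueue, whose take method blocks while the queue is empty, so the worker needs no Thread/sleep and never terminates on an empty queue. This is only a sketch under assumptions; pending, job-states, enqueue-job!, run-next! and start-worker! are hypothetical names, and the status maps are an assumed shape.

```clojure
(import '[java.util.concurrent LinkedBlockingQueue])

;; Unbounded FIFO queue; .take blocks while it is empty.
(def ^LinkedBlockingQueue pending (LinkedBlockingQueue.))

;; job id -> {:status ...} map, readable from a GET handler
(def job-states (atom {}))

(defn enqueue-job!
  "Registers the zero-argument function f as a queued job and returns its id."
  [f]
  (let [id (str (java.util.UUID/randomUUID))]
    (swap! job-states assoc id {:status :queued})
    (.put pending [id f])
    id))

(defn run-next!
  "Blocks until a job is available, runs it, and records the outcome."
  []
  (let [[id f] (.take pending)]
    (swap! job-states assoc-in [id :status] :running)
    (try
      ;; (f) is evaluated here, before swap! is called
      (swap! job-states assoc id {:status :done :result (f)})
      (catch Exception e
        (swap! job-states assoc id {:status :failed :error (.getMessage e)})))))

(defn start-worker!
  "Starts a daemon thread that processes jobs one at a time, forever."
  []
  (let [^Runnable work (fn [] (while true (run-next!)))]
    (doto (Thread. work)
      (.setDaemon true)
      (.start))))

;; Usage:
;; (start-worker!)
;; (def id (enqueue-job! #(* 6 7)))
;; (get @job-states id)
```

Because a failing job is caught inside run-next!, one bad job does not kill the worker thread. Calling start-worker! exactly once keeps the "one job at a time" guarantee.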

Your description sounds like a multiple-producer, single-consumer scenario. Below is some example code (which you can hook up to the REST layer, possibly adding some exception handling so that the agent doesn't end up in a failed state).

(def worker (agent {}))

(defn do-task [name func]
  (send worker
        (fn [results]
          (let [r (func)]
            (assoc results name r)))))

;; submit tasks
(do-task "uuid1" #(print 10))
(do-task "uuid2" #(+ 1 1))

;; send is asynchronous, so wait for the queued tasks, then get all results
(await worker)
(print @worker)
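
The exception handling mentioned above could look roughly like the sketch below. By default an agent whose action throws enters a failed state and rejects further sends until it is restarted; catching inside the action, plus the :continue error mode as a backstop, avoids that. The names safe-worker and do-task-safely and the shape of the result maps are assumptions for illustration only.

```clojure
;; :continue keeps the agent usable if an action still throws;
;; the handler only logs the problem to stderr.
(def safe-worker
  (agent {}
         :error-mode :continue
         :error-handler (fn [_agent ex]
                          (binding [*out* *err*]
                            (println "job failed:" (.getMessage ex))))))

(defn do-task-safely
  "Queues func on safe-worker and stores its result, or the error
  message if it throws, under name."
  [name func]
  (send safe-worker
        (fn [results]
          (assoc results name
                 (try
                   {:status :done :result (func)}
                   (catch Exception e
                     {:status :failed :error (.getMessage e)}))))))

;; Usage:
;; (do-task-safely "uuid3" #(/ 1 0))
;; (do-task-safely "uuid4" #(+ 1 1))
;; (await safe-worker)
;; (get @safe-worker "uuid4")
```

With this arrangement a failing task is recorded like any other result, and tasks sent afterwards still run in order.
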
Licensed under: CC-BY-SA with attribution