The Clojure way of building a live-stream server: with promises, but is it the right way?

StackOverflow https://stackoverflow.com//questions/9575483

  •  07-12-2019

Question

I'm looking for the Clojure way of building a live streaming server. The particular problem I'm struggling with is how to send the values from a single provider (the webcam) to an undefined number of threads (the connected clients). Obviously, a client that connects is not interested in the full video file from the webcam; it needs to be sent a header and then whatever packets are arriving from the webcam at that moment.

In plain Java, I think it would be easy. Whenever a client connects, add the connection to an array; when it disconnects, remove the connection from the array; and whenever a new packet arrives from the webcam, send it to each entry in the array. Lock the array so that at any time we are either adding/removing entries or looping through it to send packets. Of course we could build the same in Clojure, but this sounds really evil.
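For comparison, here is a minimal sketch of that "list of connections" design written in Clojure, assuming an atom holding an immutable set takes the place of the locked array. All names (`clients`, `connect!`, `disconnect!`, `broadcast!`) are hypothetical, and each client is modelled as a plain function that receives one packet, standing in for a socket write.

```clojure
;; Hypothetical names throughout; a client is a function that "sends"
;; one packet (a stand-in for writing to a real socket).
(def clients (atom #{}))

(defn connect!    [client] (swap! clients conj client))
(defn disconnect! [client] (swap! clients disj client))

(defn broadcast!
  "Sends packet to every client in the current snapshot of the set."
  [packet]
  ;; @clients is an immutable snapshot, so no lock is needed while looping.
  (doseq [client @clients]
    (client packet)))

;; Usage: two clients that simply record what they receive.
(def received-a (atom []))
(def received-b (atom []))
(def client-a #(swap! received-a conj %))
(def client-b #(swap! received-b conj %))

(connect! client-a)
(connect! client-b)
(broadcast! :header)
(disconnect! client-b)
(broadcast! :frame-1)
```

Note that in this sketch the broadcasting thread still calls every client in turn, so one slow client delays all the others.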

In a message passing multi-threaded architecture this sounds equally easy.

The only solution I could think of in Clojure is a lazy sequence of promises. It does work, but I was wondering whether there is another way that leads to cleaner code and more Clojure zen :)

Just to illustrate: a simplified problem, with promises and atoms:

One provider function generating data, one thread that reads this data. Later some other threads are created that would like to get the data from this first thread, but can't get to it.

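;; Simulated webcam: an infinite lazy seq yielding one random number every 100 ms.
(defn provider []
  (lazy-seq
    (Thread/sleep 100)
    (cons (rand) (provider))))

;; Route all printing through one agent so output from different threads does not interleave.
(def printer (agent nil))
(defn log [& line]
  (send-off printer (fn [_] (apply println line))))

;; Infinite lazy seq of promises; the atom always points at the next undelivered one.
(def promises (atom (repeatedly promise)))

;; Client loop: block on each promise in turn, starting from wherever the client joined.
(defn client-connected-thread [x input]
  (log "Client connection " x " is connected with the provider and just received" @(first input))
  (recur x (rest input)))

;; Provider thread: deliver each item to the current promise, then advance the atom.
(.start (Thread. (fn []
                   (loop [stream (provider)]
                     (when-let [item (first stream)]
                       (log "I received " item ", will share now")
                       (deliver (first @promises) item)
                       (swap! promises rest)
                       (recur (rest stream)))))))

;; Clients connect at different times and only see items produced after they joined.
(Thread/sleep 300)
(.start (Thread. #(client-connected-thread 1 @promises)))
(Thread/sleep 100)
(.start (Thread. #(client-connected-thread 2 @promises)))
(Thread/sleep 50)
(.start (Thread. #(client-connected-thread 3 @promises)))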

So, basically the question is: is this the right way to tackle this problem?

Also, we're talking about a streaming media server here, so the provider function will provide tens of thousands of items per second, and there may be 10 clients connected. Is the promise-system meant for such heavy use?


Solution

Clojure has agents for situations where you need to send information asynchronously, which seems like a good match for your use case.

You are very close indeed; I just stuck the word "agent" in a couple of spots to finish the thought:

"In straight Clojure, I think it would be easy. Whenever a client connects, add the connection to a vector of agents in an agent; when it disconnects, remove the connection from the agent of agents; and whenever a new package from the webcam arrives, send it to each agent in the agent."

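Make sure you use send-off instead of send for anything that blocks (such as writing to a socket) to keep from exhausting your thread pool: send runs actions on a fixed-size pool, while send-off uses a pool that grows as needed.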
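A minimal sketch of what that looks like for a single client, assuming the agent's state is simply the vector of frames sent so far. `write-frame!` is a hypothetical stand-in for a blocking socket write; the sleep only simulates slow I/O.

```clojure
;; Hypothetical stand-in for a blocking socket write: returns the new
;; agent state (frames sent so far) after a simulated delay.
(defn write-frame! [sent frame]
  (Thread/sleep 10)          ; simulate slow network I/O
  (conj sent frame))

(def client (agent []))      ; agent state: frames sent to this client

;; send-off is intended for actions that may block; `send` is meant for
;; short CPU-bound actions and shares a fixed-size thread pool.
(send-off client write-frame! :frame-1)
(send-off client write-frame! :frame-2)

(await client)               ; block until both queued actions have run
(println @client)

;; In a standalone script, call (shutdown-agents) before exiting so the
;; agent thread pools do not keep the JVM alive.
```

Actions sent to one agent from the same thread run in the order they were sent, so each client receives its frames in sequence.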

This has a bunch of advantages over the "lock an array" approach:

  • one slow client won't stop you from adding or removing connections
  • clients will eventually get all the frames without you having to track each one separately
  • you don't have to worry about locking
  • you don't have to manually allocate threads
  • you can use watches and such to report performance without changing the simple core of the algorithm.
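As an illustration of the last point, here is a small sketch that counts delivered frames with `add-watch`, assuming again that a client agent's state is the vector of frames sent to it. The counter `frames-sent` and the watch key `:frame-counter` are hypothetical names.

```clojure
(def frames-sent (atom 0))   ; hypothetical metric: frames delivered so far
(def client (agent []))      ; agent state: frames sent to this client

;; A watch fn takes the key, the reference, the old state and the new
;; state. For agents it is called on the agent's thread after each action.
(add-watch client :frame-counter
           (fn [_key _ref old-state new-state]
             (when (not= old-state new-state)
               (swap! frames-sent inc))))

(send-off client conj :frame-1)
(send-off client conj :frame-2)
(await client)               ; wait until both actions have run

(println "frames sent:" @frames-sent)
```

The sending code itself is unchanged; the metric is attached from the outside and can be removed again with `remove-watch`.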

A rough outline would look like:

user> (def connections-stub (range))
user> (def connections (agent []))
#'user/connections
user> (defn accept-connection [connection] 
    (send connections conj (agent connection)))
#'user/accept-connection
user> (map accept-connection (take 10 connections-stub))
(#<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8> #<Agent@159177b9: 9>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8> #<Agent@159177b9: 9>]>)

user> (defn send-frame [con-agent frame]
        (send-off con-agent
          (fn [connection frame]
            (println "sending " frame " to " connection)
            connection)
          frame))
#'user/send-frame

user> (send-frame (first @connections) "hello")
sending  hello  to  0
#<Agent@da69a9c: 0>

user> (defn dispatch-frame [frame] 
        (doall (map #(send-frame % frame) @connections)))
#'user/dispatch-frame

user> (dispatch-frame "hello")
sending  hello  to  0
sending  hello  to  1
sending  hello  to  2
sending  hello  to  3
sending  hello  to  4
sending  hello  to  5
sending  hello  to  6
sending  hello  to  7
sending  hello  to  8
sending  hello  to  9
(#<Agent@da69a9c: 0> #<Agent@34f07ec4: 1> #<Agent@11ee68d1: 2> #<Agent@3b237a89: 3> #<Agent@1641d6b4: 4> #<Agent@3c76ced6: 5> #<Agent@1c05629d: 6> #<Agent@258d3fca: 7> #<Agent@5c56fa08: 8> #<Agent@52395294: 9>)
user> 
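The outline above does not show disconnects, and `(map accept-connection ...)` only runs because the REPL forces the lazy sequence when printing it. The following is a sketch of one way to round it out, not a definitive implementation: `drop-connection` is a hypothetical helper, each client agent holds the vector of frames it has received (standing in for a real connection), and `doseq` replaces `map` so that dispatch is eager.

```clojure
(def connections (agent []))   ; agent holding a vector of per-client agents

(defn accept-connection [conn-agent]
  (send connections conj conn-agent))

(defn drop-connection
  "Hypothetical helper: removes conn-agent from the connections vector."
  [conn-agent]
  (send connections
        (fn [conns] (vec (remove #(identical? % conn-agent) conns)))))

(defn send-frame [conn-agent frame]
  ;; conj stands in for a blocking socket write, hence send-off.
  (send-off conn-agent conj frame))

(defn dispatch-frame [frame]
  ;; doseq is eager, so frames are dispatched even outside the REPL.
  (doseq [conn-agent @connections]
    (send-frame conn-agent frame)))

;; Usage: client-b disconnects after the header has been sent.
(def client-a (agent []))
(def client-b (agent []))
(accept-connection client-a)
(accept-connection client-b)
(await connections)            ; make sure both clients are registered
(dispatch-frame :header)
(drop-connection client-b)
(await connections)            ; make sure the removal has been applied
(dispatch-frame :frame-1)
(await client-a client-b)
```

The `await` calls are only there to make the example deterministic; a real server would let the agents run asynchronously.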

Other answers

Take a look at Aleph. This is a library for providing "async channels" which can help you to implement the required scenario.

Licensed under: CC-BY-SA with attribution