Question

In Clojure I can get overlapping partitions of a collection by tuning the step argument to partition:

(partition 3 1 (range 20))

;; ((0 1 2) (1 2 3) (2 3 4) (3 4 5) ...)

core.async does have a partition function but since it doesn't accept a step argument, I can't get overlapping partitions:

(let [c (chan)]
  (go (doseq [n (range 20)]
        (>! c n)))

  (go-loop [p (async/partition 3 c)]
    (when-let [v (<! p)]
      (prn v)
      (recur p))))

;;[0 1 2]
;;[3 4 5]
;;[6 7 8]

I realise having this would probably mean being able to read the same value from a channel more than once. I'm also aware that I could create my own function that reads as many values from a channel as I want and build my own partitions.

However I was wondering if there is any way I could achieve this with the core API provided by core.async.

PS. sliding-buffer doesn't do the trick as I can't peek at the whole buffer at once.


Solution 2

One way of doing this would be to create a function that reads from the channel, buffers the values and puts to a new channel. I'm not sure how idiomatic this is though.

For example, the function below puts a vector onto the output channel whenever the required n items have been read from the input channel, then drops the first step items from its buffer before collecting the next partition (so step must not exceed n).

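(require '[clojure.core.async :as async :refer [chan close! go-loop <! >!]])

(defn stepped-partition [in n step]
  (let [out (chan)]
    (go-loop [buffer []]
      (if-some [v (<! in)]
        (let [new-buffer (conj buffer v)]
          (if (= (count new-buffer) n)
            (do
              ;; >! parks until a consumer takes the partition (back-pressure)
              (>! out new-buffer)
              ;; copy the tail so items already emitted can be garbage collected
              (recur (into [] (subvec new-buffer step))))
            (recur new-buffer)))
        ;; input closed: close the output so consumers receive nil and stop
        (close! out)))
    out))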

(def original (chan))
(def partitioned (stepped-partition original 3 2))

;; print every partition until the partitioned channel yields nil
(go-loop []
  (when-let [v (<! partitioned)]
    (println v)
    (recur)))

(async/onto-chan original [1 2 3 4 5 6 7 8 9])

;=> [1 2 3]
;=> [3 4 5]
;=> [5 6 7]
;=> [7 8 9]
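
The same buffering idea can also be written as a stateful transducer attached to a channel, which may be closer to the "core API" the question asks for. This is only a sketch under assumptions: sliding-xf is a hypothetical name, not part of core.async, and it assumes 1 <= step <= n.

(require '[clojure.core.async :as async :refer [chan <!!]])

(defn sliding-xf
  "Hypothetical helper: emits a vector each time n inputs have accumulated,
  then drops the first step of them. Assumes 1 <= step <= n."
  [n step]
  (fn [rf]
    (let [window (volatile! [])]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (let [w (vswap! window conj input)]
           (if (= (count w) n)
             (do
               ;; keep only the tail that overlaps with the next window
               (vreset! window (into [] (subvec w step)))
               (rf result w))
             result)))))))

;; a channel with a transducer needs a buffer, hence (chan 1 ...)
(def windows (chan 1 (sliding-xf 3 1)))

;; onto-chan puts every item and then closes the channel
(async/onto-chan windows (range 6))

;; collect everything the channel emits until it closes
(def result (<!! (async/into [] windows)))

(println result)
;=> [[0 1 2] [1 2 3] [2 3 4] [3 4 5]]

With a step of 1 this gives the same shape as (partition 3 1 ...) on a sequence. Because it is an ordinary transducer, it can also be checked without any channel, e.g. (into [] (sliding-xf 3 2) (range 1 10)).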

OTHER TIPS

IMHO, the desire to be

"able to read the same value from a channel more than once"

is contrary to the principles of core.async.

Each time you read a value from a channel, you take that value out of the channel.

So the good thing about channel behaviour is that it guarantees a single read for each value, a blocked or parked reader while no value is available, and nil once the channel is closed (and drained).
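
A small sketch of that guarantee, assuming a buffered channel so the puts do not block the calling thread (the names c and taken are only for illustration):

(require '[clojure.core.async :refer [chan close! >!! <!!]])

;; buffer of 2 so both puts complete without a waiting reader
(def c (chan 2))

(>!! c :a)
(>!! c :b)
(close! c)   ; values already in the buffer can still be taken

;; each value comes out exactly once; after that the closed channel yields nil
(def taken [(<!! c) (<!! c) (<!! c)])

(println taken)
;=> [:a :b nil]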

The next question to ask when solving your problem is: why does core.async offer at least three different ways to put values on and take values off a channel? Thinking of the channel as a rendezvous point, core.async gives the application three different behaviours while it waits for both the reader and the writer to be available:

  • Blocking (>!! and <!!): the running thread is blocked until both the reader and the writer are available.
  • Parking (>! and <! inside a go block): the go macro creates a pseudo-thread that is parked until both the reader and the writer are available. This behaviour doesn't block your running thread.
  • Asynchronous (put! and take!): the call returns immediately; you are only guaranteed the order of writes and reads.
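
The three styles can be put side by side in a short sketch. The channel and var names here are made up for illustration, and the promise is just one way to observe the take! callback:

(require '[clojure.core.async :refer [chan go >! <! >!! <!! put! take!]])

;; 1. Blocking: >!! and <!! block the calling thread.
;;    A buffer of 1 lets the put complete before anyone reads.
(def blocking-ch (chan 1))
(>!! blocking-ch :blocking)
(def blocking-result (<!! blocking-ch))

;; 2. Parking: >! and <! are only valid inside a go block.
;;    A go block returns a channel that yields the value of its body.
(def parking-ch (chan))
(go (>! parking-ch :parking))
(def parking-result (<!! (go (<! parking-ch))))

;; 3. Asynchronous: take! and put! return immediately;
;;    the callback runs once a value has been handed over.
(def async-ch (chan))
(def async-result (promise))
(take! async-ch (fn [v] (deliver async-result v)))
(put! async-ch :async)

(println blocking-result parking-result (deref async-result 1000 :timeout))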
Licensed under: CC-BY-SA with attribution