Observable/toObservable
is a Java method, not a Clojure function. You can't treat Java methods as function values, like you can do with Clojure functions. comp
composes Clojure functions (objects that implement the IFn
interface). Solution: wrap the method call in a Clojure function.
One other thing:
(defn- collect [item]
(reset! collector (conj @collector item)))
should be:
(defn- collect [item]
(swap! collector conj item)))
Only use reset!
on an atom when you don't want to use the old value. When you do, use swap!
, else an atomic update is not guaranteed (another thread could update the atom after you read its value).