Question

I recently started playing with rxjava-scala, and I wanted to create a (possibly) infinite stream Observable. Looking at the code and the open issues on GitHub, I found that an "out of the box" solution is not implemented yet (usecase06 in the issue says it is not even implemented for Java).

So I tried to come up with my own implementation. Consider the following:

def getIterator: Iterator[String] = {
  def fib(a: BigInt, b: BigInt): Stream[BigInt] = a #:: fib(b, a + b)
  fib(1, 1).iterator.map{bi =>
    Thread.sleep(100)
    s"next fibonacci: ${bi}"
  }
}

And a helper method:

def startOnThread(body: => Unit): Thread = {
  val t = new Thread {
    override def run = body
  }
  t.start
  t
}

And the core of the example:

val observable: Observable[String] = Observable(
  observer => {
    var cancelled = false
    val fs = getIterator
    val t = startOnThread{
      while (!cancelled) {observer.onNext(fs.next)}
      observer.onCompleted()
    }
    Subscription(new rx.Subscription {
      override def unsubscribe() = {
        cancelled = true
        t.join
      }
    })
  }
)

val observer = Observer(new rx.Observer[String]{
  def onNext(args: String) = println(args)
  def onError(e: Throwable) = logger.error(e.getMessage)
  def onCompleted() = println("DONE!")
})

val subscription = observable.subscribe(observer)
Thread.sleep(5000)
subscription.unsubscribe()

This seems to work fine, but I'm not happy with it. First of all, I'm creating a new Thread, which could be bad. Even if I used some kind of thread pool, it would still feel wrong. So I think I should use a scheduler, which sounds like the proper solution, only I can't figure out how to use one in this scenario. I tried supplying rx.lang.scala.concurrency.Schedulers.threadPoolForIO to the observeOn method, but it seems I'm doing it wrong: the observable's code won't compile with it. Any help would be greatly appreciated. Thanks!


Solution

First of all, there is already an adapter that converts an Iterable to an Observable: the "from" function.

Second, iterating over an infinite iterator never returns control to the caller, so if the subscription runs on the calling thread, your Thread.sleep and unsubscribe calls will never be reached. You need to execute the subscription on a dedicated thread with "subscribeOn(NewThreadScheduler())":

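// Package paths depend on the RxScala version; these match releases where
// schedulers live in rx.lang.scala.schedulers.
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.NewThreadScheduler

def getIterator: Iterator[String] = {
  // Lazy, infinite Fibonacci stream; each element is delayed by one second.
  def fib(a: BigInt, b: BigInt): Stream[BigInt] = a #:: fib(b, a + b)
  fib(1, 1).iterator.map{bi =>
    Thread.sleep(1000)
    s"next fibonacci: ${bi}"
  }
}

// subscribeOn moves the blocking iteration off the calling thread,
// so the main thread stays free to unsubscribe later.
val sub = Observable.from(getIterator.toIterable)
  .subscribeOn(NewThreadScheduler())
  .subscribe(println(_))
readLine() // wait for Enter (scala.io.StdIn.readLine() on Scala 2.11+)
sub.unsubscribe()
println("fib complete")
readLine()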
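
For comparison, the hand-rolled loop from the question can be made a little safer without Rx at all. The following is only a sketch using the standard library: the object name CancellableProducer and the helper start are hypothetical, not part of RxScala. It swaps the plain var flag for an AtomicBoolean so the worker thread is guaranteed to see the cancellation, checks hasNext so a finite source also terminates, and interrupts the worker so cancellation does not have to wait out a pending sleep.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper object, not part of RxScala.
object CancellableProducer {
  // Infinite Fibonacci iterator built without Stream, so it works on Scala 2.12 and 2.13.
  def fibs: Iterator[BigInt] =
    Iterator.iterate((BigInt(1), BigInt(1))) { case (a, b) => (b, a + b) }.map(_._1)

  // Pushes items from `source` to `onNext` on a background thread until the
  // source is exhausted or the returned cancel function is called.
  def start[T](source: Iterator[T], delayMs: Long)(onNext: T => Unit): () => Unit = {
    val cancelled = new AtomicBoolean(false)
    val worker = new Thread(new Runnable {
      def run(): Unit =
        try {
          while (!cancelled.get && source.hasNext) {
            onNext(source.next())
            Thread.sleep(delayMs)
          }
        } catch {
          // Interrupted by cancel() while sleeping: just stop.
          case _: InterruptedException => ()
        }
    })
    worker.setDaemon(true)
    worker.start()

    () => {
      cancelled.set(true)
      worker.interrupt()
      // Joining from the worker itself would block forever, so skip it there.
      if (Thread.currentThread ne worker) worker.join()
    }
  }

  def main(args: Array[String]): Unit = {
    val cancel = start(fibs, 100)(n => println(s"next fibonacci: $n"))
    Thread.sleep(500)
    cancel()
    println("cancelled")
  }
}
```

This still dedicates one thread per subscription, which is exactly what subscribeOn with a scheduler handles for you in the Rx version above.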
Licensed under: CC-BY-SA with attribution