Question

I recently started playing with rxjava-scala, and I wanted to create a (possibly) infinite stream Observable. Looking at the code and the open issues on GitHub, I found that an out-of-the-box solution is not implemented yet (usecase06 in the issue says it's not even implemented for Java).

So I tried to come up with my own implementation. Consider the following:

def getIterator: Iterator[String] = {
  def fib(a: BigInt, b: BigInt): Stream[BigInt] = a #:: fib(b, a + b)
  fib(1, 1).iterator.map{bi =>
    Thread.sleep(100)
    s"next fibonacci: ${bi}"
  }
}

And a helper method:

def startOnThread(body: => Unit): Thread = {
  val t = new Thread {
    override def run = body
  }
  t.start
  t
}

And the core of the example:

val observable: Observable[String] = Observable(
  observer => {
    @volatile var cancelled = false // set by the unsubscribing thread, read by the worker thread
    val fs = getIterator
    val t = startOnThread{
      while (!cancelled) {observer.onNext(fs.next)}
      observer.onCompleted()
    }
    Subscription(new rx.Subscription {
      override def unsubscribe() = {
        cancelled = true
        t.join
      }
    })
  }
)

val observer = Observer(new rx.Observer[String]{
  def onNext(args: String) = println(args)
  def onError(e: Throwable) = logger.error(e.getMessage)
  def onCompleted() = println("DONE!")
})

val subscription = observable.subscribe(observer)
Thread.sleep(5000)
subscription.unsubscribe()

This seems to work fine, but I'm not happy with it. First of all, I'm creating a new Thread, which could be bad. Even if I used some kind of thread pool, it would still feel wrong. So I think I should use a scheduler, which sounds like the proper solution, only I can't figure out how to use one in this scenario. I tried supplying rx.lang.scala.concurrency.Schedulers.threadPoolForIO to the observeOn method, but it seems I'm doing it wrong: the observable's code won't compile with it. Any help would be greatly appreciated. Thanks!

Solution

First of all, there is already an adapter that converts an Iterable to an Observable: the "from" function.

Second, iterating over an infinite iterator never returns control to the caller, so if the subscription runs on the calling thread, your sleep and unsubscribe calls are never reached. You need to run the subscription on a dedicated thread: "subscribeOn(NewThreadScheduler())".

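// Package names assume a later RxScala release; adjust them for your version.
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.NewThreadScheduler

// Infinite, lazily evaluated Fibonacci sequence; each element takes one second to produce.
def getIterator: Iterator[String] = {
  def fib(a: BigInt, b: BigInt): Stream[BigInt] = a #:: fib(b, a + b)
  fib(1, 1).iterator.map{bi =>
    Thread.sleep(1000)
    s"next fibonacci: ${bi}"
  }
}

// subscribeOn moves the blocking iteration off the calling thread,
// so subscribe returns immediately and the lines below are reached.
val sub = Observable.from(getIterator.toIterable)
  .subscribeOn(NewThreadScheduler())
  .subscribe(println(_))
readLine()        // press Enter to stop the stream
sub.unsubscribe()
println("fib complete")
readLine()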
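
To see why the dedicated thread matters without pulling in any Rx dependency, here is a minimal sketch using only the standard library. It is not how rxjava-scala implements subscribeOn; it only illustrates the idea that a blocking, infinite source has to run off the calling thread and check a thread-safe flag in order to be cancellable. The names CancellableProducer, fibs and start are made up for this illustration.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper, not part of any Rx library.
object CancellableProducer {
  // Infinite, lazily generated Fibonacci numbers: 1, 1, 2, 3, 5, ...
  def fibs: Iterator[BigInt] =
    Iterator.iterate((BigInt(1), BigInt(1))) { case (a, b) => (b, a + b) }.map(_._1)

  // Feeds each element of `source` to `emit` on a dedicated daemon thread.
  // Returns a cancel function that plays the role of unsubscribe().
  def start[A](source: Iterator[A])(emit: A => Unit): () => Unit = {
    // AtomicBoolean makes the flag change visible across threads.
    val cancelled = new AtomicBoolean(false)
    val worker = new Thread(new Runnable {
      def run(): Unit =
        while (!cancelled.get && source.hasNext) emit(source.next())
    })
    worker.setDaemon(true)
    worker.start()
    // Signal the worker, then wait until it has finished its current element.
    () => { cancelled.set(true); worker.join() }
  }
}
```

Calling CancellableProducer.start(CancellableProducer.fibs)(println) returns immediately with the cancel function, so the caller is free to sleep or wait for input and cancel later. Note that the cancel function must not be called from inside emit, because the worker thread would then wait on itself.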
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow