Question

I use RxJava in a Scala project. Say I have this simple Observable:

Observable[String](observer => while (true) observer onNext "hi")
  .subscribe(v => println(v))

println("hello")

I never get to the "hello" message because the while loop blocks the calling thread. How can I run my Observable on a separate thread to avoid blocking?

==================================

I thought observeOn could help, but it does not. Running this:

val s = rx.lang.scala.schedulers.NewThreadScheduler.apply

Observable[String](observer => while (true) observer onNext "hi")
  .observeOn(s).subscribe(v => println(v))

println("hello")

...still does not print "hello". I guess adding observeOn makes onNext get called on a separate thread, but not the while block itself?

==================================

I could of course wrap the while loop in a Future:

Observable[String](observer => Future { while (true) observer onNext "hi" })
  .subscribe(v => println(v))

println("test") // "test" gets printed

But perhaps there is a more Rx-idiomatic way of doing this?


Solution

You need to use subscribeOn. E.g.,

Observable[String](observer => while (true) observer onNext "hi")
  .subscribeOn(s)
  .subscribe(v => println(v))

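subscribeOn runs the subscribe function (here, the body containing the while loop) on the given Scheduler, while observeOn only dispatches the emitted messages (the onNext calls) to the Scheduler. That is why observeOn alone still leaves the loop blocking the calling thread.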
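
As a rough illustration of the difference, here is a minimal sketch. It assumes an RxScala version where Observable.apply accepts a Subscriber function, as the question's code suggests. The object name SubscribeOnSketch, the latch, the take(3) limit, and the isUnsubscribed check are additions for this example only (not part of the original code), so that the program can terminate instead of looping forever.

```scala
import java.util.concurrent.CountDownLatch

import rx.lang.scala.Observable
import rx.lang.scala.schedulers.NewThreadScheduler

// Hypothetical demo object; the name is an assumption for this sketch.
object SubscribeOnSketch {
  def main(args: Array[String]): Unit = {
    val s = NewThreadScheduler()
    // Lets the main thread wait until the stream has finished.
    val done = new CountDownLatch(1)

    // Same producer as in the question, except that the loop stops
    // once the downstream has unsubscribed (assumption for the demo).
    val source = Observable[String] { subscriber =>
      while (!subscriber.isUnsubscribed) subscriber.onNext("hi")
    }

    source
      .subscribeOn(s) // the producer loop runs on a thread from s
      .take(3)        // demo only: unsubscribes after three items
      .subscribe(
        v => println(s"${Thread.currentThread().getName}: $v"),
        e => { e.printStackTrace(); done.countDown() },
        () => done.countDown()
      )

    // Reached right away, because subscribe no longer blocks this thread.
    println("hello")
    done.await()
  }
}
```

The printed thread name should show that the "hi" values are produced off the main thread. Whether "hello" appears before or after them depends on thread timing.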

Licensed under: CC-BY-SA with attribution