我在 Scala 项目中使用 RxJava 并说我有这个简单的 Observable:

Observable[String](observer => while (true) observer onNext "hi")
  .subscribe(v => println(v))

println("hello")

我永远不会收到“你好”消息,因为 while 阻塞一个线程。我怎样才能运行我的 Observable 在单独的线程中以避免阻塞?

==================================

我刚在想 observeOn 可能有帮助,但事实并非如此。运行这个:

val s = rx.lang.scala.schedulers.NewThreadScheduler.apply

Observable[String](observer => while (true) observer onNext "hi")
  .observeOn(s).subscribe(v => println(v))

println("hello")

...仍然不打印“你好”。我想添加 observeOn 使 OnNext 在单独的线程中调用,但不是 while 阻止自己?

==================================

我当然可以包裹 while 在一个 Future:

Observable[String](observer => Future { while (true) observer onNext "hi" })
  .subscribe(v => println(v))

println("test") // "test" gets printed

但也许还有更多的 rx 惯用方法可以做到这一点?

有帮助吗?

解决方案

你需要使用 subscribeOn. 。例如。,

Observable[String](observer => while (true) observer onNext "hi").subscribeOn(s)
  .subscribe(v => println(v))

subscribeOn 将调用 subscribe 函数在 Scheduler, , 尽管 observeOn 将把消息发送到 Scheduler.

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top