Frage

Ich benutze RxJava im Scala-Projekt und sage, ich habe das einfach Observable:

Observable[String](observer => while (true) observer onNext "hi")
  .subscribe(v => println(v))

println("hello")

Ich werde nie zu einer "Hallo" -Nachricht kommen, weil while blockiert einen Thread.Wie kann ich meine Observable in einem separaten Thread, um Blockaden zu vermeiden?

==================================

Ich dachte nach observeOn könnte helfen, tut es aber nicht.Dies ausführen:

val s = rx.lang.scala.schedulers.NewThreadScheduler.apply

Observable[String](observer => while (true) observer onNext "hi")
  .observeOn(s).subscribe(v => println(v))

println("hello")

...druckt immer noch nicht "Hallo".Ich denke, Hinzufügen observeOn machen OnNext in einem separaten Thread aufgerufen werden, aber nicht in einem while sich selbst blockieren?

==================================

Ich könnte natürlich wickeln while in einem Future:

Observable[String](observer => Future { while (true) observer onNext "hi" })
  .subscribe(v => println(v))

println("test") // "test" gets printed

Aber vielleicht gibt es mehr rx-idiomatische Möglichkeiten, dies zu tun?

War es hilfreich?

Lösung

Sie müssen verwenden subscribeOn.Z.B.,

Observable[String](observer => while (true) observer onNext "hi").subscribeOn(s)
  .subscribe(v => println(v))

subscribeOn wird die anrufen subscribe funktion in der Scheduler, während observeOn sendet die Nachrichten an die Scheduler.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top