So konvertieren Sie RX Observable in Play Enumerator
-
21-12-2019 - |
Frage
Ich habe in Play erfolgreich einen Websocket mithilfe seines nativen Enumerator-Konstrukts eingerichtet und dabei Code aufgerufen, der einen String zurückgibt:
def operationStatusFeed = WebSocket.using[String] { implicit request =>
val in = Iteratee.ignore[String]
val out = Enumerator.repeatM {
Promise.timeout(operation, 3 seconds)
}
(in, out)
}
Jetzt will ich mein operation
Funktion zur Rückgabe einer rx.lang.scala.Observable[String]
anstelle eines Strings, und ich möchte jeden String ausgeben, sobald er eintritt.Wie kann ich dieses Observable einem zuordnen? play.api.libs.iteratee.Enumerator
?
Lösung
Sie können die implizite Konvertierung von Bryan Gilbert verwenden.Dies wird einwandfrei funktionieren, aber seien Sie vorsichtig bei der Verwendung aktualisierte Version der Konvertierungen von Bryan Gilbert !„Abbestellen“ wird in der Antwort von Jeroen Kransen nie genannt (und das ist schlecht!).
/*
* Observable to Enumerator
*/
implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
// unicast create a channel where you can push data and returns an Enumerator
Concurrent.unicast { channel =>
val subscription = obs.subscribe(new ChannelObserver(channel))
val onComplete = { () => subscription.unsubscribe }
val onError = { (_: String, _: Input[T]) => subscription.unsubscribe }
(onComplete, onError)
}
}
class ChannelObserver[T](channel: Channel[T]) extends rx.lang.scala.Observer[T] {
override def onNext(elem: T): Unit = channel.push(elem)
override def onCompleted(): Unit = channel.end()
override def onError(e: Throwable): Unit = channel.end(e)
}
Der Vollständigkeit halber hier die Konvertierung von Enumerator zu Observable:
/*
* Enumerator to Observable
*/
implicit def enumerator2Observable[T](enum: Enumerator[T]): Observable[T] = {
// creating the Observable that we return
Observable({ observer: Observer[T] =>
// keeping a way to unsubscribe from the observable
var cancelled = false
// enumerator input is tested with this predicate
// once cancelled is set to true, the enumerator will stop producing data
val cancellableEnum = enum through Enumeratee.breakE[T](_ => cancelled)
// applying iteratee on producer, passing data to the observable
cancellableEnum (
Iteratee.foreach(observer.onNext(_))
).onComplete { // passing completion or error to the observable
case Success(_) => observer.onCompleted()
case Failure(e) => observer.onError(e)
}
// unsubscription will change the var to stop the enumerator above via the breakE function
new Subscription { override def unsubscribe() = { cancelled = true } }
})
}
Rx für WebSockets in Play
Andererseits könnten Sie anmerken, dass Sie sich in Play hauptsächlich mit Iteratees und Enumeratoren befassen, wenn Sie mit WebSockets arbeiten (wie Sie es hier tun).Wir sind uns alle einig, dass Iteratees wirklich weniger intuitiv sind als Observables, und das ist wahrscheinlich der Grund, warum Sie Rx in Ihrem Play-Projekt verwenden.
Aus dieser Beobachtung heraus habe ich eine Bibliothek mit dem Namen erstellt WidgetManager das macht genau das:Integration von Rx in Play, um die Manipulation von Iteratees zu beseitigen.
Wenn Sie diese Bibliothek verwenden, könnte Ihr Code einfach wie folgt aussehen:
def operationStatusFeed = WebSocket.using[String] { implicit request =>
// you can optionally give a function to process data from the client (processClientData)
// and a function to execute when connection is closed (onClientClose)
val w = new WidgetManager()
w.addObservable("op", operation)
// subscribe to it and push data in the socket to the client (automatic JS callback called)
w.subscribePush("op")
// deals with Iteratees and Enumerators for you and returns what's needed
w.webSocket
}
Die Bibliothek finden Sie hier auf GitHub: RxPlay (Beiträge sind willkommen)
Andere Tipps
Ich habe diese Lösung inspiriert von Brian Gilbert:
class ChannelObserver[T](chan: Channel[T]) extends Observer[T] {
override def onNext(arg: T): Unit = chan.push(arg)
override def onCompleted(): Unit = chan.end()
override def onError(e: Throwable): Unit = chan.end(e)
override val asJavaObserver: rx.Observer[T] = new rx.Observer[T] {
def onCompleted() {
chan.end()
}
def onError(e: Throwable) {
chan.end(e)
}
def onNext(arg: T) {
chan.push(arg)
}
}
}
implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
Concurrent.unicast[T](onStart = { chan =>
obs.subscribe(new ChannelObserver(chan))
})
}
Die implizite Funktion konvertiert Observables ohne zusätzlichen Code in Enumeratoren.