Cómo convertir RX Observables para Jugar Enumerador
-
21-12-2019 - |
Pregunta
He conseguido crear un websocket en Juego el uso de su nativa del Enumerador de la construcción, llamar a algún código que devuelve una Cadena:
def operationStatusFeed = WebSocket.using[String] { implicit request =>
val in = Iteratee.ignore[String]
val out = Enumerator.repeatM {
Promise.timeout(operation, 3 seconds)
}
(in, out)
}
Ahora quiero que mi operation
función para devolver un rx.lang.scala.Observable[String]
en lugar de una Cadena, y quiero que a la salida de cualquier Cadena tan pronto como se entra.¿Cómo puedo asignar este Observables para un play.api.libs.iteratee.Enumerator
?
Solución
Puede usar la conversión implícita de Bryan Gilbert. Esto funcionará perfectamente bien, pero tenga cuidado de usar el Versión actualizada de las conversiones de Bryan Gilbert ! El cancelación de la suscripción nunca se llama en la respuesta de Jeroen Kransen (¡y eso es malo!).
/*
* Observable to Enumerator
*/
implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
// unicast create a channel where you can push data and returns an Enumerator
Concurrent.unicast { channel =>
val subscription = obs.subscribe(new ChannelObserver(channel))
val onComplete = { () => subscription.unsubscribe }
val onError = { (_: String, _: Input[T]) => subscription.unsubscribe }
(onComplete, onError)
}
}
class ChannelObserver[T](channel: Channel[T]) extends rx.lang.scala.Observer[T] {
override def onNext(elem: T): Unit = channel.push(elem)
override def onCompleted(): Unit = channel.end()
override def onError(e: Throwable): Unit = channel.end(e)
}
Para estar completo, aquí está la conversión del enumerador a observable:
/*
* Enumerator to Observable
*/
implicit def enumerator2Observable[T](enum: Enumerator[T]): Observable[T] = {
// creating the Observable that we return
Observable({ observer: Observer[T] =>
// keeping a way to unsubscribe from the observable
var cancelled = false
// enumerator input is tested with this predicate
// once cancelled is set to true, the enumerator will stop producing data
val cancellableEnum = enum through Enumeratee.breakE[T](_ => cancelled)
// applying iteratee on producer, passing data to the observable
cancellableEnum (
Iteratee.foreach(observer.onNext(_))
).onComplete { // passing completion or error to the observable
case Success(_) => observer.onCompleted()
case Failure(e) => observer.onError(e)
}
// unsubscription will change the var to stop the enumerator above via the breakE function
new Subscription { override def unsubscribe() = { cancelled = true } }
})
}
rx para websockets en juego
Por otro lado, puede observar que la mayoría de las veces que lidian con las titulaciones y los enumeradores en juego es cuando trabaja con WebSockets (como lo hace aquí). Todos estamos de acuerdo en que las Itherees son realmente menos intuitivas que los observables y esta es probablemente la razón por la que está utilizando Rx en su proyecto de juego.
De esa observación, he creado una biblioteca llamada WidgetManager que hace exactamente esto: la integración de Rx en juego se deshace de la manipulación de Itimeas.
Usando esa biblioteca, su código podría simplemente ser:
def operationStatusFeed = WebSocket.using[String] { implicit request =>
// you can optionally give a function to process data from the client (processClientData)
// and a function to execute when connection is closed (onClientClose)
val w = new WidgetManager()
w.addObservable("op", operation)
// subscribe to it and push data in the socket to the client (automatic JS callback called)
w.subscribePush("op")
// deals with Iteratees and Enumerators for you and returns what's needed
w.webSocket
}
La biblioteca está en GitHub aquí: rxplay (las contribuciones son bienvenidas)
Otros consejos
Hice esta solución inspirada en Brian Gilbert :
class ChannelObserver[T](chan: Channel[T]) extends Observer[T] {
override def onNext(arg: T): Unit = chan.push(arg)
override def onCompleted(): Unit = chan.end()
override def onError(e: Throwable): Unit = chan.end(e)
override val asJavaObserver: rx.Observer[T] = new rx.Observer[T] {
def onCompleted() {
chan.end()
}
def onError(e: Throwable) {
chan.end(e)
}
def onNext(arg: T) {
chan.push(arg)
}
}
}
implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
Concurrent.unicast[T](onStart = { chan =>
obs.subscribe(new ChannelObserver(chan))
})
}
La función implícita convierte a los observables a los enumeradores sin ningún código adicional.