Pregunta

He conseguido crear un websocket en Juego el uso de su nativa del Enumerador de la construcción, llamar a algún código que devuelve una Cadena:

def operationStatusFeed = WebSocket.using[String] { implicit request =>
  val in = Iteratee.ignore[String]
  val out = Enumerator.repeatM {
   Promise.timeout(operation, 3 seconds)
  }
  (in, out)
}

Ahora quiero que mi operation función para devolver un rx.lang.scala.Observable[String] en lugar de una Cadena, y quiero que a la salida de cualquier Cadena tan pronto como se entra.¿Cómo puedo asignar este Observables para un play.api.libs.iteratee.Enumerator?

¿Fue útil?

Solución

Puede usar la conversión implícita de Bryan Gilbert. Esto funcionará perfectamente bien, pero tenga cuidado de usar el Versión actualizada de las conversiones de Bryan Gilbert ! El cancelación de la suscripción nunca se llama en la respuesta de Jeroen Kransen (¡y eso es malo!).

  /*
   * Observable to Enumerator
   */
  implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
    // unicast create a channel where you can push data and returns an Enumerator
    Concurrent.unicast { channel =>
      val subscription = obs.subscribe(new ChannelObserver(channel))
      val onComplete = { () => subscription.unsubscribe }
      val onError = { (_: String, _: Input[T]) => subscription.unsubscribe }
      (onComplete, onError)
    }
  }

  class ChannelObserver[T](channel: Channel[T]) extends rx.lang.scala.Observer[T] {
    override def onNext(elem: T): Unit = channel.push(elem)
    override def onCompleted(): Unit = channel.end()
    override def onError(e: Throwable): Unit = channel.end(e)
  }

Para estar completo, aquí está la conversión del enumerador a observable:

  /*
   * Enumerator to Observable
   */
  implicit def enumerator2Observable[T](enum: Enumerator[T]): Observable[T] = {
    // creating the Observable that we return
    Observable({ observer: Observer[T] =>
      // keeping a way to unsubscribe from the observable
      var cancelled = false

      // enumerator input is tested with this predicate
      // once cancelled is set to true, the enumerator will stop producing data
      val cancellableEnum = enum through Enumeratee.breakE[T](_ => cancelled)

      // applying iteratee on producer, passing data to the observable
      cancellableEnum (
        Iteratee.foreach(observer.onNext(_))
      ).onComplete { // passing completion or error to the observable
        case Success(_) => observer.onCompleted()
        case Failure(e) => observer.onError(e)
      }

      // unsubscription will change the var to stop the enumerator above via the breakE function
      new Subscription { override def unsubscribe() = { cancelled = true } }
    })
  }


rx para websockets en juego

Por otro lado, puede observar que la mayoría de las veces que lidian con las titulaciones y los enumeradores en juego es cuando trabaja con WebSockets (como lo hace aquí). Todos estamos de acuerdo en que las Itherees son realmente menos intuitivas que los observables y esta es probablemente la razón por la que está utilizando Rx en su proyecto de juego.

De esa observación, he creado una biblioteca llamada WidgetManager que hace exactamente esto: la integración de Rx en juego se deshace de la manipulación de Itimeas.

Usando esa biblioteca, su código podría simplemente ser:

def operationStatusFeed = WebSocket.using[String] { implicit request =>

  // you can optionally give a function to process data from the client (processClientData)
  // and a function to execute when connection is closed (onClientClose)
  val w = new WidgetManager()

  w.addObservable("op", operation)

  // subscribe to it and push data in the socket to the client (automatic JS callback called)
  w.subscribePush("op")

  // deals with Iteratees and Enumerators for you and returns what's needed
  w.webSocket
}

La biblioteca está en GitHub aquí: rxplay (las contribuciones son bienvenidas)

Otros consejos

Hice esta solución inspirada en Brian Gilbert :

class ChannelObserver[T](chan: Channel[T]) extends Observer[T] {
  override def onNext(arg: T): Unit = chan.push(arg)
  override def onCompleted(): Unit = chan.end()
  override def onError(e: Throwable): Unit = chan.end(e)
  override val asJavaObserver: rx.Observer[T] = new rx.Observer[T] {
    def onCompleted() {
      chan.end()
    }

    def onError(e: Throwable) {
      chan.end(e)
    }

    def onNext(arg: T) {
      chan.push(arg)
    }
  }
}

implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
  Concurrent.unicast[T](onStart = { chan =>
      obs.subscribe(new ChannelObserver(chan))
  })
}

La función implícita convierte a los observables a los enumeradores sin ningún código adicional.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top