Frage

Ich habe in Play erfolgreich einen Websocket mithilfe seines nativen Enumerator-Konstrukts eingerichtet und dabei Code aufgerufen, der einen String zurückgibt:

def operationStatusFeed = WebSocket.using[String] { implicit request =>
  val in = Iteratee.ignore[String]
  val out = Enumerator.repeatM {
   Promise.timeout(operation, 3 seconds)
  }
  (in, out)
}

Jetzt will ich mein operation Funktion zur Rückgabe einer rx.lang.scala.Observable[String] anstelle eines Strings, und ich möchte jeden String ausgeben, sobald er eintritt.Wie kann ich dieses Observable einem zuordnen? play.api.libs.iteratee.Enumerator?

War es hilfreich?

Lösung

Sie können die implizite Konvertierung von Bryan Gilbert verwenden.Dies wird einwandfrei funktionieren, aber seien Sie vorsichtig bei der Verwendung aktualisierte Version der Konvertierungen von Bryan Gilbert !„Abbestellen“ wird in der Antwort von Jeroen Kransen nie genannt (und das ist schlecht!).

  /*
   * Observable to Enumerator
   */
  implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
    // unicast create a channel where you can push data and returns an Enumerator
    Concurrent.unicast { channel =>
      val subscription = obs.subscribe(new ChannelObserver(channel))
      val onComplete = { () => subscription.unsubscribe }
      val onError = { (_: String, _: Input[T]) => subscription.unsubscribe }
      (onComplete, onError)
    }
  }

  class ChannelObserver[T](channel: Channel[T]) extends rx.lang.scala.Observer[T] {
    override def onNext(elem: T): Unit = channel.push(elem)
    override def onCompleted(): Unit = channel.end()
    override def onError(e: Throwable): Unit = channel.end(e)
  }

Der Vollständigkeit halber hier die Konvertierung von Enumerator zu Observable:

  /*
   * Enumerator to Observable
   */
  implicit def enumerator2Observable[T](enum: Enumerator[T]): Observable[T] = {
    // creating the Observable that we return
    Observable({ observer: Observer[T] =>
      // keeping a way to unsubscribe from the observable
      var cancelled = false

      // enumerator input is tested with this predicate
      // once cancelled is set to true, the enumerator will stop producing data
      val cancellableEnum = enum through Enumeratee.breakE[T](_ => cancelled)

      // applying iteratee on producer, passing data to the observable
      cancellableEnum (
        Iteratee.foreach(observer.onNext(_))
      ).onComplete { // passing completion or error to the observable
        case Success(_) => observer.onCompleted()
        case Failure(e) => observer.onError(e)
      }

      // unsubscription will change the var to stop the enumerator above via the breakE function
      new Subscription { override def unsubscribe() = { cancelled = true } }
    })
  }

Rx für WebSockets in Play

Andererseits könnten Sie anmerken, dass Sie sich in Play hauptsächlich mit Iteratees und Enumeratoren befassen, wenn Sie mit WebSockets arbeiten (wie Sie es hier tun).Wir sind uns alle einig, dass Iteratees wirklich weniger intuitiv sind als Observables, und das ist wahrscheinlich der Grund, warum Sie Rx in Ihrem Play-Projekt verwenden.

Aus dieser Beobachtung heraus habe ich eine Bibliothek mit dem Namen erstellt WidgetManager das macht genau das:Integration von Rx in Play, um die Manipulation von Iteratees zu beseitigen.

Wenn Sie diese Bibliothek verwenden, könnte Ihr Code einfach wie folgt aussehen:

def operationStatusFeed = WebSocket.using[String] { implicit request =>

  // you can optionally give a function to process data from the client (processClientData)
  // and a function to execute when connection is closed (onClientClose)
  val w = new WidgetManager()

  w.addObservable("op", operation)

  // subscribe to it and push data in the socket to the client (automatic JS callback called)
  w.subscribePush("op")

  // deals with Iteratees and Enumerators for you and returns what's needed
  w.webSocket
}

Die Bibliothek finden Sie hier auf GitHub: RxPlay (Beiträge sind willkommen)

Andere Tipps

Ich habe diese Lösung inspiriert von Brian Gilbert:

class ChannelObserver[T](chan: Channel[T]) extends Observer[T] {
  override def onNext(arg: T): Unit = chan.push(arg)
  override def onCompleted(): Unit = chan.end()
  override def onError(e: Throwable): Unit = chan.end(e)
  override val asJavaObserver: rx.Observer[T] = new rx.Observer[T] {
    def onCompleted() {
      chan.end()
    }

    def onError(e: Throwable) {
      chan.end(e)
    }

    def onNext(arg: T) {
      chan.push(arg)
    }
  }
}

implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
  Concurrent.unicast[T](onStart = { chan =>
      obs.subscribe(new ChannelObserver(chan))
  })
}

Die implizite Funktion konvertiert Observables ohne zusätzlichen Code in Enumeratoren.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top