Question

J'ai réussi à configurer un websocket dans Play en utilisant sa construction Enumerator native, en appelant du code qui renvoie une chaîne :

def operationStatusFeed = WebSocket.using[String] { implicit request =>
  val in = Iteratee.ignore[String]
  val out = Enumerator.repeatM {
   Promise.timeout(operation, 3 seconds)
  }
  (in, out)
}

Maintenant, je veux mon operation fonction pour renvoyer un rx.lang.scala.Observable[String] au lieu d'une chaîne, et je souhaite afficher n'importe quelle chaîne dès son entrée.Comment puis-je mapper cet observable à un play.api.libs.iteratee.Enumerator?

Était-ce utile?

La solution

Vous pouvez utiliser la conversion implicite de Bryan Gilbert.Cela fonctionnera parfaitement, mais veillez à utiliser le version mise à jour des conversions de Bryan Gilbert !La désinscription n'est jamais appelée dans la réponse de Jeroen Kransen (et c'est mauvais !).

  /*
   * Observable to Enumerator
   */
  implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
    // unicast create a channel where you can push data and returns an Enumerator
    Concurrent.unicast { channel =>
      val subscription = obs.subscribe(new ChannelObserver(channel))
      val onComplete = { () => subscription.unsubscribe }
      val onError = { (_: String, _: Input[T]) => subscription.unsubscribe }
      (onComplete, onError)
    }
  }

  class ChannelObserver[T](channel: Channel[T]) extends rx.lang.scala.Observer[T] {
    override def onNext(elem: T): Unit = channel.push(elem)
    override def onCompleted(): Unit = channel.end()
    override def onError(e: Throwable): Unit = channel.end(e)
  }

Pour être complet, voici la conversion d'Enumerator en Observable :

  /*
   * Enumerator to Observable
   */
  implicit def enumerator2Observable[T](enum: Enumerator[T]): Observable[T] = {
    // creating the Observable that we return
    Observable({ observer: Observer[T] =>
      // keeping a way to unsubscribe from the observable
      var cancelled = false

      // enumerator input is tested with this predicate
      // once cancelled is set to true, the enumerator will stop producing data
      val cancellableEnum = enum through Enumeratee.breakE[T](_ => cancelled)

      // applying iteratee on producer, passing data to the observable
      cancellableEnum (
        Iteratee.foreach(observer.onNext(_))
      ).onComplete { // passing completion or error to the observable
        case Success(_) => observer.onCompleted()
        case Failure(e) => observer.onError(e)
      }

      // unsubscription will change the var to stop the enumerator above via the breakE function
      new Subscription { override def unsubscribe() = { cancelled = true } }
    })
  }

Rx pour WebSockets en jeu

D'un autre côté, vous remarquerez peut-être que la plupart du temps que vous traitez avec des itérés et des énumérateurs en jeu, c'est lorsque vous travaillez avec des WebSockets (comme vous le faites ici).Nous sommes tous d'accord sur le fait que les itérés sont vraiment moins intuitifs que les observables et c'est probablement pourquoi vous utilisez Rx dans votre projet Play.

À partir de cette observation, j'ai construit une bibliothèque appelée Gestionnaire de widgets ça fait exactement ça :intégration de Rx dans Play éliminant la manipulation des itérés.

En utilisant cette bibliothèque, votre code pourrait simplement être :

def operationStatusFeed = WebSocket.using[String] { implicit request =>

  // you can optionally give a function to process data from the client (processClientData)
  // and a function to execute when connection is closed (onClientClose)
  val w = new WidgetManager()

  w.addObservable("op", operation)

  // subscribe to it and push data in the socket to the client (automatic JS callback called)
  w.subscribePush("op")

  // deals with Iteratees and Enumerators for you and returns what's needed
  w.webSocket
}

La bibliothèque est sur GitHub ici : RxPlay (Les contributions sont les bienvenues)

Autres conseils

J'ai fait cette solution inspirée par Brian Gilbert:

class ChannelObserver[T](chan: Channel[T]) extends Observer[T] {
  override def onNext(arg: T): Unit = chan.push(arg)
  override def onCompleted(): Unit = chan.end()
  override def onError(e: Throwable): Unit = chan.end(e)
  override val asJavaObserver: rx.Observer[T] = new rx.Observer[T] {
    def onCompleted() {
      chan.end()
    }

    def onError(e: Throwable) {
      chan.end(e)
    }

    def onNext(arg: T) {
      chan.push(arg)
    }
  }
}

implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
  Concurrent.unicast[T](onStart = { chan =>
      obs.subscribe(new ChannelObserver(chan))
  })
}

La fonction implicite convertit les observables en énumérateurs sans aucun code supplémentaire.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top