Вопрос

Я успешно настроил websocket в Play, используя его собственную конструкцию Enumerator, вызывая некоторый код, который возвращает строку:

def operationStatusFeed = WebSocket.using[String] { implicit request =>
  val in = Iteratee.ignore[String]
  val out = Enumerator.repeatM {
   Promise.timeout(operation, 3 seconds)
  }
  (in, out)
}

Теперь я хочу, чтобы мой operation функция для возврата rx.lang.scala.Observable[String] вместо строки, и я хочу вывести любую строку, как только она будет введена.Как я могу сопоставить это наблюдаемое с play.api.libs.iteratee.Enumerator?

Это было полезно?

Решение

Вы можете использовать неявное преобразование от Брайана Гилберта.Это будет работать совершенно нормально, но будьте осторожны при использовании обновленная версия обращений Брайана Гилберта !В ответе от Йеруна Крансена никогда не предлагается отказаться от подписки (и это плохо!).

  /*
   * Observable to Enumerator
   */
  implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
    // unicast create a channel where you can push data and returns an Enumerator
    Concurrent.unicast { channel =>
      val subscription = obs.subscribe(new ChannelObserver(channel))
      val onComplete = { () => subscription.unsubscribe }
      val onError = { (_: String, _: Input[T]) => subscription.unsubscribe }
      (onComplete, onError)
    }
  }

  class ChannelObserver[T](channel: Channel[T]) extends rx.lang.scala.Observer[T] {
    override def onNext(elem: T): Unit = channel.push(elem)
    override def onCompleted(): Unit = channel.end()
    override def onError(e: Throwable): Unit = channel.end(e)
  }

Чтобы быть полным, вот преобразование из Enumerator в Observable :

  /*
   * Enumerator to Observable
   */
  implicit def enumerator2Observable[T](enum: Enumerator[T]): Observable[T] = {
    // creating the Observable that we return
    Observable({ observer: Observer[T] =>
      // keeping a way to unsubscribe from the observable
      var cancelled = false

      // enumerator input is tested with this predicate
      // once cancelled is set to true, the enumerator will stop producing data
      val cancellableEnum = enum through Enumeratee.breakE[T](_ => cancelled)

      // applying iteratee on producer, passing data to the observable
      cancellableEnum (
        Iteratee.foreach(observer.onNext(_))
      ).onComplete { // passing completion or error to the observable
        case Success(_) => observer.onCompleted()
        case Failure(e) => observer.onError(e)
      }

      // unsubscription will change the var to stop the enumerator above via the breakE function
      new Subscription { override def unsubscribe() = { cancelled = true } }
    })
  }

Rx для WebSockets в игре

С другой стороны, вы можете заметить, что большую часть времени вы имеете дело с итерациями и перечислителями в Play, когда работаете с WebSockets (как вы делаете здесь).Мы все согласны с тем, что итерации действительно менее интуитивно понятны, чем наблюдаемые, и, вероятно, именно поэтому вы используете Rx в своем игровом проекте.

Исходя из этого наблюдения, я создал библиотеку под названием WidgetManager - менеджер виджетов это делает именно это :интеграция Rx в игру избавляет от повторяющихся манипуляций.

Используя эту библиотеку, ваш код мог бы просто быть :

def operationStatusFeed = WebSocket.using[String] { implicit request =>

  // you can optionally give a function to process data from the client (processClientData)
  // and a function to execute when connection is closed (onClientClose)
  val w = new WidgetManager()

  w.addObservable("op", operation)

  // subscribe to it and push data in the socket to the client (automatic JS callback called)
  w.subscribePush("op")

  // deals with Iteratees and Enumerators for you and returns what's needed
  w.webSocket
}

Библиотека находится на GitHub здесь : RxPlay (Материалы приветствуются)

Другие советы

Я сделал это решение, вдохновленное Брайан Гилберт :

class ChannelObserver[T](chan: Channel[T]) extends Observer[T] {
  override def onNext(arg: T): Unit = chan.push(arg)
  override def onCompleted(): Unit = chan.end()
  override def onError(e: Throwable): Unit = chan.end(e)
  override val asJavaObserver: rx.Observer[T] = new rx.Observer[T] {
    def onCompleted() {
      chan.end()
    }

    def onError(e: Throwable) {
      chan.end(e)
    }

    def onNext(arg: T) {
      chan.push(arg)
    }
  }
}

implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
  Concurrent.unicast[T](onStart = { chan =>
      obs.subscribe(new ChannelObserver(chan))
  })
}
.

Неявная функция преобразует наблюдаемое для перечисленных средств без какого-либо дополнительного кода.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top