문제

나는 성공적으로 설정 websocket 플레이에서 사용하는 기본 열거자를 구축 일부를 호출하는 코드를 문자열을 반환합니다:

def operationStatusFeed = WebSocket.using[String] { implicit request =>
  val in = Iteratee.ignore[String]
  val out = Enumerator.repeatM {
   Promise.timeout(operation, 3 seconds)
  }
  (in, out)
}

지금 내가 원하는 나 operation 을 반환하는 함수는 rx.lang.scala.Observable[String] 신의 문자열,내가 원하는 출력이 어떤 문자열로 들어갑니다.할 수 있는 방법 지도 이것을 관찰 play.api.libs.iteratee.Enumerator?

도움이 되었습니까?

해결책

당신이 사용할 수 있는 암시적으로 변환 브라이언에서 Gilbert.이 완벽하게 괜찮지만,주의해야 할 사용 업데이트된 버전의 브라이언 길버트의 변환 !Unsubscribe 은 결코 이라고 대답에서 Jeroen Kransen(그리고 그 나쁜!).

  /*
   * Observable to Enumerator
   */
  implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
    // unicast create a channel where you can push data and returns an Enumerator
    Concurrent.unicast { channel =>
      val subscription = obs.subscribe(new ChannelObserver(channel))
      val onComplete = { () => subscription.unsubscribe }
      val onError = { (_: String, _: Input[T]) => subscription.unsubscribe }
      (onComplete, onError)
    }
  }

  class ChannelObserver[T](channel: Channel[T]) extends rx.lang.scala.Observer[T] {
    override def onNext(elem: T): Unit = channel.push(elem)
    override def onCompleted(): Unit = channel.end()
    override def onError(e: Throwable): Unit = channel.end(e)
  }

완전,여기에서 변환을 열거자를 관찰할 수:

  /*
   * Enumerator to Observable
   */
  implicit def enumerator2Observable[T](enum: Enumerator[T]): Observable[T] = {
    // creating the Observable that we return
    Observable({ observer: Observer[T] =>
      // keeping a way to unsubscribe from the observable
      var cancelled = false

      // enumerator input is tested with this predicate
      // once cancelled is set to true, the enumerator will stop producing data
      val cancellableEnum = enum through Enumeratee.breakE[T](_ => cancelled)

      // applying iteratee on producer, passing data to the observable
      cancellableEnum (
        Iteratee.foreach(observer.onNext(_))
      ).onComplete { // passing completion or error to the observable
        case Success(_) => observer.onCompleted()
        case Failure(e) => observer.onError(e)
      }

      // unsubscription will change the var to stop the enumerator above via the breakE function
      new Subscription { override def unsubscribe() = { cancelled = true } }
    })
  }

Rx Websocket 에서 재생

다른 한편으로,당신은 말하는 대부분의 시간을 당신은 처리 Iteratees 및 열거자에서 재생할 때는 Websocket(으로 당신은 여기에서).우리는 모두가 동의하는 Iteratees 는 정말 작은 직관적 인을 관찰 가능하고 이는 이유는 아마도 당신은 당신을 사용하여 Rx 에서의 플레이 프로젝트입니다.

에서는 관찰,나를 구축 라이브러리 WidgetManager 는지 정확히 이:통합 Rx 플레이에서 제거하기 Iteratees 습니다.

를 사용하는 라이브러리 코드를 간단하게 수 있:

def operationStatusFeed = WebSocket.using[String] { implicit request =>

  // you can optionally give a function to process data from the client (processClientData)
  // and a function to execute when connection is closed (onClientClose)
  val w = new WidgetManager()

  w.addObservable("op", operation)

  // subscribe to it and push data in the socket to the client (automatic JS callback called)
  w.subscribePush("op")

  // deals with Iteratees and Enumerators for you and returns what's needed
  w.webSocket
}

라이브러리에 GitHub here: RxPlay (공헌을 환영합니다)

다른 팁

나는 이 솔루션에서 영감을 Brian 길버:

class ChannelObserver[T](chan: Channel[T]) extends Observer[T] {
  override def onNext(arg: T): Unit = chan.push(arg)
  override def onCompleted(): Unit = chan.end()
  override def onError(e: Throwable): Unit = chan.end(e)
  override val asJavaObserver: rx.Observer[T] = new rx.Observer[T] {
    def onCompleted() {
      chan.end()
    }

    def onError(e: Throwable) {
      chan.end(e)
    }

    def onNext(arg: T) {
      chan.push(arg)
    }
  }
}

implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
  Concurrent.unicast[T](onStart = { chan =>
      obs.subscribe(new ChannelObserver(chan))
  })
}

암시적 함수에 변환 관찰 가능하자가 없이 어떠한 추가 코드입니다.

라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top