변환하는 방법 RX 관찰 가능한 재생하는 열거자
-
21-12-2019 - |
문제
나는 성공적으로 설정 websocket 플레이에서 사용하는 기본 열거자를 구축 일부를 호출하는 코드를 문자열을 반환합니다:
def operationStatusFeed = WebSocket.using[String] { implicit request =>
val in = Iteratee.ignore[String]
val out = Enumerator.repeatM {
Promise.timeout(operation, 3 seconds)
}
(in, out)
}
지금 내가 원하는 나 operation
을 반환하는 함수는 rx.lang.scala.Observable[String]
신의 문자열,내가 원하는 출력이 어떤 문자열로 들어갑니다.할 수 있는 방법 지도 이것을 관찰 play.api.libs.iteratee.Enumerator
?
해결책
당신이 사용할 수 있는 암시적으로 변환 브라이언에서 Gilbert.이 완벽하게 괜찮지만,주의해야 할 사용 업데이트된 버전의 브라이언 길버트의 변환 !Unsubscribe 은 결코 이라고 대답에서 Jeroen Kransen(그리고 그 나쁜!).
/*
* Observable to Enumerator
*/
implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
// unicast create a channel where you can push data and returns an Enumerator
Concurrent.unicast { channel =>
val subscription = obs.subscribe(new ChannelObserver(channel))
val onComplete = { () => subscription.unsubscribe }
val onError = { (_: String, _: Input[T]) => subscription.unsubscribe }
(onComplete, onError)
}
}
class ChannelObserver[T](channel: Channel[T]) extends rx.lang.scala.Observer[T] {
override def onNext(elem: T): Unit = channel.push(elem)
override def onCompleted(): Unit = channel.end()
override def onError(e: Throwable): Unit = channel.end(e)
}
완전,여기에서 변환을 열거자를 관찰할 수:
/*
* Enumerator to Observable
*/
implicit def enumerator2Observable[T](enum: Enumerator[T]): Observable[T] = {
// creating the Observable that we return
Observable({ observer: Observer[T] =>
// keeping a way to unsubscribe from the observable
var cancelled = false
// enumerator input is tested with this predicate
// once cancelled is set to true, the enumerator will stop producing data
val cancellableEnum = enum through Enumeratee.breakE[T](_ => cancelled)
// applying iteratee on producer, passing data to the observable
cancellableEnum (
Iteratee.foreach(observer.onNext(_))
).onComplete { // passing completion or error to the observable
case Success(_) => observer.onCompleted()
case Failure(e) => observer.onError(e)
}
// unsubscription will change the var to stop the enumerator above via the breakE function
new Subscription { override def unsubscribe() = { cancelled = true } }
})
}
Rx Websocket 에서 재생
다른 한편으로,당신은 말하는 대부분의 시간을 당신은 처리 Iteratees 및 열거자에서 재생할 때는 Websocket(으로 당신은 여기에서).우리는 모두가 동의하는 Iteratees 는 정말 작은 직관적 인을 관찰 가능하고 이는 이유는 아마도 당신은 당신을 사용하여 Rx 에서의 플레이 프로젝트입니다.
에서는 관찰,나를 구축 라이브러리 WidgetManager 는지 정확히 이:통합 Rx 플레이에서 제거하기 Iteratees 습니다.
를 사용하는 라이브러리 코드를 간단하게 수 있:
def operationStatusFeed = WebSocket.using[String] { implicit request =>
// you can optionally give a function to process data from the client (processClientData)
// and a function to execute when connection is closed (onClientClose)
val w = new WidgetManager()
w.addObservable("op", operation)
// subscribe to it and push data in the socket to the client (automatic JS callback called)
w.subscribePush("op")
// deals with Iteratees and Enumerators for you and returns what's needed
w.webSocket
}
라이브러리에 GitHub here: RxPlay (공헌을 환영합니다)
다른 팁
나는 이 솔루션에서 영감을 Brian 길버:
class ChannelObserver[T](chan: Channel[T]) extends Observer[T] {
override def onNext(arg: T): Unit = chan.push(arg)
override def onCompleted(): Unit = chan.end()
override def onError(e: Throwable): Unit = chan.end(e)
override val asJavaObserver: rx.Observer[T] = new rx.Observer[T] {
def onCompleted() {
chan.end()
}
def onError(e: Throwable) {
chan.end(e)
}
def onNext(arg: T) {
chan.push(arg)
}
}
}
implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
Concurrent.unicast[T](onStart = { chan =>
obs.subscribe(new ChannelObserver(chan))
})
}
암시적 함수에 변환 관찰 가능하자가 없이 어떠한 추가 코드입니다.