-
21-12-2019 - |
题
这是我当前的scala代码:
val b=可观察。Interval(1秒).map(n=> if(n%2== 1) 抛出新的例外Els N * N)
b .subscribe(n=> println(n),e=> println(“错误”),()=> println(“完成”))
和这是我的输出:
0
错误
如何修改我的观察到,以便在每次错误后继续进行,我将遇到这样的输出:
0
error
2
error
4
...
. 其他提示
我有同样的问题,很失望,没有onErrorflatmap for scala rx。所以我刺了一下 我自己实施这个功能。
我的解决方案如下所示(见解决方案)。关键方法是这个:
def recover[T](target: Observable[T]): Observable[Try[T]] = {
target.map { Success(_) }.
onErrorResumeNext(
(err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
)
}
.
“恢复”方法的详细信息
'恢复'的第一个参数是您希望继续挤压事件的可观察到 它抛出一个例外。我尝试了各种各样的方法,但这是唯一一个工作的方法 我。我最初预期Scala RX的OnErrorreturn将任何错误映射到我恢复的值中 功能,然后继续前进,但我错过了“可观察合同”的全部点,这是那个 可观察到的需要停止在符合符合件或onError之后发送任何进一步的事件。任何可观察到的 在错误后继续喷出事件,标记为“病理学”(并通过礼貌社会正式避开), 如这里所讨论的: https://github.com/reactivex/rxjava/wiki/phantom -operators #nonerrorflatmap
成功的onNext调用的有效载荷包装在成功()中包装,而任何例外将被处理 由OnErRrerSumeNext,它将从(1)可观察包装中产生连接的可观察流 错误,(2)在递归调用中包装的目标实例以恢复。我最初 担心无限递归..但这一切都熟悉了。
限制
我应该提到它在原始海报的问题的情况下 - 它使用观察到.Interval,这 恢复(目标)是最初的可观察力的,因为恢复(目标)是最初的。Interval,这将开始 从一开始就发出,所以你永远不会取得进步。对于像间隔一样的东西,你会 必须编写您自己的Timer基于定时器的间隔,可以重新启动。绝对值希望 为您提供足够的信息来告诉您您需要重新启动的值。
A SOLUTION
object RecoverFromExceptionsInObservable extends App {
import rx.lang.scala._
import scala.language.postfixOps
import scala.util.{Try, Failure, Success}
val MILLISECS = 500L
var tickCount = 0
/**
* There is a bug in this code which we will ignore for the simple purpose of
* this test. The bug is that timers are never stopped and cleaned up.
*/
def getTickObservable(): Observable[Int] = {
@volatile var subscribers: Set[Observer[Int]] = Set.empty
val t = new java.util.Timer()
val task = new java.util.TimerTask {
def run() = {
subscribers.foreach(s => s.onNext(tickCount))
tickCount += 1
}
}
t.schedule(task, 0L, MILLISECS)
Observable.create[Int] {
(obs: Observer[Int]) => {
subscribers = subscribers + obs
Subscription {
subscribers = subscribers - obs
}
}
}
}
def recover[T](target: Observable[T]): Observable[Try[T]] = {
target.map { Success(_) }.
onErrorResumeNext(
(err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
)
}
val stream1 = getTickObservable() map { item =>
if (item % 2 == 0) throw new RuntimeException(s"error on $item") else item
}
recover(stream1).subscribe(
term => {
println(s" ${Thread.currentThread().getName()} onNext: $term")
},
t => {
println("in error callback")
println(s" ${Thread.currentThread().getName()} onError: $t")
},
() => println(s" ${Thread.currentThread().getName()} subscriber complete")
)
}
.
以下是上面代码的运行的部分输出:
Timer-0 onNext: Success(1)
Timer-0 onNext: Failure(java.lang.RuntimeException: error on 2)
Timer-0 onNext: Success(3)
Timer-0 onNext: Failure(java.lang.RuntimeException: error on 4)
Timer-0 onNext: Success(5)
Timer-0 onNext: Failure(java.lang.RuntimeException: error on 6)
.
我不希望这个答案永远继续,所以我跳过一些关于我采取解决这个问题的替代方法的一些细节,你可以读到如果您有兴趣,请参阅。