这是我当前的scala代码:

val b=可观察。Interval(1秒).map(n=> if(n%2== 1) 抛出新的例外Els N * N)

b .subscribe(n=> println(n),e=> println(“错误”),()=> println(“完成”))

和这是我的输出:

0
错误

如何修改我的观察到,以便在每次错误后继续进行,我将遇到这样的输出:

0
error
2
error
4
...
.

有帮助吗?

解决方案

您可以使用各种错误处理程序之一。 我认为在你的情况下,onErrorflatmap可能是正确的选择:

wiki

javadoc

不幸的是,OnErrorFlatMap不是(如0.19.0)Scala API的一部分。

其他提示

我有同样的问题,很失望,没有onErrorflatmap for scala rx。所以我刺了一下 我自己实施这个功能。

我的解决方案如下所示(见解决方案)。关键方法是这个:

  def recover[T](target: Observable[T]): Observable[Try[T]] = {
      target.map { Success(_) }.
        onErrorResumeNext(
          (err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
        )
  }
.

“恢复”方法的详细信息

'恢复'的第一个参数是您希望继续挤压事件的可观察到 它抛出一个例外。我尝试了各种各样的方法,但这是唯一一个工作的方法 我。我最初预期Scala RX的OnErrorreturn将任何错误映射到我恢复的值中 功能,然后继续前进,但我错过了“可观察合同”的全部点,这是那个 可观察到的需要停止在符合符合件或onError之后发送任何进一步的事件。任何可观察到的 在错误后继续喷出事件,标记为“病理学”(并通过礼貌社会正式避开), 如这里所讨论的: https://github.com/reactivex/rxjava/wiki/phantom -operators #nonerrorflatmap

成功的onNext调用的有效载荷包装在成功()中包装,而任何例外将被处理 由OnErRrerSumeNext,它将从(1)可观察包装中产生连接的可观察流 错误,(2)在递归调用中包装的目标实例以恢复。我最初 担心无限递归..但这一切都熟悉了。

限制

我应该提到它在原始海报的问题的情况下 - 它使用观察到.Interval,这 恢复(目标)是最初的可观察力的,因为恢复(目标)是最初的。Interval,这将开始 从一开始就发出,所以你永远不会取得进步。对于像间隔一样的东西,你会 必须编写您自己的Timer基于定时器的间隔,可以重新启动。绝对值希望 为您提供足够的信息来告诉您您需要重新启动的值。

A SOLUTION




object RecoverFromExceptionsInObservable extends App {

  import rx.lang.scala._
  import scala.language.postfixOps
  import scala.util.{Try, Failure, Success}


  val MILLISECS = 500L
  var tickCount = 0

  /**
   * There is a bug in this code which we will ignore for the simple purpose of
   * this test. The bug is that timers are never stopped and cleaned up.
   */
  def getTickObservable(): Observable[Int] = {
    @volatile var subscribers: Set[Observer[Int]] = Set.empty

    val t = new java.util.Timer()
    val task = new java.util.TimerTask {
      def run() = {
        subscribers.foreach(s => s.onNext(tickCount))
        tickCount += 1
      }
    }
    t.schedule(task, 0L, MILLISECS)

    Observable.create[Int] {
      (obs: Observer[Int]) => {
        subscribers = subscribers + obs
        Subscription {
          subscribers = subscribers - obs
        }
      }
    }
  }

  def recover[T](target: Observable[T]): Observable[Try[T]] = {
      target.map { Success(_) }.
        onErrorResumeNext(
          (err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
        )
  }


  val stream1 = getTickObservable() map { item =>
    if (item % 2 == 0) throw new RuntimeException(s"error on $item") else item
  }

  recover(stream1).subscribe(
    term => {
      println(s" ${Thread.currentThread().getName()}  onNext: $term")
    },
    t => {
      println("in error callback")
      println(s" ${Thread.currentThread().getName()}  onError: $t")
    },
    () => println(s" ${Thread.currentThread().getName()} subscriber complete")
  )
}
.

以下是上面代码的运行的部分输出:

 Timer-0  onNext: Success(1)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 2)
 Timer-0  onNext: Success(3)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 4)
 Timer-0  onNext: Success(5)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 6)
.

我不希望这个答案永远继续,所以我跳过一些关于我采取解决这个问题的替代方法的一些细节,你可以读到如果您有兴趣,请参阅

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top