Question

Here's the Scala code I currently have:

val b = Observable.interval(1 second).map(n => if (n % 2 == 1) throw new Exception else n*n)

b.subscribe(n => println(n), e => println("error"), () => println("done"))

And here's my output:

0
error

How can I modify my Observable so that it keeps going after each error and produces output like this:

0
error
2
error
4
...

Solution

You can use one of the various error handlers. I think in your case onErrorFlatMap could be the right choice; it is described in the RxJava wiki and JavaDoc.

Unfortunately, onErrorFlatMap is not (as of version 0.19.0) part of the Scala API.
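One way to get the requested output without onErrorFlatMap is to keep the exception from ever reaching the Observable: catch it inside map and emit a Try instead. The following is a minimal sketch, assuming a reasonably recent RxScala on the classpath. The names TryInsideMap, squareEven and render are hypothetical, and the 100 ms period is chosen only to keep the demo short.

```scala
import rx.lang.scala.Observable
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object TryInsideMap {
  // Hypothetical helper with the same logic as the lambda in the question.
  def squareEven(n: Long): Long =
    if (n % 2 == 1) throw new Exception(s"odd value $n") else n * n

  // Try(...) catches the exception inside map, so onError is never
  // signalled and the interval keeps ticking.
  val b: Observable[Try[Long]] =
    Observable.interval(100.millis).map(n => Try(squareEven(n)))

  // Render one element the way the question prints it.
  def render(t: Try[Long]): String = t match {
    case Success(v) => v.toString
    case Failure(_) => "error"
  }

  def main(args: Array[String]): Unit =
    // Block on the first five elements only so that the demo terminates.
    b.take(5).toBlocking.toList.map(render).foreach(println)
}
```

The subscriber then sees Success and Failure values through onNext and decides itself what to print for each. The trade-off is that the element type changes from Long to Try[Long].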

OTHER TIPS

I had this same question, and was disappointed that there was no onErrorFlatMap for Scala Rx. So I took a stab at implementing this functionality myself.

My solution is shown below (see A SOLUTION). The key method is this one:

  def recover[T](target: Observable[T]): Observable[Try[T]] = {
    target.map { Success(_) }.
      onErrorResumeNext(
        (err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
      )
  }

Details of 'recover' method

The argument to 'recover' is the Observable that you want to keep squeezing for events even after it throws an exception. I tried all manner of other approaches, but this was the only one that worked for me. I had initially expected Scala Rx's onErrorReturn to map any error into the value dictated by my recovery function and then keep going, but I was missing the whole point of the 'Observable contract': an Observable must stop sending events after onCompleted or onError. Any Observable that keeps spewing events after an error is labeled 'pathological' (and duly shunned by polite society), as discussed here: https://github.com/ReactiveX/RxJava/wiki/Phantom-Operators#onerrorflatmap
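The contract is easy to observe with onErrorReturn: the error is replaced by a single fallback value and the stream then completes, so later items are never delivered. Here is a small sketch, assuming a reasonably recent RxScala on the classpath. The object name and the fallback value -1 are illustrative only.

```scala
import rx.lang.scala.Observable

object OnErrorReturnStops {
  // Emits 1, fails on 2, and would emit 3 and 4 if the stream continued.
  def run(): List[Int] =
    Observable.from(List(1, 2, 3, 4))
      .map(n => if (n == 2) throw new RuntimeException(s"error on $n") else n)
      .onErrorReturn(_ => -1) // replace the error with one fallback value
      .toBlocking
      .toList

  def main(args: Array[String]): Unit =
    println(run())
}
```

The result is List(1, -1): the items 3 and 4 are never seen, because the source terminated when map threw.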

The payload of each successful onNext call is wrapped in a Success(), whereas any exception is handled by onErrorResumeNext, which creates a concatenated Observable stream from (1) an Observable wrapping the error in a Failure, and (2) the target wrapped in a recursive call to recover. I initially worried about infinite recursion, but it all worked out nicely: the recursive call sits inside the error handler, so it is evaluated once per error rather than up front.

Limitations

I should mention that in the case of the original poster's question, which uses Observable.interval, this would not work well: recover(target) would resubscribe to the original Observable.interval, which would start emitting from the very beginning, so you would never make progress. For something like interval, you would have to write your own timer-based interval that could be restarted. The exception value would hopefully give you enough information to tell you the value you need to restart from.
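To make the limitation concrete, the sketch below runs recover over a plain interval, and then over a variant whose counter lives outside the Observable, so that a resubscription picks up at the next value instead of at 0. This is only a sketch under assumptions: it needs a reasonably recent RxScala on the classpath, and the names RestartableTicks, squareEven, stuck and progressing are made up for illustration. It is not the hand-written timer described above, just a shortcut that still uses Observable.interval for the timing.

```scala
import java.util.concurrent.atomic.AtomicLong
import rx.lang.scala.Observable
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object RestartableTicks {
  // Same recover method as in this answer.
  def recover[T](target: Observable[T]): Observable[Try[T]] =
    target.map { Success(_) }.onErrorResumeNext(
      (err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
    )

  // The question's mapping: fail on odd values, square even ones.
  def squareEven(n: Long): Long =
    if (n % 2 == 1) throw new RuntimeException(s"error on $n") else n * n

  // Plain interval: every resubscription starts counting from 0 again.
  def stuck(n: Int): List[Try[Long]] =
    recover(Observable.interval(10.millis).map(squareEven))
      .take(n).toBlocking.toList

  // The counter is shared state outside the Observable, so each
  // resubscription continues with the next value.
  def progressing(n: Int): List[Try[Long]] = {
    val counter = new AtomicLong(0L)
    val ticks = Observable.interval(10.millis).map(_ => counter.getAndIncrement())
    recover(ticks.map(squareEven)).take(n).toBlocking.toList
  }

  def main(args: Array[String]): Unit = {
    println(stuck(5))
    println(progressing(6))
  }
}
```

With the plain interval, every success between failures is Success(0). With the external counter, the successes are increasing squares. Keeping the state outside the stream is a design choice with a cost: the Observable is no longer safe to share between independent subscribers.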

A SOLUTION




object RecoverFromExceptionsInObservable extends App {

  import rx.lang.scala._
  import scala.util.{Try, Failure, Success}

  val MILLISECS = 500L
  var tickCount = 0

  /**
   * There is a bug in this code which we will ignore for the simple purpose of
   * this test. The bug is that timers are never stopped and cleaned up.
   */
  def getTickObservable(): Observable[Int] = {
    @volatile var subscribers: Set[Observer[Int]] = Set.empty

    val t = new java.util.Timer()
    val task = new java.util.TimerTask {
      def run() = {
        subscribers.foreach(s => s.onNext(tickCount))
        tickCount += 1
      }
    }
    t.schedule(task, 0L, MILLISECS)

    Observable.create[Int] {
      (obs: Observer[Int]) => {
        subscribers = subscribers + obs
        Subscription {
          subscribers = subscribers - obs
        }
      }
    }
  }

  def recover[T](target: Observable[T]): Observable[Try[T]] = {
    target.map { Success(_) }.
      onErrorResumeNext(
        (err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
      )
  }


  val stream1 = getTickObservable() map { item =>
    if (item % 2 == 0) throw new RuntimeException(s"error on $item") else item
  }

  recover(stream1).subscribe(
    term => {
      println(s" ${Thread.currentThread().getName()}  onNext: $term")
    },
    t => {
      println("in error callback")
      println(s" ${Thread.currentThread().getName()}  onError: $t")
    },
    () => println(s" ${Thread.currentThread().getName()} subscriber complete")
  )
}

Here is partial output of a run of the above code:

 Timer-0  onNext: Success(1)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 2)
 Timer-0  onNext: Success(3)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 4)
 Timer-0  onNext: Success(5)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 6)

I didn't want this answer to go on forever, so I skipped over some details on alternative approaches I took to solving this problem, which you can read about here if you are interested.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow