Domanda

Ecco un codice Scala che attualmente ho:

Val B = Observable.Interval (1 secondo) .Map (n => if (n % 2 == 1) lancia una nuova eccezione else n*n)

b.subscribe (n => println (n), e => println ("errore"), () => println ("done"))

Ed ecco il mio risultato:

0
errore

Come posso modificare il mio Observable in modo che continui a funzionare dopo ogni errore e avrò un output come questo:

0
error
2
error
4
...
È stato utile?

Soluzione

È possibile utilizzare uno dei vari gestori di errore. Penso che nel tuo caso OnErrorflatmap potrebbe essere la scelta giusta:

wiki

javadoc .

Sfortunatamente, OnErrorflatmap non è (a partire dalla versione 0.19.0) parte dell'API Scala.

Altri suggerimenti

Avevo la stessa domanda e sono rimasto deluso dal fatto che non esistesse onErrorFlatMap per Scala Rx.Quindi ho preso una pugnalata all'implementazione di questa funzionalità da solo.

La mia soluzione è mostrata di seguito (vedi UNA SOLUZIONE).Il metodo chiave è questo:

  def recover[T](target: Observable[T]): Observable[Try[T]] = {
      target.map { Success(_) }.
        onErrorResumeNext(
          (err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
        )
  }

Dettagli del metodo di "recupero".

Il primo argomento per "recuperare" è l'osservabile che si desidera continuare a spremere per gli eventi anche dopo che ha lanciato un'eccezione.Ho provato ogni tipo di altri approcci, ma questo è stato l'unico che ha funzionato per me.Inizialmente mi aspettavo che OneRrerrorreturn di Scala Rx mappasse qualsiasi errore nel valore dettato dalla mia funzione di recupero, e poi continuavo ad andare avanti, ma mi mancava l'intero punto del "contratto osservabile", ovvero che un osservabile deve smettere di inviare qualsiasi altro eventi dopo oncelet o onerror.Qualsiasi osservabile che continua a vomitare eventi dopo un errore sarà etichettato "patologico" (e debitamente evitato dalla società educata), come discusso qui: https://github.com/ReactiveX/RxJava/wiki/Phantom-Operators#onerrorflatmap

Il carico utile di chiamate OnNext di successo è avvolto in un successo (), mentre qualsiasi eccezione sarà gestita da OneRorresumenext, che creerà un flusso osservabile concatenato da (1) un osservabile che avvolge l'errore e (2) un'istanza del bersaglio all'interno una chiamata ricorsiva per il recupero.Inizialmente ero preoccupato per la ricorsione infinita ..Ma tutto ha funzionato bene.

Limitazioni

Dovrei menzionare che nel caso della domanda del poster originale - che utilizza Observable.Interval, questo non funzionerebbe bene, poiché il recupero (target) sarebbe l'originale osservabile. non farebbe mai progressi.Per qualcosa come l'intervallo, dovresti scrivere il tuo intervallo basato sul timer che potrebbe essere riavviato.Si spera che il valore di eccezione ti dia abbastanza informazioni per dirti il ​​valore che devi riavviare.

A SOLUTION




object RecoverFromExceptionsInObservable extends App {

  import rx.lang.scala._
  import scala.language.postfixOps
  import scala.util.{Try, Failure, Success}


  val MILLISECS = 500L
  var tickCount = 0

  /**
   * There is a bug in this code which we will ignore for the simple purpose of
   * this test. The bug is that timers are never stopped and cleaned up.
   */
  def getTickObservable(): Observable[Int] = {
    @volatile var subscribers: Set[Observer[Int]] = Set.empty

    val t = new java.util.Timer()
    val task = new java.util.TimerTask {
      def run() = {
        subscribers.foreach(s => s.onNext(tickCount))
        tickCount += 1
      }
    }
    t.schedule(task, 0L, MILLISECS)

    Observable.create[Int] {
      (obs: Observer[Int]) => {
        subscribers = subscribers + obs
        Subscription {
          subscribers = subscribers - obs
        }
      }
    }
  }

  def recover[T](target: Observable[T]): Observable[Try[T]] = {
      target.map { Success(_) }.
        onErrorResumeNext(
          (err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
        )
  }


  val stream1 = getTickObservable() map { item =>
    if (item % 2 == 0) throw new RuntimeException(s"error on $item") else item
  }

  recover(stream1).subscribe(
    term => {
      println(s" ${Thread.currentThread().getName()}  onNext: $term")
    },
    t => {
      println("in error callback")
      println(s" ${Thread.currentThread().getName()}  onError: $t")
    },
    () => println(s" ${Thread.currentThread().getName()} subscriber complete")
  )
}

Ecco l'output parziale di un'esecuzione del codice precedente:

 Timer-0  onNext: Success(1)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 2)
 Timer-0  onNext: Success(3)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 4)
 Timer-0  onNext: Success(5)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 6)

Non volevo che questa risposta andasse avanti all'infinito, quindi ho saltato alcuni dettagli sugli approcci alternativi che ho adottato per risolvere questo problema, di cui puoi leggere Qui se siete interessati.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top