Non interrompere Observable in caso di errore
-
21-12-2019 - |
Domanda
Ecco un codice Scala che attualmente ho:
Val B = Observable.Interval (1 secondo) .Map (n => if (n % 2 == 1) lancia una nuova eccezione else n*n)
b.subscribe (n => println (n), e => println ("errore"), () => println ("done"))
Ed ecco il mio risultato:
0
errore
Come posso modificare il mio Observable in modo che continui a funzionare dopo ogni errore e avrò un output come questo:
0
error
2
error
4
...
Altri suggerimenti
Avevo la stessa domanda e sono rimasto deluso dal fatto che non esistesse onErrorFlatMap per Scala Rx.Quindi ho preso una pugnalata all'implementazione di questa funzionalità da solo.
La mia soluzione è mostrata di seguito (vedi UNA SOLUZIONE).Il metodo chiave è questo:
def recover[T](target: Observable[T]): Observable[Try[T]] = {
target.map { Success(_) }.
onErrorResumeNext(
(err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
)
}
Dettagli del metodo di "recupero".
Il primo argomento per "recuperare" è l'osservabile che si desidera continuare a spremere per gli eventi anche dopo che ha lanciato un'eccezione.Ho provato ogni tipo di altri approcci, ma questo è stato l'unico che ha funzionato per me.Inizialmente mi aspettavo che OneRrerrorreturn di Scala Rx mappasse qualsiasi errore nel valore dettato dalla mia funzione di recupero, e poi continuavo ad andare avanti, ma mi mancava l'intero punto del "contratto osservabile", ovvero che un osservabile deve smettere di inviare qualsiasi altro eventi dopo oncelet o onerror.Qualsiasi osservabile che continua a vomitare eventi dopo un errore sarà etichettato "patologico" (e debitamente evitato dalla società educata), come discusso qui: https://github.com/ReactiveX/RxJava/wiki/Phantom-Operators#onerrorflatmap
Il carico utile di chiamate OnNext di successo è avvolto in un successo (), mentre qualsiasi eccezione sarà gestita da OneRorresumenext, che creerà un flusso osservabile concatenato da (1) un osservabile che avvolge l'errore e (2) un'istanza del bersaglio all'interno una chiamata ricorsiva per il recupero.Inizialmente ero preoccupato per la ricorsione infinita ..Ma tutto ha funzionato bene.
Limitazioni
Dovrei menzionare che nel caso della domanda del poster originale - che utilizza Observable.Interval, questo non funzionerebbe bene, poiché il recupero (target) sarebbe l'originale osservabile. non farebbe mai progressi.Per qualcosa come l'intervallo, dovresti scrivere il tuo intervallo basato sul timer che potrebbe essere riavviato.Si spera che il valore di eccezione ti dia abbastanza informazioni per dirti il valore che devi riavviare.
A SOLUTION
object RecoverFromExceptionsInObservable extends App {
import rx.lang.scala._
import scala.language.postfixOps
import scala.util.{Try, Failure, Success}
val MILLISECS = 500L
var tickCount = 0
/**
* There is a bug in this code which we will ignore for the simple purpose of
* this test. The bug is that timers are never stopped and cleaned up.
*/
def getTickObservable(): Observable[Int] = {
@volatile var subscribers: Set[Observer[Int]] = Set.empty
val t = new java.util.Timer()
val task = new java.util.TimerTask {
def run() = {
subscribers.foreach(s => s.onNext(tickCount))
tickCount += 1
}
}
t.schedule(task, 0L, MILLISECS)
Observable.create[Int] {
(obs: Observer[Int]) => {
subscribers = subscribers + obs
Subscription {
subscribers = subscribers - obs
}
}
}
}
def recover[T](target: Observable[T]): Observable[Try[T]] = {
target.map { Success(_) }.
onErrorResumeNext(
(err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
)
}
val stream1 = getTickObservable() map { item =>
if (item % 2 == 0) throw new RuntimeException(s"error on $item") else item
}
recover(stream1).subscribe(
term => {
println(s" ${Thread.currentThread().getName()} onNext: $term")
},
t => {
println("in error callback")
println(s" ${Thread.currentThread().getName()} onError: $t")
},
() => println(s" ${Thread.currentThread().getName()} subscriber complete")
)
}
Ecco l'output parziale di un'esecuzione del codice precedente:
Timer-0 onNext: Success(1)
Timer-0 onNext: Failure(java.lang.RuntimeException: error on 2)
Timer-0 onNext: Success(3)
Timer-0 onNext: Failure(java.lang.RuntimeException: error on 4)
Timer-0 onNext: Success(5)
Timer-0 onNext: Failure(java.lang.RuntimeException: error on 6)
Non volevo che questa risposta andasse avanti all'infinito, quindi ho saltato alcuni dettagli sugli approcci alternativi che ho adottato per risolvere questo problema, di cui puoi leggere Qui se siete interessati.