Question

Voici un code Scala, j'ai actuellement:

val b= observable.Interval (1 seconde) .Map (n=> si (n% 2== 1) jeter une nouvelle exception sinon n * n)

b.subscribe (n=> println (n), e=> println ("erreur"), ()=> println ("fait"))

Et voici ma sortie:

0
Erreur

Comment puis-je modifier mon observable afin de continuer à aller après chaque erreur et je vais avoir une sortie comme celle-ci:

0
error
2
error
4
...

Était-ce utile?

La solution

Vous pouvez utiliser l'un des différents gestionnaires d'erreur. Je pense que dans votre cas, onerrorflatmap pourrait être le bon choix:

wiki

Javadoc

Malheureusement, onerrorflatMap n'est pas (à la version 0.19.0) de l'API Scala.

Autres conseils

J'ai eu cette même question et j'ai été déçu qu'il n'y ait pas d'onerrorflatmap pour Scala Rx. Alors j'ai pris un coup de poignard à la mise en œuvre de cette fonctionnalité moi-même.

Ma solution est indiquée ci-dessous (voir une solution). La méthode de la clé est celle-ci:

  def recover[T](target: Observable[T]): Observable[Try[T]] = {
      target.map { Success(_) }.
        onErrorResumeNext(
          (err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
        )
  }

détails de 'récupérer' méthode

Le premier argument à «Récupérer» est l'observable que vous souhaitez continuer à pressurer pour des événements même après Cela jette une exception. J'ai essayé toutes sortes d'autres approches, mais c'était le seul qui a travaillé pour moi. J'avais initialement attendu que Scala Rx est onerrorRorReturn pour cartographier une erreur dans la valeur dictée par ma récupération fonction, puis continuez, mais il manquait tout le point du "contrat observable", ce qui est que Un observable doit cesser d'empêcher d'envoyer des événements supplémentaires après l'encombrissement ou onerror. Tout observable que continue de veler les événements après qu'une erreur sera étiquetée «pathologique» (et dûment évitée par la société polie), Comme discuté ici: https://github.com/reactivex/rxjava/wiki/Proantom -Opérateurs # ONERRRIRROLFLATMAP

La charge utile des appels ONNEXT réussi sont enveloppés dans un succès (), tandis qu'une exception sera manipulée par onerrorrorresumenext, qui créera un flux observable concaténé de (1) un emballage observable L'erreur, et (2) une instance de la cible enveloppée dans un appel récursif à récupérer. J'avais initialement inquiet de la récursion infinie .. mais tout a fonctionné bien.

limitations

Je devrais mentionner que dans le cas de la question de l'affiche originale - qui utilise observable.Interval, cette ne fonctionnerait pas bien, car récupération (cible) serait l'observable d'origine.Interval, qui commencerait Émettre dès le début, vous ne feriez jamais progresser. Pour quelque chose comme un intervalle, vous seriez doivent écrire votre propre intervalle basé sur la minuterie qui pourrait être redémarré. La valeur d'exception pourrait espérer vous donner suffisamment d'informations pour vous indiquer la valeur dont vous avez besoin pour redémarrer.

A SOLUTION




object RecoverFromExceptionsInObservable extends App {

  import rx.lang.scala._
  import scala.language.postfixOps
  import scala.util.{Try, Failure, Success}


  val MILLISECS = 500L
  var tickCount = 0

  /**
   * There is a bug in this code which we will ignore for the simple purpose of
   * this test. The bug is that timers are never stopped and cleaned up.
   */
  def getTickObservable(): Observable[Int] = {
    @volatile var subscribers: Set[Observer[Int]] = Set.empty

    val t = new java.util.Timer()
    val task = new java.util.TimerTask {
      def run() = {
        subscribers.foreach(s => s.onNext(tickCount))
        tickCount += 1
      }
    }
    t.schedule(task, 0L, MILLISECS)

    Observable.create[Int] {
      (obs: Observer[Int]) => {
        subscribers = subscribers + obs
        Subscription {
          subscribers = subscribers - obs
        }
      }
    }
  }

  def recover[T](target: Observable[T]): Observable[Try[T]] = {
      target.map { Success(_) }.
        onErrorResumeNext(
          (err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
        )
  }


  val stream1 = getTickObservable() map { item =>
    if (item % 2 == 0) throw new RuntimeException(s"error on $item") else item
  }

  recover(stream1).subscribe(
    term => {
      println(s" ${Thread.currentThread().getName()}  onNext: $term")
    },
    t => {
      println("in error callback")
      println(s" ${Thread.currentThread().getName()}  onError: $t")
    },
    () => println(s" ${Thread.currentThread().getName()} subscriber complete")
  )
}

Voici une sortie partielle d'une course du code ci-dessus:

 Timer-0  onNext: Success(1)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 2)
 Timer-0  onNext: Success(3)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 4)
 Timer-0  onNext: Success(5)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 6)

Je ne voulais pas que cette réponse soit prise pour toujours, j'ai donc sauté sur certains détails sur des approches alternatives que j'ai prises pour résoudre ce problème, que vous pouvez lire sur ici si vous êtes intéressé.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top