Frage

Ich verwende rxjava in meiner Android-App, um Netzwerkanfragen asynchron zu verarbeiten.Jetzt möchte ich eine fehlgeschlagene Netzwerkanfrage erst nach Ablauf einer bestimmten Zeit erneut versuchen.

Gibt es eine Möglichkeit, retry() für ein Observable zu verwenden, es aber erst nach einer bestimmten Verzögerung erneut zu versuchen?

Gibt es eine Möglichkeit, dem Observable mitzuteilen, dass es gerade erneut versucht wird (anstatt es zum ersten Mal zu versuchen)?

Ich habe mir debounce()/throttleWithTimeout() angesehen, aber sie scheinen etwas anderes zu tun.

Bearbeiten:

Ich glaube, ich habe einen Weg gefunden, dies zu tun, aber ich wäre an einer Bestätigung interessiert, dass dies der richtige Weg ist, oder an anderen, besseren Wegen.

Was ich mache, ist Folgendes:In der call()-Methode meines Observable.OnSubscribe lasse ich den Thread einfach für die gewünschte Zeitspanne ruhen, bevor ich die Subscribers onError()-Methode aufrufe.Um es also alle 1000 Millisekunden erneut zu versuchen, mache ich etwa Folgendes:

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e.printStackTrace();
        }
        subscriber.onError(e);
    }
}

Da diese Methode ohnehin auf einem IO-Thread läuft, blockiert sie die Benutzeroberfläche nicht.Das einzige Problem, das ich sehe, ist, dass selbst der erste Fehler mit Verzögerung gemeldet wird, sodass die Verzögerung auch dann vorhanden ist, wenn kein retry() erfolgt.Mir würde es besser gefallen, wenn die Verzögerung nicht angewendet würde nach ein Fehler, aber stattdessen Vor ein erneuter Versuch (aber natürlich nicht vor dem ersten Versuch).

War es hilfreich?

Lösung

Du kannst den ... benutzen retryWhen() Operator zum Hinzufügen einer Wiederholungslogik zu jedem Observable.

Die folgende Klasse enthält die Wiederholungslogik:

RxJava 2.x

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

RxJava 1.x

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

Verwendung:

// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
    .retryWhen(new RetryWithDelay(3, 2000));

Andere Tipps

Inspiriert von Pauls Antwort, und wenn Sie sich nicht darum kümmern retryWhen Probleme angegeben von Abhijit Sarkar, Der einfachste Weg, das erneute Abonnement mit rxJava2 bedingungslos zu verzögern, ist:

source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))

Möglicherweise möchten Sie weitere Beispiele und Erklärungen sehen retryWhen und repeatWhen.

Dieses Beispiel funktioniert mit JXJAVA 2.2.2:

Versuch ohne Verzögerung:

generasacodicetagpre.

Versuch mit Verzögerung:

generasacodicetagpre.

Unsere Quelle Single fehlschlägt, wenn SomEconnection.send () fehlschlägt. Wenn dies geschieht, gibt die beobachtbaren Fehler in der Retry-Wiederholung den Fehler aus. Wir verzögern diese Emissionen um 300 ms und senden ihn zurück, um einen Wiederholenden zu senden. Nehmen (5) garantiert, dass unsere signalisierbare Beobachtungsabbildung nach fünf Fehlern enden wird. Retrywenners sieht die Kündigung und wiederholt nach dem fünften Versagen nicht.

Dies ist eine Lösung, die auf den Ausschnitten von Ben Christensen basiert, die ich gesehen habe: RetryWhen-Beispiel, Und RetryWhenTestsConditional (Ich musste mich ändern n.getThrowable() Zu n damit es funktioniert).ich benutzte evant/gradle-retrolambda Damit die Lambda-Notation auf Android funktioniert, müssen Sie jedoch keine Lambdas verwenden (obwohl dies dringend empfohlen wird).Für die Verzögerung habe ich einen exponentiellen Backoff implementiert, aber Sie können dort jede beliebige Backoff-Logik einfügen.Der Vollständigkeit halber habe ich das hinzugefügt subscribeOn Und observeOn Betreiber.Ich benutze ReactiveX/RxAndroid für die AndroidSchedulers.mainThread().

int ATTEMPT_COUNT = 10;

public class Tuple<X, Y> {
    public final X x;
    public final Y y;

    public Tuple(X x, Y y) {
        this.x = x;
        this.y = y;
    }
}


observable
    .subscribeOn(Schedulers.io())
    .retryWhen(
            attempts -> {
                return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
                .flatMap(
                        ni -> {
                            if (ni.y > ATTEMPT_COUNT)
                                return Observable.error(ni.x);
                            return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
                        });
            })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

Anstelle von myrequestobservable.retry verwende ich eine Wrapper-Funktion wiederholbar (myRequestObservable, RetryCount, Sekunden), in der ein neues beobachtbares Retrycount, Sekunden lang zurückging, der die Indirektion für die Verzögerung umgibt, damit ich dies tun kann

generasacodicetagpre.

retryWhen ist ein komplizierter, vielleicht sogar fehlerhafter Operator.Das offizielle Dokument und mindestens eine Antwort hier verwenden range Operator, der fehlschlägt, wenn keine Wiederholungsversuche durchgeführt werden müssen.Sieh mein Diskussion mit ReactiveX-Mitglied David Karnok.

Ich habe die Antwort von Kjones verbessert, indem ich sie geändert habe flatMap Zu concatMap und durch Hinzufügen von a RetryDelayStrategy Klasse. flatMap behält die Reihenfolge der Emission nicht bei concatMap Dies ist wichtig für Verzögerungen mit Backoff.Der RetryDelayStrategy, Wie der Name schon sagt, können Benutzer zwischen verschiedenen Modi zum Generieren von Wiederholungsverzögerungen wählen, einschließlich Backoff.Der Code ist auf meinem verfügbar GitHub komplett mit den folgenden Testfällen:

  1. Beim ersten Versuch erfolgreich (keine Wiederholungsversuche)
  2. Schlägt nach 1 Wiederholung fehl
  3. Versucht es dreimal erneut, ist aber beim zweiten Mal erfolgreich und versucht es daher nicht beim dritten Mal
  4. Erfolgreich beim dritten Wiederholungsversuch

Sehen setRandomJokes Methode.

Jetzt mit RXJAVA Version 1.0+ können Sie mit ZiPWith mit Verzögerung erneut versuchen.

Hinzufügen von Änderungen an kjones Antwort.

modifiziert

generasacodicetagpre.

gleiche Antwort als von kjones aber aktualisiert auf die neueste Version Für rxjava 2.x version: ('io.reactivex.rxjava2: rxjava: 2.1.3')

generasacodicetagpre.

uage:

// Hinzufügen von Wiederholungslogik auf vorhandene beobachtbare. // max von 3 mal mit einer Verzögerung von 2 Sekunden wiederholen.

generasacodicetagpre.

Sie können eine Verzögerung in der im Retry-Bediener zurückgegebenen Beamten hinzufügen. generasacodicetagpre.

Sie können hier weitere Beispiele sehen. https://github.com/politrons/reagive

Bezogen auf kjones Die Antwort hier ist die Kotlin-Version von RxJava 2.x mit einer Verzögerung als Erweiterung.Ersetzen Observable um dieselbe Erweiterung zu erstellen Flowable.

fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
    var retryCount = 0

    return retryWhen { thObservable ->
        thObservable.flatMap { throwable ->
            if (++retryCount < maxRetries) {
                Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
            } else {
                Observable.error(throwable)
            }
        }
    }
}

Dann verwenden Sie es einfach auf Observable observable.retryWithDelay(3, 1000)

Mach es einfach so:

generasacodicetagpre.

für kotlin & rxjava1 version

generasacodicetagpre.

(kotlin) Ich bin bisschen verbessert Code mit exponentiellen Backoff- und angewandten Verteidigungsausemitting von Samardable.Range ():

generasacodicetagpre.

im Ereignis, wenn Sie die Retry-Anzahl ausdrucken müssen, Sie können das in Rxjava-Wiki-Seite bereitgestellt, das in Rxjava-Wiki-Seite bereitgestellt wirdReactiveX / RXJava / Wiki / Error-Handling-Operatoren

generasacodicetagpre.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top