Domanda

Sto usando rxjava nella mia app Android per gestire le richieste di rete in modo asincrono.Ora vorrei riprovare una richiesta di rete non riuscita solo dopo che è trascorso un certo tempo.

C'è un modo per usare retry () su un Osservabile ma per riprovare solo dopo un certo ritardo?

C'è un modo per far sapere all'Osservabile che è attualmente in fase di riprovazione (al contrario di provato per la prima volta)?

Ho dato un'occhiata a debounce()/throttleWithTimeout() ma sembrano fare qualcosa di diverso.

Modificare:

Penso di aver trovato un modo per farlo, ma sarei interessato a confermare che questo è il modo corretto per farlo o per altri modi migliori.

Quello che sto facendo è questo:Nel metodo call () del mio Osservabile.OnSubscribe, prima di chiamare il metodo Subscribers onError (), ho semplicemente lasciato che il thread dormisse per la quantità di tempo desiderata.Quindi, per riprovare ogni 1000 millisecondi, faccio qualcosa del genere:

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e.printStackTrace();
        }
        subscriber.onError(e);
    }
}

Poiché questo metodo è in esecuzione su un thread IO comunque non blocca l'interfaccia utente.L'unico problema che posso vedere è che anche il primo errore viene segnalato con delay quindi il ritardo è lì anche se non c'è retry().Mi piacerebbe che il ritardo non fosse applicato dopo un errore ma invece prima un tentativo (ma non prima del primo tentativo, ovviamente).

È stato utile?

Soluzione

È possibile utilizzare l'operatore retryWhen() per aggiungere la logica di riprova a qualsiasi osservabile.

La classe seguente contiene la logica riprova:

RXJAVA 2.X

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}
.

RXJAVA 1.x

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}
.

Uso:

// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
    .retryWhen(new RetryWithDelay(3, 2000));
.

Altri suggerimenti

ispirato a La risposta di Paul e se non sei interessato ai problemi retryWhen indicati da bhijit sarkar , il modo più semplice per ritardare la rivagnimento con rxjava2 unconditionnaly è:

source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))
.

Potresti voler vedere altri campioni e spiegazioni su Riprova da parte e ripeti due volte .

Questo esempio funziona con JXJAVA 2.2.2:

Riprova senza indugio:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retry(5)
   .doOnSuccess(status -> log.info("Yay! {}", status);
.

Riprova con ritardo:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
   .doOnSuccess(status -> log.info("Yay! {}", status)
   .doOnError((Throwable error) 
                -> log.error("I tried five times with a 300ms break" 
                             + " delay in between. But it was in vain."));
.

Il nostro singolo sorgente non riesce se someconnection.Senda () fallisce. Quando ciò accade, l'osservabile dei fallimenti all'interno del riprottore, la dov'è emette l'errore. Ritardiamo quell'emissione di 300 ms e rimandiamo indietro per segnalare un tentativo. Prendere (5) garantisce che la nostra segnalazione osservabile terminerà dopo aver ricevuto cinque errori. Riprova da vede la terminazione e non si riprova dopo il quinto fallimento.

Questa è una soluzione basata sugli snippet di Ben Christensen che ho visto, RetryWhen Esempio, e Retrywhentestconditional (Ho dovuto cambiare n.getThrowable() per n perché funzioni).Ho usato evant / gradle-retrolambda per far funzionare la notazione lambda su Android, ma non è necessario utilizzare lambda (anche se è altamente raccomandato).Per il ritardo ho implementato il back-off esponenziale, ma puoi collegare qualsiasi logica di backoff che vuoi lì.Per completezza ho aggiunto il subscribeOn e observeOn operatore.Sto usando ReactiveX / RxAndroid per il AndroidSchedulers.mainThread().

int ATTEMPT_COUNT = 10;

public class Tuple<X, Y> {
    public final X x;
    public final Y y;

    public Tuple(X x, Y y) {
        this.x = x;
        this.y = y;
    }
}


observable
    .subscribeOn(Schedulers.io())
    .retryWhen(
            attempts -> {
                return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
                .flatMap(
                        ni -> {
                            if (ni.y > ATTEMPT_COUNT)
                                return Observable.error(ni.x);
                            return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
                        });
            })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

Invece di usare myrequestobservable.rembyry I Uso una funzione wrapper retryobservable (myrequestobservable, ritorceto, secondi) che restituisce un nuovo osservabile che maneggia l'indiretto per il ritardo così posso fare

retryObservable(restApi.getObservableStuff(), 3, 30)
    .subscribe(new Action1<BonusIndividualList>(){
        @Override
        public void call(BonusIndividualList arg0) 
        {
            //success!
        }
    }, 
    new Action1<Throwable>(){
        @Override
        public void call(Throwable arg0) { 
           // failed after the 3 retries !
        }}); 


// wrapper code
private static <T> Observable<T> retryObservable(
        final Observable<T> requestObservable, final int nbRetry,
        final long seconds) {

    return Observable.create(new Observable.OnSubscribe<T>() {

        @Override
        public void call(final Subscriber<? super T> subscriber) {
            requestObservable.subscribe(new Action1<T>() {

                @Override
                public void call(T arg0) {
                    subscriber.onNext(arg0);
                    subscriber.onCompleted();
                }
            },

            new Action1<Throwable>() {
                @Override
                public void call(Throwable error) {

                    if (nbRetry > 0) {
                        Observable.just(requestObservable)
                                .delay(seconds, TimeUnit.SECONDS)
                                .observeOn(mainThread())
                                .subscribe(new Action1<Observable<T>>(){
                                    @Override
                                    public void call(Observable<T> observable){
                                        retryObservable(observable,
                                                nbRetry - 1, seconds)
                                                .subscribe(subscriber);
                                    }
                                });
                    } else {
                        // still fail after retries
                        subscriber.onError(error);
                    }

                }
            });

        }

    });

}
.

retryWhen è un operatore complicato, forse anche difettoso.Il documento ufficiale e almeno una risposta qui utilizzano range operatore, che fallirà se non ci sono tentativi da effettuare.Guarda il mio discussione con il membro di ReactiveX David Karnok.

Ho migliorato la risposta di kjones cambiando flatMap A concatMap e aggiungendo a RetryDelayStrategy classe. flatMap non conserva l'ordine di emissione while concatMap lo fa, il che è importante per i ritardi con back-off.IL RetryDelayStrategy, come indica il nome, consentiamo all'utente di scegliere tra varie modalità di generazione dei ritardi tra i tentativi, incluso il backoff.Il codice è disponibile su my GitHub completo dei seguenti casi di test:

  1. Riesce al primo tentativo (nessun tentativo)
  2. Fallisce dopo 1 tentativo
  3. Tenta di riprovare 3 volte ma riesce la seconda volta, quindi non riprova la terza volta
  4. Riesce al terzo tentativo

Vedere setRandomJokes metodo.

Ora con RXJAVA versione 1.0+ è possibile utilizzare Zipwith per ottenere Ritentare il ritardo.

Aggiunta di modifiche a KJONES Risposta.

Modificato

public class RetryWithDelay implements 
                            Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int MAX_RETRIES;
    private final int DELAY_DURATION;
    private final int START_RETRY;

    /**
     * Provide number of retries and seconds to be delayed between retry.
     *
     * @param maxRetries             Number of retries.
     * @param delayDurationInSeconds Seconds to be delays in each retry.
     */
    public RetryWithDelay(int maxRetries, int delayDurationInSeconds) {
        MAX_RETRIES = maxRetries;
        DELAY_DURATION = delayDurationInSeconds;
        START_RETRY = 1;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable
                .delay(DELAY_DURATION, TimeUnit.SECONDS)
                .zipWith(Observable.range(START_RETRY, MAX_RETRIES), 
                         new Func2<Throwable, Integer, Integer>() {
                             @Override
                             public Integer call(Throwable throwable, Integer attempt) {
                                  return attempt;
                             }
                         });
    }
}
.

stessa risposta di kjones ma aggiornato alla versione più recente Per rxjava 2.x versione: ('io.reactivex.rxjava2: rxjava: 2.1.3')

public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {

    private final int maxRetries;
    private final long retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
        return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
            @Override
            public Publisher<?> apply(Throwable throwable) throws Exception {
                if (++retryCount < maxRetries) {
                    // When this Observable calls onNext, the original
                    // Observable will be retried (i.e. re-subscribed).
                    return Flowable.timer(retryDelayMillis,
                            TimeUnit.MILLISECONDS);
                }

                // Max retries hit. Just pass the error along.
                return Flowable.error(throwable);
            }
        });
    }
}
.

Uso:

// Aggiungere la logica dei tentativi su esistenti osservabili. // riprova max di 3 volte con un ritardo di 2 secondi.

observable
    .retryWhen(new RetryWithDelay(3, 2000));
.

È possibile aggiungere un ritardo nell'osservabile restituito nell'operatore di riproveri

          /**
 * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated
 */
@Test
public void observableOnErrorResumeNext() {
    Subscription subscription = Observable.just(null)
                                          .map(Object::toString)
                                          .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                                          .retryWhen(errors -> errors.doOnNext(o -> count++)
                                                                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
                                                     Schedulers.newThread())
                                          .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world for extinction!");
                                          })
                                          .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}
.

Puoi vedere più esempi qui. https://github.com/politrons/reactive

Basato su KJONES Risposta qui è la versione Kotlin di RXJAVA 2.x riprovare con un ritardo come estensione.Sostituire Observable per creare la stessa estensione per Flowable.

fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
    var retryCount = 0

    return retryWhen { thObservable ->
        thObservable.flatMap { throwable ->
            if (++retryCount < maxRetries) {
                Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
            } else {
                Observable.error(throwable)
            }
        }
    }
}
.

Allora solo usarlo su observable.retryWithDelay(3, 1000) osservabile

Semplicemente farlo come questo:

                  Observable.just("")
                            .delay(2, TimeUnit.SECONDS) //delay
                            .flatMap(new Func1<String, Observable<File>>() {
                                @Override
                                public Observable<File> call(String s) {
                                    L.from(TAG).d("postAvatar=");

                                    File file = PhotoPickUtil.getTempFile();
                                    if (file.length() <= 0) {
                                        throw new NullPointerException();
                                    }
                                    return Observable.just(file);
                                }
                            })
                            .retry(6)
                            .subscribe(new Action1<File>() {
                                @Override
                                public void call(File file) {
                                    postAvatar(file);
                                }
                            }, new Action1<Throwable>() {
                                @Override
                                public void call(Throwable throwable) {

                                }
                            });
.

per la versione Kotlin & rxjava1

class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long)
    : Function1<Observable<out Throwable>, Observable<*>> {

    private val START_RETRY: Int = 1

    override fun invoke(observable: Observable<out Throwable>): Observable<*> {
        return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS)
            .zipWith(Observable.range(START_RETRY, MAX_RETRIES),
                object : Function2<Throwable, Int, Int> {
                    override fun invoke(throwable: Throwable, attempt: Int): Int {
                        return attempt
                    }
                })
    }
}
.

(Kotlin) I Poco Bit Migliorato Codice con Backoff Esponenziale e Applied Defense Emitting of Observable.range ():

    fun testOnRetryWithDelayExponentialBackoff() {
    val interval = 1
    val maxCount = 3
    val ai = AtomicInteger(1);
    val source = Observable.create<Unit> { emitter ->
        val attempt = ai.getAndIncrement()
        println("Subscribe ${attempt}")
        if (attempt >= maxCount) {
            emitter.onNext(Unit)
            emitter.onComplete()
        }
        emitter.onError(RuntimeException("Test $attempt"))
    }

    // Below implementation of "retryWhen" function, remove all "println()" for real code.
    val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx ->
        throwableRx.doOnNext({ println("Error: $it") })
                .zipWith(Observable.range(1, maxCount)
                        .concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) },
                        BiFunction { t1: Throwable, t2: Int -> t1 to t2 }
                )
                .flatMap { pair ->
                    if (pair.second >= maxCount) {
                        Observable.error(pair.first)
                    } else {
                        val delay = interval * 2F.pow(pair.second)
                        println("retry delay: $delay")
                        Observable.timer(delay.toLong(), TimeUnit.SECONDS)
                    }
                }
    }

    //Code to print the result in terminal.
    sourceWithRetry
            .doOnComplete { println("Complete") }
            .doOnError({ println("Final Error: $it") })
            .blockingForEach { println("$it") }
}
.

Nel caso in cui è necessario stampare il conteggio dei tentativi, È possibile utilizzare l'esempio fornito nella pagina Wiki di Rxjava https://github.com/Reactivex / rxjava / wiki / errori-handling-operator

observable.retryWhen(errors ->
    // Count and increment the number of errors.
    errors.map(error -> 1).scan((i, j) -> i + j)  
       .doOnNext(errorCount -> System.out.println(" -> query errors #: " + errorCount))
       // Limit the maximum number of retries.
       .takeWhile(errorCount -> errorCount < retryCounts)   
       // Signal resubscribe event after some delay.
       .flatMapSingle(errorCount -> Single.timer(errorCount, TimeUnit.SECONDS));
.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top