Pergunta

Eu estou usando rxjava no meu aplicativo Android para lidar com solicitações de rede de forma assíncrona.Agora eu gostaria de repetir uma falha na rede de solicitação somente depois de um certo tempo já passou.

Existe alguma maneira de usar o retry() em um Observável, mas para repetir somente depois de um certo atraso?

Existe uma maneira de deixar o Observável saber que está actualmente a ser repetida (como oposição a fechar pela primeira vez)?

Eu tinha um olhar de debounce()/throttleWithTimeout (), mas eles parecem estar fazendo algo diferente.

Editar:

Eu acho que eu encontrei uma maneira de fazê-lo, mas eu estaria interessado em qualquer confirmação de que esta é a maneira correta de fazê-lo ou para outros, melhores maneiras.

O que eu estou fazendo é esta:O método call() do meu Observáveis.OnSubscribe, antes que eu chame a Assinantes onError() o método, simplesmente deixei que a suspensão do Thread para o período de tempo pretendido.Por isso, repetir a cada 1000 milissegundos, faço algo parecido com isto:

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e.printStackTrace();
        }
        subscriber.onError(e);
    }
}

Desde que este método está sendo executado em um thread de e / s de qualquer maneira a não bloquear a UI.O único problema que eu posso ver é que até o primeiro erro é relatado com atraso, de forma que o atraso é de lá mesmo que não haja repetição().Eu gostaria que ele melhor se o atraso não foi aplicado depois de um erro, mas em vez disso antes de uma repetição (mas não antes da primeira tentativa, obviamente).

Foi útil?

Solução

Você pode usar o retryWhen() operador para adicionar lógica de repetição para qualquer Observável.

A classe a seguir contém a lógica de repetição:

RxJava 2.x

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

RxJava 1.x

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

Uso:

// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
    .retryWhen(new RetryWithDelay(3, 2000));

Outras dicas

Inspirado por A resposta de paulo, e se você não estiver preocupado com retryWhen problemas indicados por Abhijit Sarkar, a maneira mais simples de atraso de nova inscrição com rxJava2 unconditionnaly é :

source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))

Você pode querer ver mais exemplos e explicações sobre retryWhen e repeatWhen.

Este exemplo funciona com jxjava 2.2.2:

Repetir sem demora:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retry(5)
   .doOnSuccess(status -> log.info("Yay! {}", status);

Repetir com atraso:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
   .doOnSuccess(status -> log.info("Yay! {}", status)
   .doOnError((Throwable error) 
                -> log.error("I tried five times with a 300ms break" 
                             + " delay in between. But it was in vain."));

A nossa fonte única falha se someConnection.send() falha.Quando isso acontece, o que é observável de falhas dentro retryWhen emite o erro.Nós atraso que a emissão de 300ms e enviá-lo de volta para o sinal de uma repetição.tomar(5) garante que a nossa sinalização observáveis vai terminar depois que nós recebemos cinco erros.retryWhen vê a rescisão de contrato e não de repetição após o quinto falha.

Esta é uma solução baseada em Ben Christensen snippets eu vi, RetryWhen Exemplo, e RetryWhenTestsConditional (Eu tive que mudar n.getThrowable() para n para que ele funcione).Eu usei relevantes/gradle-retrolambda para tornar a notação lambda trabalhar no Android, mas você não tem que usar lambdas (embora seja altamente recomendado).Para o atraso eu implementada a redução exponencial, mas você pode conectar o que sempre espera de lógica que você quer lá.Para a completude eu adicionei o subscribeOn e observeOn operadores.Eu estou usando ReactiveX/RxAndroid para o AndroidSchedulers.mainThread().

int ATTEMPT_COUNT = 10;

public class Tuple<X, Y> {
    public final X x;
    public final Y y;

    public Tuple(X x, Y y) {
        this.x = x;
        this.y = y;
    }
}


observable
    .subscribeOn(Schedulers.io())
    .retryWhen(
            attempts -> {
                return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
                .flatMap(
                        ni -> {
                            if (ni.y > ATTEMPT_COUNT)
                                return Observable.error(ni.x);
                            return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
                        });
            })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

em vez de usar MyRequestObservable.repetição de eu usar uma função de wrapper retryObservable(MyRequestObservable, retrycount, segundos) que retorne uma nova Observáveis que lidar com a indireção para o atraso, para que eu possa fazer

retryObservable(restApi.getObservableStuff(), 3, 30)
    .subscribe(new Action1<BonusIndividualList>(){
        @Override
        public void call(BonusIndividualList arg0) 
        {
            //success!
        }
    }, 
    new Action1<Throwable>(){
        @Override
        public void call(Throwable arg0) { 
           // failed after the 3 retries !
        }}); 


// wrapper code
private static <T> Observable<T> retryObservable(
        final Observable<T> requestObservable, final int nbRetry,
        final long seconds) {

    return Observable.create(new Observable.OnSubscribe<T>() {

        @Override
        public void call(final Subscriber<? super T> subscriber) {
            requestObservable.subscribe(new Action1<T>() {

                @Override
                public void call(T arg0) {
                    subscriber.onNext(arg0);
                    subscriber.onCompleted();
                }
            },

            new Action1<Throwable>() {
                @Override
                public void call(Throwable error) {

                    if (nbRetry > 0) {
                        Observable.just(requestObservable)
                                .delay(seconds, TimeUnit.SECONDS)
                                .observeOn(mainThread())
                                .subscribe(new Action1<Observable<T>>(){
                                    @Override
                                    public void call(Observable<T> observable){
                                        retryObservable(observable,
                                                nbRetry - 1, seconds)
                                                .subscribe(subscriber);
                                    }
                                });
                    } else {
                        // still fail after retries
                        subscriber.onError(error);
                    }

                }
            });

        }

    });

}

retryWhen é complicado, talvez até mesmo de buggy, operador.O oficial de doc e pelo menos uma resposta aqui uso range o operador, que irá falhar se não houver tentativas a serem feitas.Ver o meu discussão com ReactiveX membro David Karnok.

Eu melhorado kjones' resposta alterando flatMap para concatMap e pela adição de uma RetryDelayStrategy de classe. flatMap não preserva a ordem de emissão, enquanto concatMap não, o que é importante para os atrasos, com back-off.O RetryDelayStrategy, como o nome indica, permitem que o usuário escolha entre os vários modos de gerar atraso de repetição, incluindo back-off.O código está disponível no meu GitHub complete com as seguintes casos de teste:

  1. Êxito na 1ª tentativa (sem repetições)
  2. Falha depois de 1 repetição
  3. Tenta repetir 3 vezes, mas consegue no dia 2, portanto, não de repetição 3ª vez
  4. Bem-sucedida em 3 de repetição

Ver setRandomJokes o método.

Agora com RxJava versão 1.0+ você pode usar zipWith para conseguir repetir com atraso.

A adição de modificações kjones respostas.

Modificado

public class RetryWithDelay implements 
                            Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int MAX_RETRIES;
    private final int DELAY_DURATION;
    private final int START_RETRY;

    /**
     * Provide number of retries and seconds to be delayed between retry.
     *
     * @param maxRetries             Number of retries.
     * @param delayDurationInSeconds Seconds to be delays in each retry.
     */
    public RetryWithDelay(int maxRetries, int delayDurationInSeconds) {
        MAX_RETRIES = maxRetries;
        DELAY_DURATION = delayDurationInSeconds;
        START_RETRY = 1;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable
                .delay(DELAY_DURATION, TimeUnit.SECONDS)
                .zipWith(Observable.range(START_RETRY, MAX_RETRIES), 
                         new Func2<Throwable, Integer, Integer>() {
                             @Override
                             public Integer call(Throwable throwable, Integer attempt) {
                                  return attempt;
                             }
                         });
    }
}

Mesma resposta de kjones mas atualizado para a versão mais recente Para RxJava 2.x versão: ('io.reactivex.rxjava2:rxjava:2.1.3')

public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {

    private final int maxRetries;
    private final long retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
        return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
            @Override
            public Publisher<?> apply(Throwable throwable) throws Exception {
                if (++retryCount < maxRetries) {
                    // When this Observable calls onNext, the original
                    // Observable will be retried (i.e. re-subscribed).
                    return Flowable.timer(retryDelayMillis,
                            TimeUnit.MILLISECONDS);
                }

                // Max retries hit. Just pass the error along.
                return Flowable.error(throwable);
            }
        });
    }
}

Uso:

// Adicionar lógica de repetição existentes observáveis.// Repetir, no máximo, 3 vezes com um atraso de 2 segundos.

observable
    .retryWhen(new RetryWithDelay(3, 2000));

Você pode adicionar um atraso no Observáveis devolvidos em retryWhen Operador

          /**
 * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated
 */
@Test
public void observableOnErrorResumeNext() {
    Subscription subscription = Observable.just(null)
                                          .map(Object::toString)
                                          .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                                          .retryWhen(errors -> errors.doOnNext(o -> count++)
                                                                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
                                                     Schedulers.newThread())
                                          .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world for extinction!");
                                          })
                                          .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}

Você pode ver mais exemplos aqui. https://github.com/politrons/reactive

Com base no kjones aqui a resposta é Kotlin versão do RxJava 2.x repetição com um atraso como uma extensão.Substituir Observable para criar a mesma extensão para Flowable.

fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
    var retryCount = 0

    return retryWhen { thObservable ->
        thObservable.flatMap { throwable ->
            if (++retryCount < maxRetries) {
                Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
            } else {
                Observable.error(throwable)
            }
        }
    }
}

Em seguida, basta usá-lo observáveis observable.retryWithDelay(3, 1000)

Basta fazer como esta:

                  Observable.just("")
                            .delay(2, TimeUnit.SECONDS) //delay
                            .flatMap(new Func1<String, Observable<File>>() {
                                @Override
                                public Observable<File> call(String s) {
                                    L.from(TAG).d("postAvatar=");

                                    File file = PhotoPickUtil.getTempFile();
                                    if (file.length() <= 0) {
                                        throw new NullPointerException();
                                    }
                                    return Observable.just(file);
                                }
                            })
                            .retry(6)
                            .subscribe(new Action1<File>() {
                                @Override
                                public void call(File file) {
                                    postAvatar(file);
                                }
                            }, new Action1<Throwable>() {
                                @Override
                                public void call(Throwable throwable) {

                                }
                            });

Para Kotlin & RxJava1 versão

class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long)
    : Function1<Observable<out Throwable>, Observable<*>> {

    private val START_RETRY: Int = 1

    override fun invoke(observable: Observable<out Throwable>): Observable<*> {
        return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS)
            .zipWith(Observable.range(START_RETRY, MAX_RETRIES),
                object : Function2<Throwable, Int, Int> {
                    override fun invoke(throwable: Throwable, attempt: Int): Int {
                        return attempt
                    }
                })
    }
}

(Kotlin) eu pouco código melhorado com backoff exponencial e aplicado de defesa emissores de Observáveis.faixa (a):

    fun testOnRetryWithDelayExponentialBackoff() {
    val interval = 1
    val maxCount = 3
    val ai = AtomicInteger(1);
    val source = Observable.create<Unit> { emitter ->
        val attempt = ai.getAndIncrement()
        println("Subscribe ${attempt}")
        if (attempt >= maxCount) {
            emitter.onNext(Unit)
            emitter.onComplete()
        }
        emitter.onError(RuntimeException("Test $attempt"))
    }

    // Below implementation of "retryWhen" function, remove all "println()" for real code.
    val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx ->
        throwableRx.doOnNext({ println("Error: $it") })
                .zipWith(Observable.range(1, maxCount)
                        .concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) },
                        BiFunction { t1: Throwable, t2: Int -> t1 to t2 }
                )
                .flatMap { pair ->
                    if (pair.second >= maxCount) {
                        Observable.error(pair.first)
                    } else {
                        val delay = interval * 2F.pow(pair.second)
                        println("retry delay: $delay")
                        Observable.timer(delay.toLong(), TimeUnit.SECONDS)
                    }
                }
    }

    //Code to print the result in terminal.
    sourceWithRetry
            .doOnComplete { println("Complete") }
            .doOnError({ println("Final Error: $it") })
            .blockingForEach { println("$it") }
}

no caso quando você precisa imprimir a contagem de repetição, você pode usar o exemplo fornecido no Rxjava da página wiki https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

observable.retryWhen(errors ->
    // Count and increment the number of errors.
    errors.map(error -> 1).scan((i, j) -> i + j)  
       .doOnNext(errorCount -> System.out.println(" -> query errors #: " + errorCount))
       // Limit the maximum number of retries.
       .takeWhile(errorCount -> errorCount < retryCounts)   
       // Signal resubscribe event after some delay.
       .flatMapSingle(errorCount -> Single.timer(errorCount, TimeUnit.SECONDS));
Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top