Question

I'm looking to wrap authenticated calls in my Java app with an Observable which will re-authorize my user if necessary before making its request. I've done something similar with ReactiveCocoa in Objective-C with this approach from the Shape Code Blog:

- (RACSignal *)doRequestAndRefreshTokenIfNecessary:(RACSignal *)requestSignal {
    return [requestSignal catch:^(NSError *error) {
        // Catch the error, refresh the token, and then do the request again.
        BOOL hasRefreshToken = [UserManager sharedInstance].refreshToken != nil;
        BOOL httpCode401AccessDenied = error.code == -1011;
        if (httpCode401AccessDenied && hasRefreshToken) {
            NSLog(@"Will attempt to refresh access token.");
            return [[[self refreshToken] ignoreValues] concat:requestSignal];
        }
        return requestSignal;
    }];
}

A RACSignal is the ReactiveCocoa analog of an Observable. This method returns a RACSignal that catches any error signaled by the initial request. If the error matches the given criteria (in this case, access was denied and a refresh token is available), a new RACSignal is transparently subscribed that refreshes the token and then re-sends the original request.

Does RxJava provide similar facilities? I couldn't find anything resembling catch: in the Combining Observables documentation.

Solution

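I didn't find a ready-made operator for this at first (onErrorResumeNext, listed under RxJava's error-handling operators rather than the combining ones, looks like the closest built-in and is worth checking first), but this seems to do the trick okay. It is a pretty direct translation of ReactiveCocoa's catch: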

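import rx.Observable;
import rx.Observer;
import rx.Subscription;

// Written against the early RxJava API that still provides Observable.OnSubscribeFunc.
public class CatchObservable {
    public interface CaughtErrorHandler {
        // Returns the Observable to switch to after the source has failed.
        Observable<?> onError(Throwable throwable);
    }

    public static <T> Observable<T> catchObserver(final Observable<T> observable, final CaughtErrorHandler errorHandler) {
        return Observable.create(new Observable.OnSubscribeFunc<T>() {
            @Override
            public Subscription onSubscribe(final Observer<? super T> subscriber) {
                return observable.subscribe(new Observer<T>() {
                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Swallow the error and hand the subscriber over to the fallback.
                        // The handler is untyped, so the cast is unchecked: the caller must
                        // return an Observable whose items are compatible with T.
                        @SuppressWarnings("unchecked")
                        Observable<? extends T> fallback = (Observable<? extends T>) errorHandler.onError(e);
                        fallback.subscribe(subscriber);
                    }

                    @Override
                    public void onNext(T args) {
                        subscriber.onNext(args);
                    }
                });
            }
        });
    }
}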

And used thusly:

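private Observable refreshAuthIfNecessary(final Observable<?> request) {
    return CatchObservable.catchObserver(request, new CatchObservable.CaughtErrorHandler() {
        @Override
        public Observable<?> onError(Throwable throwable) {
            // Note: this runs for every error, not only for an access-denied response.
            // concat subscribes to refreshAuth() first and to the request again once it
            // completes; anything refreshAuth() emits is forwarded to the subscriber too.
            return Observable.concat(WebServices.this.refreshAuth(), request);
        }
    });
}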
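
One difference from the ReactiveCocoa version is worth spelling out: the handler above refreshes on every error, while the original only does so for an access-denied error when a refresh token is available. The following is a minimal, library-free sketch of that conditional behaviour, not RxJava code. Source, Sink, UnauthorizedException, catchError, refreshThen, tokenGuardedRequest and collect are all hypothetical names invented for illustration, and UnauthorizedException merely stands in for however your HTTP client reports a 401.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Library-free model of "catch, refresh, retry". None of these types are RxJava API.
final class CatchSketch {

    /** Hypothetical receiver, mirroring an Observer's three callbacks. */
    interface Sink<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Hypothetical cold source: every subscribe call runs the work again. */
    interface Source<T> {
        void subscribe(Sink<? super T> sink);
    }

    /** Hypothetical stand-in for an HTTP 401 failure. */
    static final class UnauthorizedException extends RuntimeException {}

    /** Forwards values and completion; on error, switches to the handler's source. */
    static <T> Source<T> catchError(Source<T> source,
                                    Function<Throwable, Source<? extends T>> handler) {
        return sink -> source.subscribe(new Sink<T>() {
            @Override public void onNext(T value) { sink.onNext(value); }
            @Override public void onCompleted() { sink.onCompleted(); }
            @Override public void onError(Throwable error) {
                handler.apply(error).subscribe(sink);
            }
        });
    }

    /** Runs the refresh for its side effect only, then subscribes to the request again. */
    static <T> Source<T> refreshThen(Runnable refresh, Source<T> request) {
        return sink -> {
            refresh.run();
            request.subscribe(sink);
        };
    }

    /** Retries once after a refresh, but only for a 401 when a refresh token exists. */
    static <T> Source<T> refreshAuthIfNecessary(Source<T> request, Runnable refresh,
                                                boolean hasRefreshToken) {
        return catchError(request, error -> {
            if (error instanceof UnauthorizedException && hasRefreshToken) {
                return refreshThen(refresh, request);
            }
            return sink -> sink.onError(error); // any other error passes through unchanged
        });
    }

    /** Demo request that fails with a 401 until tokenValid[0] becomes true. */
    static Source<String> tokenGuardedRequest(boolean[] tokenValid) {
        return sink -> {
            if (!tokenValid[0]) {
                sink.onError(new UnauthorizedException());
                return;
            }
            sink.onNext("payload");
            sink.onCompleted();
        };
    }

    /** Subscribes and records every callback as text, for demonstration. */
    static <T> List<String> collect(Source<T> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(new Sink<T>() {
            @Override public void onNext(T value) { events.add("next:" + value); }
            @Override public void onError(Throwable error) {
                events.add("error:" + error.getClass().getSimpleName());
            }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        boolean[] tokenValid = {false}; // simulates an expired access token
        Source<String> guarded = refreshAuthIfNecessary(
                tokenGuardedRequest(tokenValid), () -> tokenValid[0] = true, true);
        System.out.println(collect(guarded)); // prints [next:payload, completed]
    }
}
```

Because the retry subscribes the downstream sink directly to the request rather than to the catching wrapper, a second failure is reported to the subscriber instead of triggering another refresh. With RxJava itself the same condition would go inside CaughtErrorHandler.onError, returning Observable.error(throwable) for errors that should not trigger a refresh.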
Licensed under: CC-BY-SA with attribution