Question

I'm looking for a way to define the order(?) of observers.

@GET("/get_user_msgs")
Observable<PrivateMessagesResponse> getPrivateMessages(@QueryMap Map<String, String> params);

For example, I have an Observable from my REST API, created by Retrofit.

In my ListView I'm observing this Observable.

api.getPrivateMessages(params).subscribe(new Observer());

I also have an API wrapper for my Espresso tests, and I'm subscribing to the same Observable there. This way the observer in the API wrapper is called first, and only then is the observer in the ListView called.

public class IdlingWrapper implements Api, IdlingResource { 
   ....

    public IdlingWrapper(Api realApi) {
        this.realApi = realApi;
    }

    ...

    public Observable<PrivateMessagesResponse> getPrivateMessages(@QueryMap Map<String, String> params) {
        counter.incrementAndGet();
        return wrapObservable(realApi.getPrivateMessages(params));
    }

    protected <T> Observable<T> wrapObservable(final Observable<T> observable) {
        //what to do here?
    }
}

Is there a way to force one observer to be notified only after all the others are done? Or something similar?

Something like

Observable observable = getObservable();
observable.subscribeAsLast(new LastObserver());
observable.subscribe(new ObserverA());
observable.subscribe(new ObserverB());

so that ObserverA would be notified first, then ObserverB, and only then LastObserver.

Or any other approach where I could find out when all registered observers were notified and completed.

Solution

I'm not exactly sure what you are trying to do in IdlingWrapper, but I think the current implementation is very fragile.

I think the most important thing is to guarantee that the wrapped observable can be subscribed to only once.

Here is a quick implementation that demonstrates this, along with my version of wrapObservable.

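import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public class Test {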

    private static int counter = 0;

    private static final List<Observable<?>> list = Collections.synchronizedList(new ArrayList<>());

    protected static <T> Observable<T> wrapObservable(final Observable<T> original) {
        // Track the original; it is removed again on the first subscription.
        synchronized (list) {
            list.add(original);
        }

        return Observable.create(new Observable.OnSubscribe<Void>() {
            @Override
            public void call(Subscriber<? super Void> subscriber) {
                synchronized (list) {
                    counter++;
                    if (!list.contains(original)) {
                        subscriber.onError(new Exception("You can only subscribe once!"));
                        return;
                    }
                    list.remove(original);
                }

                // Sleep to make it easier to see things happening...
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException ignored) {
                }

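                // Emit a single (null) item so that flatMap actually subscribes to the original.
                subscriber.onNext(null);
                subscriber.onCompleted();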
            }
        }).flatMap(new Func1<Void, Observable<? extends T>>() {
            @Override
            public Observable<? extends T> call(Void o) {
                return original;
            }
        }).finallyDo(new Action0() {
            @Override
            public void call() {
                synchronized (list) {
                    counter--;
                    if (list.size() == 0 && counter == 0) {
                        System.err.println("finally");
                    }
                }
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        for(int i = 0; i < 10; i++) {
            // running in io thread for simulating async call.
            Observable<String> test = wrapObservable(Observable.from("TEST!!!!!!")).subscribeOn(Schedulers.io());
            test.subscribe(new Observer<String>() {
                @Override
                public void onCompleted() {
                    System.err.println("completed");
                }

                @Override
                public void onError(Throwable e) {
                    System.err.println("error");
                }

                @Override
                public void onNext(String s) {
                    System.err.println("next");
                }
            });

            // example of calling the same observable twice.
            test.subscribe(new Observer<String>() {
                @Override
                public void onCompleted() {
                    System.err.println("completed");
                }

                @Override
                public void onError(Throwable e) {
                    System.err.println("error");
                }

                @Override
                public void onNext(String s) {
                    System.err.println("next");
                }
            });
        }

        Thread.sleep(10000);
    }
}
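
The subscribe-once check above relies on a shared list and a synchronized block. As a smaller sketch of the same idea, independent of RxJava, a per-wrapper AtomicBoolean can reject a second consumer. Note that SingleUse is a hypothetical name, and a plain Supplier stands in for the observable here; this is an illustration under those assumptions, not a drop-in replacement for wrapObservable.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical helper: wraps a Supplier so that it can be consumed only once. */
final class SingleUse<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private final AtomicBoolean used = new AtomicBoolean(false);

    SingleUse(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T get() {
        // compareAndSet succeeds for exactly one caller, even under contention.
        if (!used.compareAndSet(false, true)) {
            throw new IllegalStateException("You can only subscribe once!");
        }
        return delegate.get();
    }
}
```

The first call to get() passes through to the delegate; any later call throws, which corresponds to the onError branch in the code above.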

OTHER TIPS

It seems that this worked just fine.

protected <T> Observable<T> wrapObservable(final Observable<T> original) {
    return Observable.create(new Observable.OnSubscribeFunc<T>() {
        @Override
        public Subscription onSubscribe(final Observer<? super T> t1) {
            original.subscribe(new Observer<T>() {
                @Override
                public void onCompleted() {
                    t1.onCompleted();
                    uiThreadHandler.post(new Runnable() {
                        @Override
                        public void run() {
                            counter.decrementAndGet();
                            notifyIdle();
                        }
                    });
                }

                @Override
                public void onError(Throwable e) {
                    t1.onError(e);
                    uiThreadHandler.post(new Runnable() {
                        @Override
                        public void run() {
                            counter.decrementAndGet();
                            notifyIdle();
                        }
                    });
                }

                @Override
                public void onNext(T args) {
                    t1.onNext(args);
                }
            });

            return Subscriptions.empty();
        }
    });
}
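
The bookkeeping in the wrapper above boils down to a counter of in-flight requests: increment when a call starts, decrement after the terminal event has been forwarded, and signal idle when the counter reaches zero. A minimal sketch of just that part, with no Android or RxJava dependencies, might look like this. BusyCounter and its method names are hypothetical, and the Runnable stands in for notifyIdle().

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the counter logic in IdlingWrapper; not the Espresso API. */
final class BusyCounter {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final Runnable onIdle;

    BusyCounter(Runnable onIdle) {
        this.onIdle = onIdle;
    }

    /** Call when a request starts (e.g. in getPrivateMessages). */
    void begin() {
        inFlight.incrementAndGet();
    }

    /** Call once per request, after onCompleted or onError has been forwarded. */
    void end() {
        // Fire the idle callback only when the last in-flight request finishes.
        if (inFlight.decrementAndGet() == 0) {
            onIdle.run();
        }
    }

    boolean isIdle() {
        return inFlight.get() == 0;
    }
}
```

In the wrapper above, the decrement is posted to uiThreadHandler rather than run inline, presumably so that it executes after UI work that was already queued; the sketch leaves that scheduling decision to the caller.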

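If you just want to use built-in RxJava operators to order your observers, you can use flatMap and range to turn each item into several copies, each tagged with a priority, and have every observer filter on its own priority. Because the copies of a given item are emitted in priority order, the observer filtering on 0 is notified before the observer filtering on 1, regardless of the order in which they subscribed.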

Here's a trivial example:

Observable<Pair<Integer, Object>> shared = RxView.clicks(findViewById(R.id.textView))
        .flatMap(c -> Observable.range(0, 2).map(i -> Pair.create(i, c)))
        .share();

shared.filter(p -> p.first == 1)
        .map(p -> p.second)
        .doOnSubscribe(c -> Log.d(TAG, "first subscribed doOnSubscribe"))
        .subscribe(c -> Log.d(TAG, "first subscribed onNext"));

shared.filter(p -> p.first == 0)
        .map(p -> p.second)
        .doOnSubscribe(c -> Log.d(TAG, "second subscribed doOnSubscribe"))
        .subscribe(c -> Log.d(TAG, "second subscribed onNext"));
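
To make the ordering easier to see without Android or RxJava, here is a rough plain-Java model of the same trick: each emitted item is expanded into one tagged copy per priority level, every copy is offered to every listener, and each listener keeps only the copy whose tag matches its own. PriorityFanOut and its methods are hypothetical names used only for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, RxJava-free model of the flatMap/range/filter trick. */
final class PriorityFanOut<T> {
    /** A listener plus the single priority tag it accepts (the filter step). */
    private static final class Filtered<T> {
        final int priority;
        final Consumer<T> listener;

        Filtered(int priority, Consumer<T> listener) {
            this.priority = priority;
            this.listener = listener;
        }
    }

    private final int levels;
    private final List<Filtered<T>> listeners = new ArrayList<>();

    PriorityFanOut(int levels) {
        this.levels = levels;
    }

    void subscribe(int priority, Consumer<T> listener) {
        listeners.add(new Filtered<>(priority, listener));
    }

    void emit(T item) {
        // Like flatMap + range: one tagged copy of the item per priority level.
        for (int tag = 0; tag < levels; tag++) {
            // Every copy is offered to every listener, in subscription order...
            for (Filtered<T> f : listeners) {
                // ...but each listener keeps only the copy matching its own tag.
                if (f.priority == tag) {
                    f.listener.accept(item);
                }
            }
        }
    }
}
```

With two levels, a listener subscribed first with priority 1 and another subscribed second with priority 0, a single emit notifies the priority-0 listener first, mirroring the log order of the RxJava example.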

If you are doing this all over the place

Licensed under: CC-BY-SA with attribution