Frage

Say I have a bunch of transformations on an Observable:

operation()
    .flatMap(toSomething())
    .map(toSomethingElse())
    .flatMap(toYetSomethingElse())
    .subscribeOn(Schedulers.newThread())
    .observeOn(AdroidSchedulers.mainThread())
    .subscribe(observer);

Are all of these operations synchronous except for the last call to flatMap()? Or are all of the operations run on the thread that I told it to subscribe on?

War es hilfreich?

Lösung

I figured this out, with a test. The following test passes (which means the emissions on the Observable are all on the same background thread):

    volatile long observableThreadId;

    @Test
    public void transformedObservables_shouldRunInSameThread() {

        Observable.from(new String[]{"a", "b", "c"}) //
            .flatMap(new Func1<String, Observable<Object>>() {
                @Override public Observable<Object> call(String s) {
                    observableThreadId = Thread.currentThread().getId();
                    return Observable.from((Object) s);
                }
            }) //
            .map(new Func1<Object, String>() {
                @Override public String call(Object o) {
                    long id = Thread.currentThread().getId();
                    if (id != observableThreadId) {
                        throw new RuntimeException("Thread ID mismatch");
                    }

                    return (String) o;
                }
            }) //
            .flatMap(new Func1<String, Observable<String>>() {
                @Override public Observable<String> call(String s) {
                    long id = Thread.currentThread().getId();
                    if (id != observableThreadId) {
                        throw new RuntimeException("Thread ID mismatch");
                    }

                    return Observable.from(s);
                }
            }) //
            .subscribeOn(Schedulers.newThread()) //
            .observeOn(Schedulers.currentThread()) //
            .subscribe(new Observer<String>() {
                @Override public void onCompleted() {
                    assertThat(Thread.currentThread().getId()).isNotEqualTo(observableThreadId);
                }

                @Override public void onError(Throwable throwable) {

                }

                @Override public void onNext(String s) {

                }
            });

        System.out.println("blah");
    }

=============================== UPDATE:

A better answer can actually be found in the ReactiveX documentation on Scheduler:

By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.

... the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOn multiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top