Frage

So I'm playing around with RX (really cool), and I've been converting my api that accesses a sqlite database in Android to return observables.

So naturally one of the problems I started to try to solve is, "What if I want to make 3 API calls, get the results, and then do some processing once they are all finished?"

It took me an hour or 2, but I eventually found the Zip Functionality and it helps me out handily:

    Observable<Integer> one = getNumberedObservable(1);
    Observable<Integer> two = getNumberedObservable(2);
    Observable<Integer> three = getNumberedObservable(3);

    Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer arg0, Integer arg1, Integer arg2) {
            System.out.println("Zip0: " + arg0);
            System.out.println("Zip1: " + arg1);
            System.out.println("Zip2: " + arg2);
            return arg0 + arg1 + arg2;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer arg0) {
            System.out.println("Zipped Result: " + arg0);
        }
    });

public static Observable<Integer> getNumberedObservable(final int value) {
    return Observable.create(new OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(value);
            observer.onCompleted();
            return Subscriptions.empty();
        }
    });
}

Great! So that's cool.

So when I zip up the 3 observables they run in serial. What if I want them all to run in parallel at the same time so I end up getting the results faster? I've played around with a few things, and even tried reading some of the original RX stuff people have written in C#. I'm sure there is a simple answer. Can anyone point me in the right direction? What is the proper way to do this?

War es hilfreich?

Lösung

zip does run the observables in parallel - but it also subscribes to them serially. Since your getNumberedObservable is completing in the subscription method it gives the impression of running serially, but there is in fact no such limitation.

You can either try with some long running Observables that outlive their subscription logic, such as timer, or use the subscribeOn method to subscribe asynchronously to each stream passed to zip.

Andere Tipps

In RxJava, use toAsync to turn a regular function into something that will run on a thread and return its result in an observable.

I don't know Java syntax that well, but it would look something like:

public static Integer getNumber(final int value) { return value; }
public static Observable<Integer> getNumberedObservable(final int value) {
    return rx.util.functions.toAsync(new Func<Integer,Integer>() {
        @Override
        public Integer call(Integer value) { return getNumber(value); }
    });
};

That would work if getNumber were really accessing a database. When you call getNumberedObservable it returns an observable that will run getNumber on a separate thread when you subscribe to it.

I was trying to do the same, running multiple threads in parallel using the zip. I ended opening a new so question and got an answer. Basically, you have to subscribe each observable to a new thread, so if you want to run three observables in parallel using the zip, you have to have subscribe to 3 separate threads.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top