Question

An application I am working on makes heavy use of asynchronous processing, and I am looking for a better way to organize the code.

The external input to the system is received on a servlet. The raw data collected by this servlet is deposited into a queue. A thread pool runs against this queue and parses the raw data into a structured record, which is then deposited into one of a set of N queues. The queue is chosen such that all records of the same kind go to the same queue. These N queues are serviced by a single thread each, which collects records of the same kind into a set. Every minute a scheduled task wakes up and, for each kind, writes all records collected in the previous minute to a file.

Currently, this code is organized using a bunch of queues, thread pools, and ever-running runnables, which makes the logic hard to follow. I’d like to refactor this code into something where the data-flow described above is more explicit. I am looking for tools and approaches to achieve that.

  • Do tools like RxJava help with this? If so, how?
  • Are there other approaches I should consider?

Solution

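Here is an RxJava (1.x) example that follows your description: the servlet pushes its raw input into a PublishSubject, map parses it, groupBy routes records by kind, and each group is consumed on its own io scheduler worker. I hope it helps.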

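import java.io.IOException;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import rx.Observable;
import rx.Observer;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class TestServlet extends HttpServlet {

    // Hot source: every request pushes its raw input into this subject.
    private static final PublishSubject<String> o = PublishSubject.<String>create();

    // Downstream stages run on a computation scheduler thread, not on the servlet thread.
    public static Observable<String> getServletObservable() {
        return o.observeOn(Schedulers.computation());
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        // onNext must not be called concurrently, so serialize the request threads.
        synchronized (TestServlet.class) {
            o.onNext("value");
        }
    }

}

class Foo {
    public static void main(String[] args) {
        TestServlet.getServletObservable().map(new Func1<String, String>() {

            @Override
            public String call(String t1) {
                // parse the raw input into a record (placeholder: pass it through)
                return t1;
            }

        }).groupBy(new Func1<String, String>() {

            @Override
            public String call(String t1) {
                // return the kind of the record (placeholder: the record itself)
                return t1;
            }

        }).subscribe(new Observer<GroupedObservable<String, String>>() {

            @Override
            public void onCompleted() {
                // nothing to do: the subject is never completed
            }

            @Override
            public void onError(Throwable e) {
                // log or handle the failure
            }

            @Override
            public void onNext(GroupedObservable<String, String> group) {
                // One subscription per kind; each group is drained sequentially on an io worker.
                group.observeOn(Schedulers.io()).subscribe(new Observer<String>() {

                    @Override
                    public void onCompleted() {
                        // nothing to do
                    }

                    @Override
                    public void onError(Throwable e) {
                        // log or handle the failure
                    }

                    @Override
                    public void onNext(String t) {
                        // store t
                    }

                });
            }

        });
    }
}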
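
The example stops at the point where each record is stored and does not show the once-a-minute write from the question. With RxJava, one option is to apply buffer(1, TimeUnit.MINUTES) to each group and write every emitted list to a file. As a library-free illustration of the same "collect per kind, flush on a timer" step, here is a minimal sketch using only java.util.concurrent. KindBatcher, add and drain are hypothetical names invented for this sketch, the println stands in for the file write, and the flush period is shortened so the demo finishes quickly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: collects records per kind and hands out the batch on each flush. */
class KindBatcher {
    private final Object lock = new Object();
    private Map<String, List<String>> pending = new HashMap<>();

    /** Called by the parsing stage; safe to call from several threads. */
    void add(String kind, String record) {
        synchronized (lock) {
            pending.computeIfAbsent(kind, k -> new ArrayList<>()).add(record);
        }
    }

    /** Swaps in an empty map and returns everything collected since the last call. */
    Map<String, List<String>> drain() {
        synchronized (lock) {
            Map<String, List<String>> batch = pending;
            pending = new HashMap<>();
            return batch;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        KindBatcher batcher = new KindBatcher();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // The question flushes once a minute; 100 ms is used here only to keep the demo short.
        scheduler.scheduleAtFixedRate(
                () -> batcher.drain().forEach((kind, records) ->
                        System.out.println(kind + " -> " + records)), // stand-in for the file write
                100, 100, TimeUnit.MILLISECONDS);

        batcher.add("a", "a1");
        batcher.add("b", "b1");
        batcher.add("a", "a2");

        Thread.sleep(250);
        scheduler.shutdownNow();
    }
}
```

Because drain swaps the whole map under the lock, a record lands either in the batch being written or in the next one, never in both. This replaces the N per-kind queues with a single map, which may or may not suit your throughput; treat it as a starting point rather than a drop-in replacement.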
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow