Вопрос

I have a stream of sequential data with two types. And, there are an observer subscribed (listen) for each type of data.

Observer observer1 = new Observer() {
    @Override
    public void next(Data data) {
    //Do something
    }

}


Observer observer2 = new Observer() {
    @Override
    public void next(Data data) {
    //Do something
    }

}

feed.subscribe(observer1, "Type1");
feed.subscribe(observer2, "Type2");

When a piece of data of Type1 is received, the next() method of observer1 will be called and the same thing for Type2 and observer2.

And since, the data is sequential, when we have data of type1 and type2, the second one should wait for the process of the first one (will be blocked).

Now I want to use multi-threading to solve this problem. However, I don't want to create a thread for each piece of data I received. I want the observers run in parallel.

For example, if I have this sequence:

type1, type1, type1, type2, ...

The second and third piece of data should wait for first one processing (because they are type1), but the forth one should be collected by observer2 and processed.

Note that the API that I am using just allow me to subscribe, and it will handle the next() methods.

The solution that comes to my mind is to have a mapping on the threads and observers. So, in each call of next() method, instead of creating a new Thread, I can call the thread assigned to that type to resume. Or, if there is a thread that runs for observer1, I want all the data of type1 stay in a queue waiting for that thread to process them.

Is it any design pattern, or data structure to solve this problem in Java?

Это было полезно?

Решение

This sounds like an ideal fit for the producer/consumer pattern. There would be two consumers, one for each data type. There would also be a producer thread that is the subscriber to the data feed. For each data element received, it places a unit of work on the queue for the appropriate consumer. The above Wikipedia article has a sample Java implementation. Lots of other resources are also available on the web.

Другие советы

This is a typical problem solved with queues. There is a single consumer thread, two queues (N queues in general, where N would be your data types) and 2 (again N in general) worker threads. The consumer thread reads the stream and places the "message" in the correct queue. Each worker thread polls its corresponding queue, processes the next available item and repeats.

Reference: see the implementations of java.util.Queue for Java >= 1.5.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top