Question

What I expect from the queue is the following:

Let's say the queue contains A-B-C-D-E and I have 3 consumers. What I need is for ALL consumers to consume item A before A is dequeued and they move on to consume B.

That is to say, every consumer gets ALL the items in the queue. I have a workaround, which is to use 3 ArrayBlockingQueues, one for each of the 3 consumers. But I'd like to know whether such a queue exists.

Solution 3

You can use the Disruptor with multiple consumers; please follow the link. You might have to specify the dependencies between the consumers. See the example code I tried to attach: "Two handlers with one queue".

OTHER TIPS

The requirements you describe match the publish/subscribe design pattern. It is implemented by JMS providers such as ActiveMQ.

No, there isn't. Sounds like a pretty easy thing to implement, though. Some options that jump to mind are:

  1. Include a CountDownLatch in the object to consume, and discard the item only when it reaches 0
  2. Have two of the consumers not consume the item (peek the queue rather than pop it)
  3. Don't use consumers, use listeners

Or, if you're willing to use something outside of rt.jar, you could use a message queue. ActiveMQ and RabbitMQ are popular, and both support publish-subscribe.
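Option 1 above could be sketched roughly as follows. This is only an illustration under some assumptions: the SharedItem wrapper and the LatchedQueueDemo class are made-up names, the queue is filled before the consumers start, and each consumer peeks the head, counts the latch down, waits for the others, and only then removes the item. A live producer would need a blocking wait instead of the peek() loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class LatchedQueueDemo {
    /** Hypothetical wrapper: a payload plus a latch sized to the number of consumers. */
    public static final class SharedItem {
        final String value;
        final CountDownLatch latch;
        SharedItem(String value, int consumers) {
            this.value = value;
            this.latch = new CountDownLatch(consumers);
        }
    }

    /** Runs the consumers over a pre-filled queue and returns what each one saw, in order. */
    public static List<List<String>> run(List<String> values, int consumers) {
        BlockingQueue<SharedItem> queue = new LinkedBlockingQueue<>();
        for (String v : values) queue.add(new SharedItem(v, consumers));

        List<List<String>> seen = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            List<String> mine = new ArrayList<>();
            seen.add(mine);
            Thread t = new Thread(() -> {
                try {
                    SharedItem head;
                    while ((head = queue.peek()) != null) { // look, but do not dequeue yet
                        mine.add(head.value);               // "consume" the item
                        head.latch.countDown();             // report that this consumer is done
                        head.latch.await();                 // wait until all consumers are done
                        queue.remove(head);                 // only one remove succeeds; the rest are no-ops
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("A", "B", "C", "D", "E"), 3));
    }
}
```

Because every consumer waits on the latch before removing the head, no consumer can reach B until all of them have handled A.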


EDIT: brief description of listener pattern

Instead of multiple consumers, have a single ListenerManager consumer. It pulls an object from the queue, then passes it to all Listeners that have previously registered themselves with the ListenerManager. Finally, the ListenerManager disposes of the object. Have a single ListenerManager per queue, and a single queue per event/object type. This makes it easy to manage.
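A minimal sketch of that listener arrangement might look like this. The class and method names (ListenerDemo, register, demo) are illustrative assumptions, and java.util.function.Consumer stands in for a dedicated Listener interface. Listeners are called synchronously on the manager's thread, so a slow listener delays the others.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class ListenerDemo {
    /** The only consumer of the queue; hands every item to all registered listeners. */
    public static final class ListenerManager<T> implements Runnable {
        private final BlockingQueue<T> queue;
        private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

        public ListenerManager(BlockingQueue<T> queue) { this.queue = queue; }

        public void register(Consumer<T> listener) { listeners.add(listener); }

        @Override
        public void run() {
            try {
                while (true) {
                    T item = queue.take();                  // blocks until an item arrives
                    for (Consumer<T> listener : listeners) {
                        listener.accept(item);              // every listener sees the item
                    }
                    // the item was already dequeued by take(); nothing more to dispose of
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();         // interruption is the stop signal
            }
        }
    }

    /** Feeds the values through one manager and returns what each listener saw. */
    public static List<List<String>> demo(List<String> values, int listenerCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ListenerManager<String> manager = new ListenerManager<>(queue);
        CountDownLatch delivered = new CountDownLatch(values.size() * listenerCount);
        List<List<String>> seen = new ArrayList<>();
        for (int i = 0; i < listenerCount; i++) {
            List<String> mine = new ArrayList<>();
            seen.add(mine);
            manager.register(item -> { mine.add(item); delivered.countDown(); });
        }
        Thread worker = new Thread(manager);
        worker.start();
        queue.addAll(values);
        try {
            delivered.await();   // wait until every listener has seen every item
            worker.interrupt();  // then stop the manager
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo(List.of("A", "B", "C"), 3));
    }
}
```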

Is there any reason why the simple solution of putting n references to item A into the queue, instead of just one, wouldn't work? If you have multiple producers you would have to synchronize their access, but if it's important that A finishes before B you need that anyway.

That's actually at least as memory efficient as using an additional class to handle this - the instance of the class adds at least 2 words overhead, while the proposed one only adds 2 references (which can be smaller than 2 words).

What you need to do is to have a single thread listen on the queue, which will then spread the item to the consumers.

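// listener thread: take() blocks until an element is available and never returns null
try {
  while (!Thread.currentThread().isInterrupted()) {
    final Element element = queue.take();
    for (final Listener listener : myListeners) {
      listener.getQueue().put(element); // each listener has its own queue
    }
  }
} catch (InterruptedException e) {
  Thread.currentThread().interrupt(); // restore the flag and let the thread exit
}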

Basically the producer-consumer pattern combined with the Observer pattern.

Use a separate queue for every consumer, and let the producer add it to all of them. There's not really a drawback here.
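One way to sketch the queue-per-consumer approach is below. FanOut, subscribe, and publish are made-up names, and unbounded LinkedBlockingQueues are assumed so that add() never fails. The publish method is synchronized so that, with several producers, every consumer sees the items in the same order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class FanOut<T> {
    private final List<BlockingQueue<T>> subscriberQueues = new CopyOnWriteArrayList<>();

    /** Creates a private queue for one consumer; it only receives items published afterwards. */
    public BlockingQueue<T> subscribe() {
        BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        subscriberQueues.add(queue);
        return queue;
    }

    /** Adds the item to every subscriber's queue; synchronized to keep one global order. */
    public synchronized void publish(T item) {
        for (BlockingQueue<T> queue : subscriberQueues) {
            queue.add(item);
        }
    }

    public static void main(String[] args) {
        FanOut<String> fanOut = new FanOut<>();
        List<BlockingQueue<String>> queues = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            queues.add(fanOut.subscribe());
        }
        for (String item : List.of("A", "B", "C", "D", "E")) {
            fanOut.publish(item);
        }
        // Each consumer would normally call take() on its own queue in its own thread.
        for (BlockingQueue<String> queue : queues) {
            System.out.println(queue);
        }
    }
}
```

Only references are copied into each queue, so the extra memory is one queue node per consumer per item, not a copy of the item itself.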

It's not clear whether you need all consumers to have consumed A before any of them moves on to B; if so, you will have to adjust this approach.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow