Вопрос

Let's say we have a Queue

BlockingQueue<String> queue= new LinkedBlockingQueue<>();

and some other thread puts values in it, then we read it like

while (true) {
    String next = queue.take();
    System.out.println("next message:" + next);
}

How can I iterate over this queue in stream style, while maintaining similar semantics to above code.

This code only traverses the current queue state:

queue.stream().forEach(e -> System.out.println(e));
Это было полезно?

Решение

I'm guessing a bit at what you're expecting, but I think I have a good hunch.

The stream of a queue, like iterating over a queue, represents the current contents of the queue. When the iterator or the stream reaches the tail of the queue, it doesn't block awaiting further elements to be added. The iterator or the stream is exhausted at that point and the computation terminates.

If you want a stream that consists of all current and future elements of the queue, you can do something like this:

Stream.generate(() -> {
        try {
            return queue.take();
        } catch (InterruptedException ie) {
            return "Interrupted!";
        }
    })
    .filter(s -> s.endsWith("x"))
    .forEach(System.out::println);   

(Unfortunately the need to handle InterruptedException makes this quite messy.)

Note that there is no way to close a queue, and there is no way for Stream.generate to stop generating elements, so this is effectively an infinite stream. The only way to terminate it is with a short-circuiting stream operation such as findFirst.

Другие советы

Another approach is to build a custom Spliterator. In my case, I've got a blocking queue, and I want to build a stream that continues to extract elements, until the block times out. The spliterator is something like:

public class QueueSpliterator<T> implements Spliterator<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMs;

    public QueueSpliterator(final BlockingQueue<T> queue, final long timeoutMs) {
        this.queue = queue;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public int characteristics() {
        return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> action) {
        try {
            final T next = this.queue.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
            if (next == null) {
                return false;
            }
            action.accept(next);
            return true;
        } catch (final InterruptedException e) {
            throw new SupplierErrorException("interrupted", e);
        }
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

}

The exception thrown to handle InterruptedException is an extension of RuntimeException. Using this class, one can build a stream via: StreamSupport.stream(new QueueSpliterator(...)) and add the usual stream operations.

You could look at an async Queue implementation. If you have Java 8, then cyclops-react, I a developer on this project, provides an async.Queue that will allow you to both populate and consume from the Queue asyncrhonously (and cleanly).

e.g.

Queue<String> queue = QueueFactories.<String>unboundedQueue().build();

Or simply (as long as this is a com.aol.simple.react.async.Queue)

Queue<String> queue = new Queue<>();

Then in a separate thread :

new Thread(() -> {
        while (true) {
            queue.add("New message " + System.currentTimeMillis());
        }
    }).start();

Back on your main thread, your original code should now work as expected (infinetely iterate over the messages being added to the queue and print them out)

queue.stream().forEach(e -> System.out.println(e));

The Queue and hence the Stream can be closed at any stage via -

queue.close();
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top