Question

I have two blocking queues. I insert tasks into the first queue and execute them. When each task completes, it adds a task to the second queue, where that task is executed in turn.

So my first queue is easy: I just check to make sure it's not empty and execute, else I interrupt():

public void run() {
    try {
        if (!taskQueue1.isEmpty()) {
            SomeTask task = taskQueue1.poll();
            doTask(task);
            taskQueue2.add(task);
        }
        else {
            Thread.currentThread().interrupt();
        }
    }

    catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

For the second one I do the following, which, as you can tell, doesn't work:

public void run() {
    try {
        SomeTask2 task2 = taskQueue2.take();
        doTask(task2);
    }

    catch (InterruptedException ex) {

    }
    Thread.currentThread().interrupt();

}

How would you solve this so that the second BlockingQueue doesn't block forever on take(), yet the second thread finishes only when it knows no more items will be added? It would be good if the second thread could see the first blocking queue, check whether both queues are empty, and interrupt itself in that case.

I could also use a Poison object, but would prefer something else.

NB: This isn't the exact code, just something I wrote here.


Solution

You make it sound as though the thread processing the first queue knows that there are no more tasks coming as soon as its queue is drained. That sounds suspicious, but I'll take you at your word and propose a solution anyway.

Define an AtomicInteger visible to both threads. Initialize it to positive one.

Define the first thread's operation as follows:

  • Loop on Queue#poll().
  • If Queue#poll() returns null, call AtomicInteger#decrementAndGet() on the shared integer.
    • If AtomicInteger#decrementAndGet() returned zero, interrupt the second thread via Thread#interrupt(). (This handles the case where the second thread has nothing left to process, including the case where no items ever arrived.)
    • In either case, exit the loop.
  • Otherwise, process the extracted item, call AtomicInteger#incrementAndGet() on the shared integer, add the extracted item to the second thread's queue, and continue the loop.

Define the second thread's operation as follows:

  • Loop blocking on BlockingQueue#take().
  • If BlockingQueue#take() throws InterruptedException, catch the exception, call Thread.currentThread().interrupt(), and exit the loop.
  • Otherwise, process the extracted item.
  • Call AtomicInteger#decrementAndGet() on the shared integer.
    • If AtomicInteger#decrementAndGet() returned zero, exit the loop.
    • Otherwise, continue the loop.

Make sure you understand the idea before trying to write the actual code. The contract is that the second thread continues waiting on more items from its queue until the count of expected tasks reaches zero. At that point, the producing thread (the first one) will no longer push any new items into the second thread's queue, so the second thread knows that it's safe to stop servicing its queue.

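The screwy case arises when no tasks ever arrive at the first thread's queue. Since the second thread only decrements and tests the count after it processes an item, if it never gets a chance to process any items, it won't ever consider stopping. We use thread interruption to handle that case, at the cost of another conditional branch in the first thread's loop termination steps. The same branch also fires when the second thread has already finished every forwarded item before the first thread finds its queue empty, since the second thread is then blocked in take() with nothing left to wake it. Fortunately, that branch will execute only once.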
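The two loops described above can be sketched roughly as follows. This is only an illustration under assumptions: the class name CountedPipeline, the run helper, and the use of strings in place of SomeTask are all hypothetical, and the first queue is assumed to be fully populated before either thread starts.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; strings stand in for SomeTask.
class CountedPipeline {

    /** Runs both stages to completion and returns what the second stage processed. */
    static List<String> run(List<String> initialTasks) {
        Queue<String> queue1 = new ConcurrentLinkedQueue<>(initialTasks);
        BlockingQueue<String> queue2 = new LinkedBlockingQueue<>();
        List<String> processed = new CopyOnWriteArrayList<>();
        // Starts at one: the extra count belongs to the first thread
        // and is released only when it finds its queue empty.
        AtomicInteger pending = new AtomicInteger(1);

        Thread second = new Thread(() -> {
            while (true) {
                String task;
                try {
                    task = queue2.take();
                } catch (InterruptedException ex) {
                    // The first thread finished and nothing is outstanding.
                    Thread.currentThread().interrupt();
                    return;
                }
                processed.add(task + ":stage2"); // stand-in for doTask(task2)
                if (pending.decrementAndGet() == 0) {
                    return; // last outstanding item, and the producer is done
                }
            }
        });

        Thread first = new Thread(() -> {
            while (true) {
                String task = queue1.poll();
                if (task == null) {
                    if (pending.decrementAndGet() == 0) {
                        second.interrupt(); // consumer is idle in take()
                    }
                    return;
                }
                // stand-in for doTask(task) would go here
                pending.incrementAndGet(); // count the item before handing it over
                queue2.add(task);
            }
        });

        second.start();
        first.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ex);
        }
        return processed;
    }
}
```

Calling CountedPipeline.run with an empty list exercises the interrupt branch, while a non-empty list may end through either exit depending on how the two threads interleave.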

There are many designs that could work here. I merely described one that introduced only one additional entity (the shared atomic integer), but even then, it's fiddly. I think that using a poison pill would be much cleaner, though I do concede that neither Queue#add() nor BlockingQueue#put() accepts null as a valid element (due to Queue#poll()'s return value contract). It would otherwise be easy to use null as a poison pill.
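For comparison, a poison pill on the consuming side might look something like the sketch below. The names PoisonPillDemo, POISON, and drain are hypothetical, and strings again stand in for the real task type; the sentinel is a dedicated object compared by identity because null is not allowed in the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; strings stand in for SomeTask2.
class PoisonPillDemo {

    // A distinct instance, compared with ==, so no real task can match it by accident.
    static final String POISON = new String("POISON");

    /** Takes items until the poison pill arrives and returns what was processed. */
    static List<String> drain(BlockingQueue<String> queue) {
        List<String> processed = new ArrayList<>();
        try {
            while (true) {
                String task = queue.take();
                if (task == POISON) {
                    break; // producer signalled that nothing more is coming
                }
                processed.add(task); // stand-in for doTask(task2)
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return processed;
    }
}
```

The producer would add POISON to the queue once, after its last real item, which removes the need for a shared counter or an interrupt.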

Other tips

I can't figure out what you are actually trying to do here, but I can say that the interrupt() in your first run() method is either pointless or wrong.

  • If you are running the run() method in your own Thread object, then that thread is about to exit anyway, so there's no point interrupting it.

  • If you are running the run() method in an executor with a thread pool, then you most likely don't want to kill the thread or shut down the executor at all ... at that point. And if you do want to shut down the executor, then you should call one of its shutdown methods.


For instance, here's a version that does what you seem to be doing without all of the interrupt stuff, and without thread creation/destruction churn.

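import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

public class TaskExecutor {

    // Constructor arguments are elided here; doTask and doTask2 are the asker's methods.
    private final ExecutorService executor = new ThreadPoolExecutor(...);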

    public void submitTask1(final SomeTask task) {
        executor.submit(new Runnable(){
            public void run() {
                doTask(task);
                submitTask2(task);
            }
        });
    }

    public void submitTask2(final SomeTask task) {
        executor.submit(new Runnable(){
            public void run() {
                doTask2(task);
            }
        });
    }

    public void shutdown() {
        executor.shutdown();
    }
}

If you want separate queuing for the tasks, simply create and use two different executors.
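A minimal sketch of the two-executor variant follows. It assumes single-threaded executors and uses strings in place of SomeTask; the class name TwoStageExecutor and its methods are hypothetical. Each executor owns its own internal queue, and shutting them down in stage order is what tells the second stage that no more work is coming.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; strings stand in for SomeTask.
class TwoStageExecutor {

    private final ExecutorService stage1 = Executors.newSingleThreadExecutor();
    private final ExecutorService stage2 = Executors.newSingleThreadExecutor();
    private final List<String> results = new CopyOnWriteArrayList<>();

    /** Queues a task for stage one, which forwards its result to stage two. */
    void submit(String task) {
        stage1.execute(() -> {
            String done = task + ":1";                      // stand-in for doTask(task)
            stage2.execute(() -> results.add(done + ":2")); // stand-in for doTask2(task)
        });
    }

    /** Drains stage one first so that every stage-two task has been submitted. */
    List<String> shutdownAndAwait() {
        try {
            stage1.shutdown();
            stage1.awaitTermination(10, TimeUnit.SECONDS);
            stage2.shutdown();
            stage2.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return results;
    }
}
```

Shutting down the second executor before the first has finished would cause late stage-one tasks to be rejected when they try to forward their work, which is why the order matters here.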

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow