Question

I'm looking for a way to block until a BlockingQueue is empty.

I know that, in a multithreaded environment, as long as there are producers putting items into the BlockingQueue, there can be situations in which the queue becomes empty and a few nanoseconds later it is full of items.

But, if there's only one producer, then it may want to wait (and block) until the queue is empty after it has stopped putting items into the queue.

Java/Pseudocode:

// Producer code
BlockingQueue<Task> queue = new LinkedBlockingQueue<>(); // BlockingQueue itself is an interface

while (having some tasks to do) {
    queue.put(task);
}

queue.waitUntilEmpty(); // <-- how to do this?

print("Done");

Do you have any idea?

EDIT: I know that wrapping BlockingQueue and using an extra condition would do the trick; I'm just asking whether there are any pre-made solutions and/or better alternatives.


Solution

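A simple solution using wait() and notifyAll():

// Producer (assumes an unbounded queue, so put() never blocks while the monitor is held):
synchronized (queue) {
    while (!queue.isEmpty())
        queue.wait(); // releases the monitor until a consumer reports the queue empty
    queue.put(task);
}

// Consumer:
synchronized (queue) {
    Task task = queue.poll(); // non-blocking (may return null); never block while holding the monitor
    if (queue.isEmpty())
        queue.notifyAll(); // wake the waiting producer
}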
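To make the wait/notify idea concrete, here is a minimal runnable sketch of the "wrap the queue and add an extra condition" approach mentioned in the question. The names DrainableQueue, awaitEmpty and DrainDemo are made up for this illustration and are not part of the JDK. It assumes a single producer that has stopped adding items before it calls awaitEmpty(). Note that the consumer blocks in take() outside the monitor and only enters it to notify.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper (not a JDK class): a queue whose producer can wait until it is drained.
class DrainableQueue<E> {
    private final BlockingQueue<E> queue = new LinkedBlockingQueue<>();
    private final Object monitor = new Object();

    void put(E element) throws InterruptedException {
        queue.put(element);
    }

    E take() throws InterruptedException {
        E element = queue.take();          // block outside the monitor
        synchronized (monitor) {
            if (queue.isEmpty())
                monitor.notifyAll();       // wake any thread waiting in awaitEmpty()
        }
        return element;
    }

    // Blocks until the queue is observed empty. This is only meaningful once producers have stopped.
    void awaitEmpty() throws InterruptedException {
        synchronized (monitor) {
            while (!queue.isEmpty())
                monitor.wait();            // releases the monitor while waiting
        }
    }
}

class DrainDemo {
    // Puts n items, waits for the queue to drain, and returns how many items the consumer took.
    static int run(int n) {
        DrainableQueue<Integer> queue = new DrainableQueue<>();
        AtomicInteger consumed = new AtomicInteger();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    queue.take();
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < n; i++)
                queue.put(i);
            queue.awaitEmpty();            // the equivalent of waitUntilEmpty() in the question
            consumer.join();               // "empty" does not mean "processed", so also wait for the consumer
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("Done, consumed " + run(1000));
    }
}
```

The emptiness check and the wait happen under the same monitor as the notification, so a wake-up cannot be lost between them.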

OTHER TIPS

I understand that you may already have a bunch of threads actively polling or taking from the queue, but something still feels off about your flow/design.

An empty queue doesn't mean the previously added tasks are finished; some of the items could take ages to process, so checking for emptiness is not very useful.

So what you should do is forget about the blocking behaviour of the BlockingQueue and treat it like any other collection. Translate the items into a Collection of Callable and make use of ExecutorService.invokeAll().

    Collection<Item> queue = ...
    Collection<Callable<Result>> tasks = new ArrayList<Callable<Result>>();

    for (Item item : queue) {
        tasks.add(new Callable<Result>() {

            @Override
            public Result call() throws Exception {
                // process the item ...

                return result;
            }
        });
    }

    // look at the results, add timeout for invokeAll if necessary
    List<Future<Result>> results = executorService.invokeAll(tasks);

    // done

This approach gives you full control over how long your producer waits, as well as proper exception handling.

It's not quite what you want to do, but using a SynchronousQueue would have an effect very similar to your Java/Pseudocode, namely the producer blocking until all of the data has been retrieved by some consumer.

The only difference is that the producer blocks on each put until a consumer comes to retrieve the data, instead of only once at the end. Not sure if that would make a difference in your case. I'd expect it to make a noticeable difference only if the task performed by the producer is somewhat expensive.
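A small sketch of the hand-off behaviour described above, assuming one producer and one consumer. The class name HandOffDemo and its run method are invented for this example. Each put() returns only once the consumer has taken that element, so when the loop finishes nothing is left "in" the queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

class HandOffDemo {
    // Hands n integers to a consumer thread one by one and returns what the consumer received.
    static List<Integer> run(int n) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        List<Integer> received = Collections.synchronizedList(new ArrayList<>());

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++)
                    received.add(queue.take());   // each take() pairs with exactly one put()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 0; i < n; i++)
                queue.put(i);                     // blocks until the consumer has taken this element
            consumer.join();                      // let the consumer finish recording the last element
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("Done, received " + run(5));
    }
}
```

As with an emptied BlockingQueue, a completed put() only says the item was handed over, not that the consumer has finished processing it.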

Your use case must be quite special, because typically you would only want to block the producer when the queue is full, not wait until it is empty.

Anyway, this is doable. I believe that spinning until isEmpty returns true is not THAT inefficient because the producer will be spinning locally, i.e., accessing its own cache, not banging the bus. It will, however, consume CPU time as the thread remains schedulable. But local spinning is definitely the easier way. Otherwise I see two options:

  1. Using wait + notify like @niculare suggested
  2. Somehow making the first consumer that notices the queue is empty notify the producer in a lock-free way; this will be slower but degrade "more" gracefully

The answer @niculare gave seems OK but check out the comment of @hankduan.
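For completeness, the spinning variant can be sketched as follows. SpinUntilEmpty and its methods are hypothetical names for this illustration. It assumes a single producer that has finished putting items, and it uses Thread.yield() as a scheduling hint, which is a choice of this sketch rather than something the answer prescribes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SpinUntilEmpty {
    // Busy-waits until the queue reports empty. This burns CPU time while waiting.
    static void awaitEmpty(BlockingQueue<?> queue) {
        while (!queue.isEmpty())
            Thread.yield();               // hint that other threads (the consumers) may run
    }

    // Puts n items, spins until a consumer thread has drained them, and reports the final state.
    static boolean run(int n) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++)
                    queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < n; i++)
                queue.put(i);
            awaitEmpty(queue);            // no more puts after this point, so "empty" stays true
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return queue.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println("Done, queue empty: " + run(1000));
    }
}
```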

I saw this question when searching for a solution to a similar problem.
In the end, I more or less rewrote LinkedBlockingQueue.
It has a subset of its methods and does not implement Collection or Iterable.
It manages multiple producers and consumers.
For me it works.

/**********************************************************************************************************************
 * Import specifications
 *********************************************************************************************************************/
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.*;

/**********************************************************************************************************************
 * This class implements a completely reentrant FIFO.
 *********************************************************************************************************************/
public class BlockingFIFO<E>
{
  /********************************************************************************************************************
   * The constructor creates an empty FIFO with a capacity of {@link Integer#MAX_VALUE}.
   *******************************************************************************************************************/
  public BlockingFIFO()
  {
    // -----------------
    // Initialize object
    // -----------------
    this(Integer.MAX_VALUE);

  } // constructor

  /********************************************************************************************************************
   * The constructor creates an empty FIFO with the specified capacity.
   *
   * @param capacity_ipar The maximum number of elements the FIFO may contain.
   *******************************************************************************************************************/
  public BlockingFIFO(int capacity_ipar)
  {
    // ---------------------
    // Initialize attributes
    // ---------------------
    lock_attr = new ReentrantLock();
    not_empty_attr = lock_attr.newCondition();
    not_full_attr = lock_attr.newCondition();
    head_attr = null;
    tail_attr = null;
    capacity_attr = capacity_ipar;
    size_attr = 0;

  } // constructor

  /********************************************************************************************************************
   * This method removes all of the elements from the FIFO.
   *
   * @return The number of elements in the FIFO before it was cleared.
   *******************************************************************************************************************/
  public int clear()
  {
    // -----------------
    // Initialize result
    // -----------------
    int result;
    result = 0;

    // ----------
    // Clear FIFO
    // ----------
    lock_attr.lock();
    try
    {
      result = size_attr;
      head_attr = null;
      tail_attr = null;
      size_attr = 0;
      not_full_attr.signalAll();
    }
    finally
    {
      lock_attr.unlock();
    }

    // ----
    // Done
    // ----
    return (result);

  } // clear

  /********************************************************************************************************************
   * This method returns the number of elements in the FIFO.
   *
   * @return The number of elements in the FIFO.
   *******************************************************************************************************************/
  public int size()
  {
    // -----------
    // Return size
    // -----------
    lock_attr.lock();
    try
    {
      return (size_attr);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // size

  /********************************************************************************************************************
   * This method returns the number of additional elements that the FIFO can ideally accept without blocking.
   *
   * @return The remaining capacity of the FIFO.
   *******************************************************************************************************************/
  public int remainingCapacity()
  {
    // -------------------------
    // Return remaining capacity
    // -------------------------
    lock_attr.lock();
    try
    {
      return (capacity_attr - size_attr);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // remainingCapacity

  /********************************************************************************************************************
   * This method waits for the FIFO to become empty.
   *
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public void waitEmpty()
    throws InterruptedException
  {
    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    lock_attr.lock();
    try
    {
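      // pop() and clear() signal not_full_attr on every removal, so it also serves as the "drained" signal
      while (size_attr > 0)
        not_full_attr.await();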
    }
    finally
    {
      lock_attr.unlock();
    }

  } // waitEmpty

  /********************************************************************************************************************
   * This method waits at most the specified time for the FIFO to become empty.
   * <br>It returns <code>true</code> if the FIFO is empty and <code>false</code> otherwise.
   *
   * @param  timeout_ipar         The maximum number of milliseconds to wait for the FIFO to become empty.
   * @return                      True if and only if the FIFO is empty.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public boolean waitEmpty(long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    lock_attr.lock();
    try
    {
      while (size_attr > 0)
      {
        if (!not_full_attr.awaitUntil(deadline))
          return (false);
      }
      return (true);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // waitEmpty

  /********************************************************************************************************************
   * This method waits at most the specified time for the FIFO to become empty.
   * <br>It returns <code>true</code> if the FIFO is empty and <code>false</code> otherwise.
   *
   * @param  timeout_ipar         The maximum time to wait for the FIFO to become empty.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      True if and only if the FIFO is empty.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public boolean waitEmpty(long    timeout_ipar,
                           TimeUnit unit_ipar)
    throws InterruptedException
  {
    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    return (waitEmpty(unit_ipar.toMillis(timeout_ipar)));

  } // waitEmpty

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO if it is possible to do so immediately without
   * exceeding the queue's capacity.
   * <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
   *
   * @param  element_ipar The element to add to the FIFO.
   * @return              True if and only if the element was added to the FIFO.
   *******************************************************************************************************************/
  public boolean offer(E element_ipar)
  {
    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      if (capacity_attr > size_attr)
      {
        push(element_ipar);
        return (true);
      }
      else
        return (false);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary up to the specified wait time
   * for space to become available.
   * <br>It returns <code>true</code> upon success and <code>false</code> if the wait time elapses first.
   *
   * @param  element_ipar         The element to add to the FIFO.
   * @param  timeout_ipar         The maximum number of milliseconds to wait for space to become available.
   * @return                      True if and only if the element was added to the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public boolean offer(E    element_ipar,
                       long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      while (size_attr == capacity_attr)
      {
        if (!not_full_attr.awaitUntil(deadline))
          return (false);
      }
      push(element_ipar);
      return (true);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary up to the specified wait time
   * for space to become available.
   * <br>It returns <code>true</code> upon success and <code>false</code> if the wait time elapses first.
   *
   * @param  element_ipar         The element to add to the FIFO.
   * @param  timeout_ipar         The maximum time to wait for space to become available.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      True if and only if the element was added to the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public boolean offer(E        element_ipar,
                       long     timeout_ipar,
                       TimeUnit unit_ipar)
    throws InterruptedException
  {
    // ----------------------------
    // Try to add specified element
    // ----------------------------
    return (offer(element_ipar, unit_ipar.toMillis(timeout_ipar)));

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary for space to become available.
   *
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public void put(E element_ipar)
    throws InterruptedException
  {
    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      while (size_attr == capacity_attr)
        not_full_attr.await();
      push(element_ipar);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // put

  /********************************************************************************************************************
   * This method retrieves, but does not remove, the head of the FIFO, or returns <code>null</code> if the FIFO is
   * empty.
   *
   * @return The head of the FIFO, or <code>null</code> if the FIFO is empty.
   *******************************************************************************************************************/
  public E peek()
  {
    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      if (size_attr == 0)
        return (null);
      else
        return (head_attr.contents);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // peek

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, or returns <code>null</code> if the FIFO is
   * empty.
   *
   * @return The head of the FIFO, or <code>null</code> if the FIFO is empty.
   *******************************************************************************************************************/
  public E poll()
  {
    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      if (size_attr == 0)
        return (null);
      else
        return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting up to the specified wait time if necessary for an
   * element to become available.
   * <br>It returns <code>null</code> if the specified waiting time elapses before an element is available.
   *
   * @param  timeout_ipar         The maximum number of milliseconds to wait for an element to become available.
   * @return                      The head of the FIFO, or <code>null</code> if the specified waiting time elapses
   *                              before an element is available.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for an element to become
   *                              available.
   *******************************************************************************************************************/
  public E poll(long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      while (size_attr == 0)
      {
        if (!not_empty_attr.awaitUntil(deadline))
          return (null);
      }
      return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting up to the specified wait time if necessary for an
   * element to become available.
   * <br>It returns <code>null</code> if the specified waiting time elapses before an element is available.
   *
   * @param  timeout_ipar         The maximum time to wait for an element to become available.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      The head of the FIFO, or <code>null</code> if the specified waiting time elapses
   *                              before an element is available.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for an element to become
   *                              available.
   *******************************************************************************************************************/
  public E poll(long     timeout_ipar,
                TimeUnit unit_ipar)
    throws InterruptedException
  {
    // ------------------------
    // Try to get first element
    // ------------------------
    return (poll(unit_ipar.toMillis(timeout_ipar)));

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting if necessary for an element to become available.
   *
   * @return                      The head of the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for an element to become
   *                              available.
   *******************************************************************************************************************/
  public E take()
    throws InterruptedException
  {
    // ---------------------------
    // Try to return first element
    // ---------------------------
    lock_attr.lock();
    try
    {
      while (size_attr == 0)
        not_empty_attr.await();
      return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // take

  /********************************************************************************************************************
   * This class implements a node within the FIFO.
   *******************************************************************************************************************/
  private class Node
  {
    E    contents;
    Node next;

  } // class Node

  /********************************************************************************************************************
   * This method adds the specified element to the end of the FIFO.
   * <br>It sends a signal to all threads waiting for the FIFO to contain something.
   * <br>The caller should have locked the object and have made sure the list is not full.
   *******************************************************************************************************************/
  private void push(E element_ipar)
  {
    // -----------
    // Create node
    // -----------
    Node node;
    node = new Node();
    node.contents = element_ipar;
    node.next = null;

    // --------------
    // Add to the end
    // --------------
    if (head_attr == null)
      head_attr = node;
    else
      tail_attr.next = node;
    tail_attr = node;

    // ----------------------
    // We got another element
    // ----------------------
    size_attr++;
    not_empty_attr.signalAll();

  } // push

  /********************************************************************************************************************
   * This method removes the first element from the FIFO and returns it.
   * <br>It sends a signal to all threads waiting for the FIFO to have space available.
   * <br>The caller should have locked the object and have made sure the list is not empty.
   *******************************************************************************************************************/
  private E pop()
  {
    // ------------
    // Isolate node
    // ------------
    Node node;
    node = head_attr;
    head_attr = node.next;
    if (head_attr == null)
      tail_attr = null;

    // --------------------------
    // We removed another element
    // --------------------------
    size_attr--;
    not_full_attr.signalAll();

    // ----
    // Done
    // ----
    return (node.contents);

  } // pop

  /********************************************************************************************************************
   * This attribute represents the lock on the FIFO.
   *******************************************************************************************************************/
  private Lock lock_attr;

  /********************************************************************************************************************
   * This attribute represents the condition of the FIFO not being empty.
   *******************************************************************************************************************/
  private Condition not_empty_attr;

  /********************************************************************************************************************
   * This attribute represents the condition of the FIFO not being full.
   *******************************************************************************************************************/
  private Condition not_full_attr;

  /********************************************************************************************************************
   * This attribute represents the first element of the FIFO.
   *******************************************************************************************************************/
  private Node head_attr;

  /********************************************************************************************************************
   * This attribute represents the last element of the FIFO.
   *******************************************************************************************************************/
  private Node tail_attr;

  /********************************************************************************************************************
   * This attribute represents the capacity of the FIFO.
   *******************************************************************************************************************/
  private int capacity_attr;

  /********************************************************************************************************************
   * This attribute represents the size of the FIFO.
   *******************************************************************************************************************/
  private int size_attr;

} // class BlockingFIFO

According to the documentation, the removal operations of BlockingQueue that block are take() and poll(time, unit); plain poll() returns null immediately if the queue is empty.
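A short sketch of the three removal variants on a LinkedBlockingQueue, to show the difference in blocking behaviour. The class name RemoveVariants and the 50 ms timeout are arbitrary choices for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class RemoveVariants {
    // Returns the results of poll(), poll(time, unit) and take(), joined by commas.
    static String run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        try {
            String immediate = queue.poll();                        // empty queue: returns null at once
            String timed = queue.poll(50, TimeUnit.MILLISECONDS);   // waits up to 50 ms, then returns null
            queue.offer("task");
            String taken = queue.take();                            // would block if empty; here returns "task"
            return immediate + "," + timed + "," + taken;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

These methods block the consumer until an element arrives. None of them lets a producer wait for the queue to become empty, which is what the question asks for.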

Licensed under: CC-BY-SA with attribution