Вопрос

У меня есть класс, который берет объекты из BlockingQueue и обрабатывает их, вызывая take () в непрерывном цикле. В какой-то момент я знаю, что больше объектов не будет добавлено в очередь. Как мне прервать метод take () , чтобы он прекратил блокировку?

Вот класс, который обрабатывает объекты:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

А вот метод, который использует этот класс для обработки объектов:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}
Это было полезно?

Решение

Если прерывание потока не является вариантом, другим является размещение маркера " или " команда " объект в очереди, который будет распознан как таковой MyObjHandler и выведен из цикла.

Другие советы

BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

Однако, если вы это сделаете, поток может быть прерван, пока в очереди все еще есть элементы, ожидающие обработки. Возможно, вы захотите использовать poll вместо take , что позволит потоку обработки истечь и завершиться, когда он некоторое время ждал с нет нового ввода.

Очень поздно, но надеюсь, это поможет и другим, так как я столкнулся с подобной проблемой и использовал подход poll , предложенный erickson выше с некоторыми небольшими изменениями,

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

Это решило обе проблемы

<Ол>
  • Отметил BlockingQueue , чтобы он знал, что ему не нужно больше ждать элементов
  • Не прервано между ними, поэтому блоки обработки завершаются только тогда, когда обрабатываются все элементы в очереди и нет элементов, которые нужно добавить
  • Прервать поток:

    thread.interrupt()
    

    Или не перебивай, это противно.

        public class MyQueue<T> extends ArrayBlockingQueue<T> {
    
            private static final long serialVersionUID = 1L;
            private boolean done = false;
    
            public ParserQueue(int capacity) {  super(capacity); }
    
            public void done() { done = true; }
    
            public boolean isDone() { return done; }
    
            /**
             * May return null if producer ends the production after consumer 
             * has entered the element-await state.
             */
            public T take() throws InterruptedException {
                T el;
                while ((el = super.poll()) == null && !done) {
                    synchronized (this) {
                        wait();
                    }
                }
    
                return el;
            }
        }
    
    <Ол>
  • когда производитель помещает объект в очередь, вызовите queue.notify () , если он заканчивается, вызовите queue.done ()
  • цикл while (! queue.isDone () ||! queue.isEmpty ())
  • test take () возвращает значение для null
  • Лицензировано под: CC-BY-SA с атрибуция
    Не связан с StackOverflow
    scroll top