Pregunta

Tengo una clase que toma objetos de un BlockingQueue y los procesa llamando a take () en un ciclo continuo. En algún momento sé que no se agregarán más objetos a la cola. ¿Cómo interrumpo el método take () para que deje de bloquear?

Aquí está la clase que procesa los objetos:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

Y aquí está el método que utiliza esta clase para procesar objetos:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}
¿Fue útil?

Solución

Si interrumpir el hilo no es una opción, otra es colocar un " marcador " o " comando " objeto en la cola que MyObjHandler reconocería como tal y se rompería el bucle.

Otros consejos

BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

Sin embargo, si lo hace, el hilo podría interrumpirse mientras todavía hay elementos en la cola, esperando ser procesados. Es posible que desee considerar el uso de poll en lugar de take , lo que permitirá que el subproceso de proceso se agote y finalice cuando haya esperado un tiempo. No hay entrada nueva.

Muy tarde, pero espero que esto también ayude a otra persona Me enfrenté al problema similar y utilicé el enfoque poll sugerido por erickson anterior con algunos cambios menores,

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

Esto solucionó ambos problemas

  1. Marcó la BlockingQueue para que sepa que no tiene que esperar más por los elementos
  2. No se interrumpió entre ellos, por lo que el procesamiento de los bloques termina solo cuando se procesan todos los elementos en la cola y no quedan elementos por agregar

Interrumpa el hilo:

thread.interrupt()

O no interrumpir, es desagradable.

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

        /**
         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
         */
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {
                    wait();
                }
            }

            return el;
        }
    }
  1. cuando el productor ponga el objeto en la cola, llame a queue.notify () , si termina, llame a queue.done()
  2. bucle while (! queue.isDone () ||! queue.isEmpty ())
  3. prueba take () devuelve valor para nulo
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top