¿Cómo interrumpir un BlockingQueue que está bloqueando en take ()?
-
03-07-2019 - |
Pregunta
Tengo una clase que toma objetos de un BlockingQueue
y los procesa llamando a take ()
en un ciclo continuo. En algún momento sé que no se agregarán más objetos a la cola. ¿Cómo interrumpo el método take ()
para que deje de bloquear?
Aquí está la clase que procesa los objetos:
public class MyObjHandler implements Runnable {
private final BlockingQueue<MyObj> queue;
public class MyObjHandler(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
MyObj obj = queue.take();
// process obj here
// ...
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Y aquí está el método que utiliza esta clase para procesar objetos:
public void testHandler() {
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
// what code should go here to tell the handler
// to stop waiting for more objects?
}
Solución
Si interrumpir el hilo no es una opción, otra es colocar un " marcador " o " comando " objeto en la cola que MyObjHandler reconocería como tal y se rompería el bucle.
Otros consejos
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
thread.interrupt();
Sin embargo, si lo hace, el hilo podría interrumpirse mientras todavía hay elementos en la cola, esperando ser procesados. Es posible que desee considerar el uso de poll
en lugar de take
, lo que permitirá que el subproceso de proceso se agote y finalice cuando haya esperado un tiempo. No hay entrada nueva.
Muy tarde, pero espero que esto también ayude a otra persona Me enfrenté al problema similar y utilicé el enfoque poll
sugerido por erickson anterior con algunos cambios menores,
class MyObjHandler implements Runnable
{
private final BlockingQueue<MyObj> queue;
public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
public MyObjHandler(BlockingQueue queue)
{
this.queue = queue;
Finished = false;
}
@Override
public void run()
{
while (true)
{
try
{
MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
{
// process obj here
// ...
}
if(Finished && queue.isEmpty())
return;
}
catch (InterruptedException e)
{
return;
}
}
}
}
public void testHandler()
{
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjHandler handler = new MyObjHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
{
queue.put(i.next());
}
// what code should go here to tell the handler to stop waiting for more objects?
handler.Finished = true; //THIS TELLS HIM
//If you need you can wait for the termination otherwise remove join
myThread.join();
}
Esto solucionó ambos problemas
- Marcó la
BlockingQueue
para que sepa que no tiene que esperar más por los elementos - No se interrumpió entre ellos, por lo que el procesamiento de los bloques termina solo cuando se procesan todos los elementos en la cola y no quedan elementos por agregar
Interrumpa el hilo:
thread.interrupt()
O no interrumpir, es desagradable.
public class MyQueue<T> extends ArrayBlockingQueue<T> {
private static final long serialVersionUID = 1L;
private boolean done = false;
public ParserQueue(int capacity) { super(capacity); }
public void done() { done = true; }
public boolean isDone() { return done; }
/**
* May return null if producer ends the production after consumer
* has entered the element-await state.
*/
public T take() throws InterruptedException {
T el;
while ((el = super.poll()) == null && !done) {
synchronized (this) {
wait();
}
}
return el;
}
}
- cuando el productor ponga el objeto en la cola, llame a
queue.notify ()
, si termina, llame aqueue.done()
- bucle while (! queue.isDone () ||! queue.isEmpty ())
- prueba take () devuelve valor para nulo