Como interromper um BlockingQueue que está bloqueando take ()?
-
03-07-2019 - |
Pergunta
Eu tenho uma classe que leva objetos de um BlockingQueue
e processa-los chamando take()
em um loop contínuo. Em algum momento eu sei que há mais objetos serão adicionados à fila. Como faço para interromper o método take()
para que ele pára bloqueando?
Aqui é a classe que processa os objetos:
public class MyObjHandler implements Runnable {
private final BlockingQueue<MyObj> queue;
public class MyObjHandler(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
MyObj obj = queue.take();
// process obj here
// ...
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
E aqui está o método que usa essa classe para objetos de processo:
public void testHandler() {
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
// what code should go here to tell the handler
// to stop waiting for more objects?
}
Solução
Se interromper o fio não é uma opção, outra é colocar um "marcador" ou objeto "comando" na fila que seria reconhecido como tal por MyObjHandler e sair do loop.
Outras dicas
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
thread.interrupt();
No entanto, se você fizer isso, o fio pode ser interrompida enquanto ainda há itens na fila, esperando para ser processado. Você pode querer considerar o uso de poll
vez de take
, o que permitirá que o segmento de processamento de tempo limite e terminar quando ele esperou por um tempo sem entrada nova.
Muito tarde, mas Espero que isso ajude outros também como eu enfrentei o problema semelhante e usou a abordagem poll
sugerido por Erickson acima com algumas pequenas alterações,
class MyObjHandler implements Runnable
{
private final BlockingQueue<MyObj> queue;
public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
public MyObjHandler(BlockingQueue queue)
{
this.queue = queue;
Finished = false;
}
@Override
public void run()
{
while (true)
{
try
{
MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
{
// process obj here
// ...
}
if(Finished && queue.isEmpty())
return;
}
catch (InterruptedException e)
{
return;
}
}
}
}
public void testHandler()
{
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjHandler handler = new MyObjHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
{
queue.put(i.next());
}
// what code should go here to tell the handler to stop waiting for more objects?
handler.Finished = true; //THIS TELLS HIM
//If you need you can wait for the termination otherwise remove join
myThread.join();
}
Isso resolveu ambos os problemas
- Marcado o
BlockingQueue
para que ele sabe que não tem que esperar mais por elementos - Não interrompida no meio de modo que o processamento de blocos termina somente quando todos os itens na fila são processados ??e não há itens restantes para ser adicionado
Interromper o fio:
thread.interrupt()
Ou não interrompa, sua desagradável.
public class MyQueue<T> extends ArrayBlockingQueue<T> {
private static final long serialVersionUID = 1L;
private boolean done = false;
public ParserQueue(int capacity) { super(capacity); }
public void done() { done = true; }
public boolean isDone() { return done; }
/**
* May return null if producer ends the production after consumer
* has entered the element-await state.
*/
public T take() throws InterruptedException {
T el;
while ((el = super.poll()) == null && !done) {
synchronized (this) {
wait();
}
}
return el;
}
}
- quando coloca produtores opor ao
queue.notify()
fila, chamada, se ele termina, chamadaqueue.done()
- loop while (! Queue.isDone () ||! Queue.isEmpty ())
- teste take () valor de retorno para nula