Come interrompere un BlockingQueue che sta bloccando su take ()?
-
03-07-2019 - |
Domanda
Ho una classe che prende oggetti da un BlockingQueue
e li elabora chiamando take ()
in un ciclo continuo. Ad un certo punto so che non verranno aggiunti altri oggetti alla coda. Come posso interrompere il metodo take ()
in modo che smetta di bloccare?
Ecco la classe che elabora gli oggetti:
public class MyObjHandler implements Runnable {
private final BlockingQueue<MyObj> queue;
public class MyObjHandler(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
MyObj obj = queue.take();
// process obj here
// ...
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Ed ecco il metodo che utilizza questa classe per elaborare gli oggetti:
public void testHandler() {
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
// what code should go here to tell the handler
// to stop waiting for more objects?
}
Soluzione
Se l'interruzione del thread non è un'opzione, un'altra è quella di posizionare un " marker " o " comando " oggetto sulla coda che sarebbe riconosciuto come tale da MyObjHandler e uscirà dal ciclo.
Altri suggerimenti
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
thread.interrupt();
Tuttavia, se lo fai, il thread potrebbe essere interrotto mentre ci sono ancora elementi nella coda, in attesa di essere elaborati. Puoi prendere in considerazione l'utilizzo di poll
invece di take
, che permetterà al thread di elaborazione di scadere e terminare quando ha atteso un po 'di tempo con nessun nuovo input.
Molto tardi, ma spero che questo aiuti anche altri come ho affrontato il problema simile e ho usato l'approccio poll
suggerito da erickson sopra con alcune piccole modifiche,
class MyObjHandler implements Runnable
{
private final BlockingQueue<MyObj> queue;
public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
public MyObjHandler(BlockingQueue queue)
{
this.queue = queue;
Finished = false;
}
@Override
public void run()
{
while (true)
{
try
{
MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
{
// process obj here
// ...
}
if(Finished && queue.isEmpty())
return;
}
catch (InterruptedException e)
{
return;
}
}
}
}
public void testHandler()
{
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjHandler handler = new MyObjHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
{
queue.put(i.next());
}
// what code should go here to tell the handler to stop waiting for more objects?
handler.Finished = true; //THIS TELLS HIM
//If you need you can wait for the termination otherwise remove join
myThread.join();
}
Ciò ha risolto entrambi i problemi
- Contrassegnato il
BlockingQueue
in modo che sappia che non deve attendere più elementi - Non si è interrotto in mezzo in modo che i blocchi di elaborazione terminino solo quando vengono elaborati tutti gli elementi in coda e non ci sono elementi rimanenti da aggiungere
Interrompi il thread:
thread.interrupt()
O non interrompere, è brutto.
public class MyQueue<T> extends ArrayBlockingQueue<T> {
private static final long serialVersionUID = 1L;
private boolean done = false;
public ParserQueue(int capacity) { super(capacity); }
public void done() { done = true; }
public boolean isDone() { return done; }
/**
* May return null if producer ends the production after consumer
* has entered the element-await state.
*/
public T take() throws InterruptedException {
T el;
while ((el = super.poll()) == null && !done) {
synchronized (this) {
wait();
}
}
return el;
}
}
- quando il produttore inserisce l'oggetto nella coda, chiama
queue.notify ()
, se finisce, chiamaqueue.done()
- loop while (! queue.isDone () ||! queue.isEmpty ())
- test take () restituisce valore per null