Comment interrompre une BlockingQueue qui bloque sur take ()?
-
03-07-2019 - |
Question
J'ai une classe qui prend des objets d'un BlockingQueue
et les traite en appelant take ()
dans une boucle continue. À un moment donné, je sais qu'aucun autre objet ne sera ajouté à la file d'attente. Comment puis-je interrompre la méthode take ()
pour qu’elle cesse de bloquer?
Voici la classe qui traite les objets:
public class MyObjHandler implements Runnable {
private final BlockingQueue<MyObj> queue;
public class MyObjHandler(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
MyObj obj = queue.take();
// process obj here
// ...
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Et voici la méthode qui utilise cette classe pour traiter des objets:
public void testHandler() {
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
// what code should go here to tell the handler
// to stop waiting for more objects?
}
La solution
Si interrompre le fil n'est pas une option, une autre consiste à placer un "marqueur". ou " commande " objet de la file qui serait reconnu comme tel par MyObjHandler et sortirait de la boucle.
Autres conseils
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
thread.interrupt();
Toutefois, si vous procédez ainsi, le fil peut être interrompu tant qu'il reste des éléments dans la file d'attente en attente de traitement. Vous pouvez envisager d'utiliser poll
au lieu de take
, ce qui permettra à la tâche de traitement de se terminer et de se terminer lorsqu'elle a attendu un certain temps avec pas de nouvelle entrée.
Très tard, mais j'espère que cela aidera les autres aussi , car je rencontrais le même problème et utilisais l'approche poll
proposée par erickson ci-dessus avec quelques modifications mineures,
class MyObjHandler implements Runnable
{
private final BlockingQueue<MyObj> queue;
public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
public MyObjHandler(BlockingQueue queue)
{
this.queue = queue;
Finished = false;
}
@Override
public void run()
{
while (true)
{
try
{
MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
{
// process obj here
// ...
}
if(Finished && queue.isEmpty())
return;
}
catch (InterruptedException e)
{
return;
}
}
}
}
public void testHandler()
{
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjHandler handler = new MyObjHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
{
queue.put(i.next());
}
// what code should go here to tell the handler to stop waiting for more objects?
handler.Finished = true; //THIS TELLS HIM
//If you need you can wait for the termination otherwise remove join
myThread.join();
}
Cela a résolu à la fois les problèmes
- A marqué la
BlockingQueue
pour qu'il sache qu'il ne doit plus attendre pour les éléments - N'a pas été interrompu afin que les blocs de traitement ne se terminent que lorsque tous les éléments de la file d'attente sont traités et qu'il ne reste aucun élément à ajouter
Interrompez le fil de discussion:
thread.interrupt()
Ou n'interrompez pas, c'est méchant.
public class MyQueue<T> extends ArrayBlockingQueue<T> {
private static final long serialVersionUID = 1L;
private boolean done = false;
public ParserQueue(int capacity) { super(capacity); }
public void done() { done = true; }
public boolean isDone() { return done; }
/**
* May return null if producer ends the production after consumer
* has entered the element-await state.
*/
public T take() throws InterruptedException {
T el;
while ((el = super.poll()) == null && !done) {
synchronized (this) {
wait();
}
}
return el;
}
}
- quand le producteur place un objet dans la file d'attente, appelez
queue.notify ()
, s'il se termine, appelezqueue.done ()
- boucle while (! queue.isDone () ||! queue.isEmpty ())
- test take () renvoie la valeur null