Question

J'ai une classe qui prend des objets d'un BlockingQueue et les traite en appelant take () dans une boucle continue. À un moment donné, je sais qu'aucun autre objet ne sera ajouté à la file d'attente. Comment puis-je interrompre la méthode take () pour qu’elle cesse de bloquer?

Voici la classe qui traite les objets:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

Et voici la méthode qui utilise cette classe pour traiter des objets:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}
Était-ce utile?

La solution

Si interrompre le fil n'est pas une option, une autre consiste à placer un "marqueur". ou " commande " objet de la file qui serait reconnu comme tel par MyObjHandler et sortirait de la boucle.

Autres conseils

BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

Toutefois, si vous procédez ainsi, le fil peut être interrompu tant qu'il reste des éléments dans la file d'attente en attente de traitement. Vous pouvez envisager d'utiliser poll au lieu de take , ce qui permettra à la tâche de traitement de se terminer et de se terminer lorsqu'elle a attendu un certain temps avec pas de nouvelle entrée.

Très tard, mais j'espère que cela aidera les autres aussi , car je rencontrais le même problème et utilisais l'approche poll proposée par erickson ci-dessus avec quelques modifications mineures,

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

Cela a résolu à la fois les problèmes

  1. A marqué la BlockingQueue pour qu'il sache qu'il ne doit plus attendre pour les éléments
  2. N'a pas été interrompu afin que les blocs de traitement ne se terminent que lorsque tous les éléments de la file d'attente sont traités et qu'il ne reste aucun élément à ajouter

Interrompez le fil de discussion:

thread.interrupt()

Ou n'interrompez pas, c'est méchant.

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

        /**
         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
         */
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {
                    wait();
                }
            }

            return el;
        }
    }
  1. quand le producteur place un objet dans la file d'attente, appelez queue.notify () , s'il se termine, appelez queue.done ()
  2. boucle while (! queue.isDone () ||! queue.isEmpty ())
  3. test take () renvoie la valeur null
Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top