Domanda

Ho una classe che prende oggetti da un BlockingQueue e li elabora chiamando take () in un ciclo continuo. Ad un certo punto so che non verranno aggiunti altri oggetti alla coda. Come posso interrompere il metodo take () in modo che smetta di bloccare?

Ecco la classe che elabora gli oggetti:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

Ed ecco il metodo che utilizza questa classe per elaborare gli oggetti:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}
È stato utile?

Soluzione

Se l'interruzione del thread non è un'opzione, un'altra è quella di posizionare un " marker " o " comando " oggetto sulla coda che sarebbe riconosciuto come tale da MyObjHandler e uscirà dal ciclo.

Altri suggerimenti

BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

Tuttavia, se lo fai, il thread potrebbe essere interrotto mentre ci sono ancora elementi nella coda, in attesa di essere elaborati. Puoi prendere in considerazione l'utilizzo di poll invece di take , che permetterà al thread di elaborazione di scadere e terminare quando ha atteso un po 'di tempo con nessun nuovo input.

Molto tardi, ma spero che questo aiuti anche altri come ho affrontato il problema simile e ho usato l'approccio poll suggerito da erickson sopra con alcune piccole modifiche,

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

Ciò ha risolto entrambi i problemi

  1. Contrassegnato il BlockingQueue in modo che sappia che non deve attendere più elementi
  2. Non si è interrotto in mezzo in modo che i blocchi di elaborazione terminino solo quando vengono elaborati tutti gli elementi in coda e non ci sono elementi rimanenti da aggiungere

Interrompi il thread:

thread.interrupt()

O non interrompere, è brutto.

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

        /**
         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
         */
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {
                    wait();
                }
            }

            return el;
        }
    }
  1. quando il produttore inserisce l'oggetto nella coda, chiama queue.notify () , se finisce, chiama queue.done()
  2. loop while (! queue.isDone () ||! queue.isEmpty ())
  3. test take () restituisce valore per null
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top