Frage

Ich habe eine Klasse, die Objekte von einem BlockingQueue nimmt und verarbeitet sie durch take() in einer Endlosschleife aufgerufen wird. Irgendwann weiß ich, dass keine Objekte mehr in die Warteschlange aufgenommen werden. Wie unterbreche ich die take() Methode, so dass es blockiert nicht mehr reagiert?

Hier ist die Klasse, die die Objekte verarbeitet:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

Und hier ist die Methode, die diese Klasse verwendet, Objekte zu verarbeiten:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}
War es hilfreich?

Lösung

, um den Faden, wenn Unterbrechung nicht möglich ist, eine andere ist eine „Markierung“ zu setzen oder „command“ Objekt auf der Warteschlange, die als solche von MyObjHandler und brechen aus der Schleife erkannt werden würde.

Andere Tipps

BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

Allerdings, wenn Sie dies tun, kann der Thread unterbrochen werden, solange es noch Elemente in der Warteschlange ist, wartet verarbeitet zu werden. Sie könnten mit

Sehr spät, aber Hoffnung hilft das andere auch als ich ähnliches Problem konfrontiert und verwendet, um die poll Ansatz vorgeschlagen von Erickson oben mit einigen geringfügigen Änderungen,

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

Dies löste sowohl die Probleme

  1. Markiert die BlockingQueue so dass es weiß, hat es nicht mehr zu warten, für die Elemente
  2. Haben Sie nicht unterbrochen dazwischen, so dass Verarbeitungsblöcke nur beendet, wenn alle Elemente in der Warteschlange verarbeitet werden, und es gibt noch keine Elemente hinzugefügt werden

Unterbrechen Sie den Faden:

thread.interrupt()

oder nicht unterbrechen, seine böse.

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

        /**
         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
         */
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {
                    wait();
                }
            }

            return el;
        }
    }
  1. , wenn Hersteller legt in die Warteschlange-Objekt, rufen queue.notify(), wenn es endet, rufen queue.done()
  2. Schleife while (! Queue.isDone () ||! Queue.isEmpty ())
  3. Test nehmen () Rückgabewert für null
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top