سؤال

لدي فصل يأخذ الكائنات من ملف BlockingQueue ومعالجتها عن طريق الاتصال take() في حلقة مستمرة.أعلم في مرحلة ما أنه لن تتم إضافة المزيد من الكائنات إلى قائمة الانتظار.كيف أقاطع take() طريقة بحيث يتوقف عن الحجب؟

إليك الفئة التي تعالج الكائنات:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

وإليك الطريقة التي تستخدم هذه الفئة لمعالجة الكائنات:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}
هل كانت مفيدة؟

المحلول

إذا انقطاع الخيط ليس خيارا، وآخر هو لوضع "علامة" أو "القيادة" الكائن على قائمة الانتظار التي سيتم الاعتراف بها على هذا النحو من قبل MyObjHandler والخروج من الحلقة.

نصائح أخرى

BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

ولكن، إذا قمت بذلك، قد يكون انقطاع الخيط في حين لا تزال هناك عناصر في قائمة الانتظار، والانتظار لتتم معالجتها. قد ترغب في النظر في استخدام <لأ href = "http://java.sun.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html#poll(long،٪20java.util.concurrent. TIMEUNIT) "يختلط =" noreferrer "> poll بدلا من ذلك من take، والتي سوف تسمح للموضوع تجهيز لمهلة وإنهاء عندما انتظر لبعض الوقت مع أي إدخال جديد.

متأخر جدًا ولكن آمل أن يساعد هذا الآخرين أيضًا لقد واجهت مشكلة مماثلة واستخدمت poll النهج الذي اقترحه اريكسون أعلاه مع بعض التغييرات الطفيفة،

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

هذا حل كلا المشاكل

  1. تم وضع علامة على BlockingQueue حتى يعرف أنه لا يتعين عليه الانتظار أكثر للعناصر
  2. لم تتم مقاطعتها بينهما بحيث تنتهي كتل المعالجة فقط عندما تتم معالجة جميع العناصر الموجودة في قائمة الانتظار ولا توجد عناصر متبقية لإضافتها

والمقاطعة للموضوع:

thread.interrupt()

أو لا تقاطع، هذا مقرف.

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

        /**
         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
         */
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {
                    wait();
                }
            }

            return el;
        }
    }
  1. عندما يضع المنتج كائنًا في قائمة الانتظار، اتصل به queue.notify(), ، إذا انتهى، اتصل queue.done()
  2. حلقة أثناء (!queue.isDone() || !queue.isEmpty())
  3. اختبار Take() القيمة المرجعة للقيمة الخالية
مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top