如何中断在take()上阻塞的BlockingQueue?
-
03-07-2019 - |
题
我有一个类从 BlockingQueue
获取对象,并通过在连续循环中调用 take()
来处理它们。在某些时候,我知道不会有更多的对象被添加到队列中。如何中断 take()
方法以阻止阻塞?
这是处理对象的类:
public class MyObjHandler implements Runnable {
private final BlockingQueue<MyObj> queue;
public class MyObjHandler(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
MyObj obj = queue.take();
// process obj here
// ...
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
这是使用此类处理对象的方法:
public void testHandler() {
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
// what code should go here to tell the handler
// to stop waiting for more objects?
}
解决方案
如果不打算中断线程,则另一个是放置“标记”。或“命令”队列中的对象,MyObjHandler会将其识别出来,然后突破循环。
其他提示
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
thread.interrupt();
但是,如果执行此操作,则当队列中仍有项目等待处理时,线程可能会被中断。您可能需要考虑使用 poll
而不是 take
,这将允许处理线程超时并在等待一段时间后终止没有新的输入。
很晚但希望这对其他人也有帮助我遇到了类似的问题,并使用 poll 方法/ questions / 812342 / how-to-interrupt-a-blockingqueue-which-blocking-on-take / 812354#812354“> erickson 以及一些细微的变化,
class MyObjHandler implements Runnable
{
private final BlockingQueue<MyObj> queue;
public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
public MyObjHandler(BlockingQueue queue)
{
this.queue = queue;
Finished = false;
}
@Override
public void run()
{
while (true)
{
try
{
MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
{
// process obj here
// ...
}
if(Finished && queue.isEmpty())
return;
}
catch (InterruptedException e)
{
return;
}
}
}
}
public void testHandler()
{
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjHandler handler = new MyObjHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
{
queue.put(i.next());
}
// what code should go here to tell the handler to stop waiting for more objects?
handler.Finished = true; //THIS TELLS HIM
//If you need you can wait for the termination otherwise remove join
myThread.join();
}
这解决了两个问题
- 标记
BlockingQueue
,以便它知道它不必等待更多元素 - 两者之间没有中断,只有当处理队列中的所有项目并且没有剩余项目需要添加时,处理块才会终止 醇>
中断线程:
thread.interrupt()
或者不要打断它的讨厌。
public class MyQueue<T> extends ArrayBlockingQueue<T> {
private static final long serialVersionUID = 1L;
private boolean done = false;
public ParserQueue(int capacity) { super(capacity); }
public void done() { done = true; }
public boolean isDone() { return done; }
/**
* May return null if producer ends the production after consumer
* has entered the element-await state.
*/
public T take() throws InterruptedException {
T el;
while ((el = super.poll()) == null && !done) {
synchronized (this) {
wait();
}
}
return el;
}
}
- 当生产者将对象放入队列时,调用
queue.notify()
,如果结束,则调用queue.done()
- 循环while(!queue.isDone()||!queue.isEmpty())
- test take()返回null 的值 醇>
不隶属于 StackOverflow