«Закрытие» блокирующая очередь
-
28-10-2019 - |
Вопрос
я использую java.util.concurrent.blockqueue В очень простом сценарии продюсера-потребитель. Например, этот псевдод изображает потребительскую часть:
class QueueConsumer implements Runnable {
@Override
public void run() {
while(true)
{
try {
ComplexObject complexObject = myBlockingQueue.take();
//do something with the complex object
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Все идет нормально. В Javadoc в очереди блокировки я прочитал:
Blockingqueue не поддерживает какую -либо операцию «закрытия» или «выключения», чтобы указать, что больше элементов не будет добавлено. Потребности и использование таких функций, как правило, зависят от реализации. Например, общая тактика заключается в том, чтобы производители вставили специальные объекты в конце потока или ядовиты, которые интерпретируются соответственно, когда его принимают потребители.
К сожалению, из -за использования дженериков и природы ComplexObject не тривиально втянуть «ядовитый объект» в очередь. Так что эта «общая тактика» не очень удобна в моем сценарии.
Мой вопрос: какая еще хорошая тактика/шаблоны я могу использовать, чтобы «закрыть» очередь?
Благодарю вас!
Решение
Если у вас есть ручка для потребительского потока, вы можете прервать ее. С помощью кода, который вы дали, это убьет потребителя. Я бы не ожидал, что у продюсера это будет; Вероятно, ему придется каким -то образом обратное обращение к контроллеру программы, чтобы он знал, что это сделано. Затем контроллер прервал бы потребительский поток.
Вы всегда можете закончить работу, прежде чем подчиняться прерыванию. Например:
class QueueConsumer implements Runnable {
@Override
public void run() {
while(!(Thread.currentThread().isInterrupted())) {
try {
final ComplexObject complexObject = myBlockingQueue.take();
this.process(complexObject);
} catch (InterruptedException e) {
// Set interrupted flag.
Thread.currentThread().interrupt();
}
}
// Thread is getting ready to die, but first,
// drain remaining elements on the queue and process them.
final LinkedList<ComplexObject> remainingObjects;
myBlockingQueue.drainTo(remainingObjects);
for(ComplexObject complexObject : remainingObjects) {
this.process(complexObject);
}
}
private void process(final ComplexObject complexObject) {
// Do something with the complex object.
}
}
Я бы на самом деле предпочел бы это каким -то образом отравить очередь. Если вы хотите убить нить, попросите ветку убить себя.
(Приятно видеть, как кто -то обращается InterruptedException
правильно.)
Кажется, здесь есть некоторые споры о обращении с перерывами здесь. Во -первых, я хотел бы, чтобы все прочитали эту статью: http://www.ibm.com/developerworks/java/library/j-jtp05236.html
Теперь, понимая, что никто на самом деле это не читает, вот сделка. Поток получит только InterruptedException
Если он в настоящее время блокировал во время прерывания. В таком случае, Thread.interrupted()
вернется false
. Анкет Если бы это не блокировало, он не получит это исключение, и вместо этого Thread.interrupted()
вернется true
. Анкет Поэтому ваш охранник петли должен абсолютно, несмотря ни на что, проверьте Thread.interrupted()
, или иным образом рискует пропустить прерывание в потоке.
Итак, так как вы проверяете Thread.interrupted()
Независимо от того, что и вы вынуждены поймать InterruptedException
(и должен иметь дело с этим, даже если вы не были вынуждены), у вас теперь есть две области кода, которые обрабатывают одно и то же событие, прерывание потока. Одним из способов обработки этого является нормализовать их в одно условие, что означает, что либо логический проверка состояния может добавить исключение, либо исключение может установить логическое состояние. Я выбираю позже.
Редактировать: Обратите внимание, что прерванный метод статического потока# очистки прерванное состояние текущего потока.
Другие советы
Еще одна идея для этого простых:
class ComplexObject implements QueueableComplexObject
{
/* the meat of your complex object is here as before, just need to
* add the following line and the "implements" clause above
*/
@Override public ComplexObject asComplexObject() { return this; }
}
enum NullComplexObject implements QueueableComplexObject
{
INSTANCE;
@Override public ComplexObject asComplexObject() { return null; }
}
interface QueueableComplexObject
{
public ComplexObject asComplexObject();
}
Затем используйте BlockingQueue<QueueableComplexObject>
как очередь. Когда вы хотите закончить обработку очереди, сделайте queue.offer(NullComplexObject.INSTANCE)
. Анкет С точки зрения потребителя, сделай
boolean ok = true;
while (ok)
{
ComplexObject obj = queue.take().asComplexObject();
if (obj == null)
ok = false;
else
process(obj);
}
/* interrupt handling elided: implement this as you see fit,
* depending on whether you watch to swallow interrupts or propagate them
* as in your original post
*/
Нет instanceof
требуется, и вам не нужно строить фальшиво ComplexObject
что может быть дорого/трудным в зависимости от его реализации.
Альтернативой будет обернуть обработку, которую вы делаете с помощью ExecutorService
, и пусть ExecutorService
Сам контролирует, добавляются ли задания в очередь.
По сути, вы пользуетесь преимуществами ExecutorService.shutdown()
, который, когда называется Disallows, какие -либо задачи обрабатываются исполнителем.
Я не уверен, как вы в настоящее время отправляете задачи QueueConsumer
в вашем примере. Я предположил, что у вас есть какой -то submit()
метод и использовал аналогичный метод в примере.
import java.util.concurrent.*;
class QueueConsumer {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public void shutdown() {
executor.shutdown(); // gracefully shuts down the executor
}
// 'Result' is a class you'll have to write yourself, if you want.
// If you don't need to provide a result, you can just Runnable
// instead of Callable.
public Future<Result> submit(final ComplexObject complexObject) {
if(executor.isShutdown()) {
// handle submitted tasks after the executor has been told to shutdown
}
return executor.submit(new Callable<Result>() {
@Override
public Result call() {
return process(complexObject);
}
});
}
private Result process(final ComplexObject complexObject) {
// Do something with the complex object.
}
}
Этот пример-просто безжалосная иллюстрация того, что java.util.concurrent
Предложения пакетов; Вероятно, есть некоторые оптимизации, которые могут быть сделаны для этого (например, QueueConsumer
Поскольку его собственный класс, вероятно, даже не нужен; Вы можете просто предоставить ExecutorService
на то, что производители представляют задачи).
Копаться через java.util.concurrent
Пакет (начиная с некоторых ссылок выше). Вы можете обнаружить, что это дает вам много отличных вариантов для того, что вы пытаетесь сделать, и вам даже не нужно беспокоиться о регулировании очереди работы.
Другая возможность создания объекта яда: сделать его конкретным экземпляром класса. Таким образом, вам не нужно разбивать вокруг подтипов или испортить общий.
Недостаток: это не сработает, если между производителем и потребителем будет какой -то барьер сериализации.
public class ComplexObject
{
public static final POISON_INSTANCE = new ComplexObject();
public ComplexObject(whatever arguments) {
}
// Empty constructor for creating poison instance.
private ComplexObject() {
}
}
class QueueConsumer implements Runnable {
@Override
public void run() {
while(!(Thread.currentThread().interrupted())) {
try {
final ComplexObject complexObject = myBlockingQueue.take();
if (complexObject == ComplexObject.POISON_INSTANCE)
return;
// Process complex object.
} catch (InterruptedException e) {
// Set interrupted flag.
Thread.currentThread().interrupt();
}
}
}
}
Можно ли расширить комплексы и высмеивать нетривиальную функциональность создания? По сути, вы заканчиваете объектом Shell, но вы можете сделать тогда instance of
Чтобы увидеть, является ли конец объекта очереди.
Вы можете обернуть свой общий объект в DataObject. На этом объеме данных вы можете добавить дополнительные данные, такие как статус объекта Poison. DataObject - это класс с 2 полями. T complexObject;
а также boolean poison;
.
Ваш потребитель берет объекты данных из очереди. Если объект ядовита возвращается, вы закрываете потребителя, иначе вы разверните общий процесс и называете «процесс (комплексобъект)».
Я использую java.util.concurrent.LinkedBlockingDeque<E>
чтобы вы могли добавить объект в конце очереди и взять их спереди. Таким образом, ваш объект будет обрабатываться по порядку, но более важно, чтобы закрыть очередь после того, как вы столкнетесь с ядовитым объектом.
Чтобы поддержать нескольких потребителей, я добавляю объект яда обратно в очередь, когда сталкиваюсь с ним.
public final class Data<T> {
private boolean poison = false;
private T complexObject;
public Data() {
this.poison = true;
}
public Data(T complexObject) {
this.complexObject = complexObject;
}
public boolean isPoison() {
return poison;
}
public T getComplexObject() {
return complexObject;
}
}
public class Consumer <T> implements Runnable {
@Override
public final void run() {
Data<T> data;
try {
while (!(data = queue.takeFirst()).isPoison()) {
process(data.getComplexObject());
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
// add the poison object back so other consumers can stop too.
queue.addLast(line);
}
}
Мне кажется разумным внедрить близкие BlockingQueue
:
import java.util.concurrent.BlockingQueue;
public interface CloseableBlockingQueue<E> extends BlockingQueue<E> {
/** Returns <tt>true</tt> if this queue is closed, <tt>false</tt> otherwise. */
public boolean isClosed();
/** Closes this queue; elements cannot be added to a closed queue. **/
public void close();
}
Было бы довольно просто реализовать это с помощью следующего поведения (ср. Методы сводная таблица):
Вставлять:
Бросает исключение, Специальное значение:
Ведет себя как полный
Queue
, ответственность вызывающего абонента за тестированиеisClosed()
.-
Бросает
IllegalStateException
Если и когда закрыто. -
Возврат
false
Если и при закрытии, ответственность вызывающего абонента протестироватьisClosed()
.
Удалять:
Бросает исключение, Специальное значение:
Ведет себя как пустой
Queue
, ответственность вызывающего абонента за тестированиеisClosed()
.-
Бросает
NoSuchElementException
Если и когда закрыто. -
Возврат
null
Если и при закрытии, ответственность вызывающего абонента протестироватьisClosed()
.
Исследовать
Без изменений.
Я сделал это, редактируя источник, найти это в github.com.
В этой ситуации, как правило, вам приходится отказаться от дженериков и сделать объект хранения очереди. Затем вам просто нужно проверить свой объект «Яд», прежде чем поднимать фактический тип.
Я использовал эту систему:
ConsumerClass
private boolean queueIsNotEmpty = true;//with setter
...
do {
...
sharedQueue.drainTo(docs);
...
} while (queueIsNotEmpty || sharedQueue.isEmpty());
Когда продюсер заканчивается, я устанавливаю на ConsumerObject, Queueisnotempty Field to false
Сегодня я решил эту проблему, используя объект обертки. Поскольку комплексобъект слишком сложный для подкласса, я обернул комплексобъект в объект ComplexObjectWrapper. Затем использовал ComplexObjectWrapper в качестве общего типа.
public class ComplexObjectWrapper {
ComplexObject obj;
}
public class EndOfQueue extends ComplexObjectWrapper{}
Теперь вместо BlockingQueue<ComplexObject>
Я сделал BlockingQueue<ComplexObjectWrapper>
Так как я контролировал как потребителя, так и производителя, это решение сработало для меня.