Вопрос

я использую java.util.concurrent.blockqueue В очень простом сценарии продюсера-потребитель. Например, этот псевдод изображает потребительскую часть:

class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while(true)
        {
            try {
                ComplexObject complexObject = myBlockingQueue.take();
                //do something with the complex object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

Все идет нормально. В Javadoc в очереди блокировки я прочитал:

Blockingqueue не поддерживает какую -либо операцию «закрытия» или «выключения», чтобы указать, что больше элементов не будет добавлено. Потребности и использование таких функций, как правило, зависят от реализации. Например, общая тактика заключается в том, чтобы производители вставили специальные объекты в конце потока или ядовиты, которые интерпретируются соответственно, когда его принимают потребители.

К сожалению, из -за использования дженериков и природы ComplexObject не тривиально втянуть «ядовитый объект» в очередь. Так что эта «общая тактика» не очень удобна в моем сценарии.

Мой вопрос: какая еще хорошая тактика/шаблоны я могу использовать, чтобы «закрыть» очередь?

Благодарю вас!

Это было полезно?

Решение

Если у вас есть ручка для потребительского потока, вы можете прервать ее. С помощью кода, который вы дали, это убьет потребителя. Я бы не ожидал, что у продюсера это будет; Вероятно, ему придется каким -то образом обратное обращение к контроллеру программы, чтобы он знал, что это сделано. Затем контроллер прервал бы потребительский поток.

Вы всегда можете закончить работу, прежде чем подчиняться прерыванию. Например:

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().isInterrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                this.process(complexObject);

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }

        // Thread is getting ready to die, but first,
        // drain remaining elements on the queue and process them.
        final LinkedList<ComplexObject> remainingObjects;
        myBlockingQueue.drainTo(remainingObjects);
        for(ComplexObject complexObject : remainingObjects) {
            this.process(complexObject);
        }
    }

    private void process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

Я бы на самом деле предпочел бы это каким -то образом отравить очередь. Если вы хотите убить нить, попросите ветку убить себя.

(Приятно видеть, как кто -то обращается InterruptedException правильно.)


Кажется, здесь есть некоторые споры о обращении с перерывами здесь. Во -первых, я хотел бы, чтобы все прочитали эту статью: http://www.ibm.com/developerworks/java/library/j-jtp05236.html

Теперь, понимая, что никто на самом деле это не читает, вот сделка. Поток получит только InterruptedException Если он в настоящее время блокировал во время прерывания. В таком случае, Thread.interrupted() вернется false. Анкет Если бы это не блокировало, он не получит это исключение, и вместо этого Thread.interrupted() вернется true. Анкет Поэтому ваш охранник петли должен абсолютно, несмотря ни на что, проверьте Thread.interrupted(), или иным образом рискует пропустить прерывание в потоке.

Итак, так как вы проверяете Thread.interrupted() Независимо от того, что и вы вынуждены поймать InterruptedException (и должен иметь дело с этим, даже если вы не были вынуждены), у вас теперь есть две области кода, которые обрабатывают одно и то же событие, прерывание потока. Одним из способов обработки этого является нормализовать их в одно условие, что означает, что либо логический проверка состояния может добавить исключение, либо исключение может установить логическое состояние. Я выбираю позже.


Редактировать: Обратите внимание, что прерванный метод статического потока# очистки прерванное состояние текущего потока.

Другие советы

Еще одна идея для этого простых:

class ComplexObject implements QueueableComplexObject
{
    /* the meat of your complex object is here as before, just need to
     * add the following line and the "implements" clause above
     */
    @Override public ComplexObject asComplexObject() { return this; }
}

enum NullComplexObject implements QueueableComplexObject
{
    INSTANCE;

    @Override public ComplexObject asComplexObject() { return null; }
}

interface QueueableComplexObject
{
    public ComplexObject asComplexObject();
}

Затем используйте BlockingQueue<QueueableComplexObject> как очередь. Когда вы хотите закончить обработку очереди, сделайте queue.offer(NullComplexObject.INSTANCE). Анкет С точки зрения потребителя, сделай

boolean ok = true;
while (ok)
{
    ComplexObject obj = queue.take().asComplexObject();
    if (obj == null)
        ok = false;
    else
        process(obj);
}

/* interrupt handling elided: implement this as you see fit,
 * depending on whether you watch to swallow interrupts or propagate them
 * as in your original post
 */

Нет instanceof требуется, и вам не нужно строить фальшиво ComplexObject что может быть дорого/трудным в зависимости от его реализации.

Альтернативой будет обернуть обработку, которую вы делаете с помощью ExecutorService, и пусть ExecutorService Сам контролирует, добавляются ли задания в очередь.

По сути, вы пользуетесь преимуществами ExecutorService.shutdown(), который, когда называется Disallows, какие -либо задачи обрабатываются исполнителем.

Я не уверен, как вы в настоящее время отправляете задачи QueueConsumer в вашем примере. Я предположил, что у вас есть какой -то submit() метод и использовал аналогичный метод в примере.

import java.util.concurrent.*;

class QueueConsumer {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public void shutdown() {
        executor.shutdown(); // gracefully shuts down the executor
    }

    // 'Result' is a class you'll have to write yourself, if you want.
    // If you don't need to provide a result, you can just Runnable
    // instead of Callable.
    public Future<Result> submit(final ComplexObject complexObject) {
        if(executor.isShutdown()) {
            // handle submitted tasks after the executor has been told to shutdown
        }

        return executor.submit(new Callable<Result>() {
            @Override
            public Result call() {
                return process(complexObject);
            }
        });
    }

    private Result process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

Этот пример-просто безжалосная иллюстрация того, что java.util.concurrent Предложения пакетов; Вероятно, есть некоторые оптимизации, которые могут быть сделаны для этого (например, QueueConsumer Поскольку его собственный класс, вероятно, даже не нужен; Вы можете просто предоставить ExecutorService на то, что производители представляют задачи).

Копаться через java.util.concurrent Пакет (начиная с некоторых ссылок выше). Вы можете обнаружить, что это дает вам много отличных вариантов для того, что вы пытаетесь сделать, и вам даже не нужно беспокоиться о регулировании очереди работы.

Другая возможность создания объекта яда: сделать его конкретным экземпляром класса. Таким образом, вам не нужно разбивать вокруг подтипов или испортить общий.

Недостаток: это не сработает, если между производителем и потребителем будет какой -то барьер сериализации.

public class ComplexObject
{
    public static final POISON_INSTANCE = new ComplexObject();

    public ComplexObject(whatever arguments) {
    }

    // Empty constructor for creating poison instance.
    private ComplexObject() {
    }
}

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().interrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                if (complexObject == ComplexObject.POISON_INSTANCE)
                    return;

                // Process complex object.

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }
    }
}

Можно ли расширить комплексы и высмеивать нетривиальную функциональность создания? По сути, вы заканчиваете объектом Shell, но вы можете сделать тогда instance of Чтобы увидеть, является ли конец объекта очереди.

Вы можете обернуть свой общий объект в DataObject. На этом объеме данных вы можете добавить дополнительные данные, такие как статус объекта Poison. DataObject - это класс с 2 полями. T complexObject; а также boolean poison;.

Ваш потребитель берет объекты данных из очереди. Если объект ядовита возвращается, вы закрываете потребителя, иначе вы разверните общий процесс и называете «процесс (комплексобъект)».

Я использую java.util.concurrent.LinkedBlockingDeque<E> чтобы вы могли добавить объект в конце очереди и взять их спереди. Таким образом, ваш объект будет обрабатываться по порядку, но более важно, чтобы закрыть очередь после того, как вы столкнетесь с ядовитым объектом.

Чтобы поддержать нескольких потребителей, я добавляю объект яда обратно в очередь, когда сталкиваюсь с ним.

public final class Data<T> {
    private boolean poison = false;
    private T complexObject;

    public Data() {
        this.poison = true;
    }

    public Data(T complexObject) {
        this.complexObject = complexObject;
    }

    public boolean isPoison() {
        return poison;
    }

    public T getComplexObject() {
        return complexObject;
    }
}
public class Consumer <T> implements Runnable {

    @Override
    public final void run() {
        Data<T> data;
        try {
            while (!(data = queue.takeFirst()).isPoison()) {
                process(data.getComplexObject());
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // add the poison object back so other consumers can stop too.
        queue.addLast(line);
    }
}

Мне кажется разумным внедрить близкие BlockingQueue:

import java.util.concurrent.BlockingQueue;

public interface CloseableBlockingQueue<E> extends BlockingQueue<E> {
    /** Returns <tt>true</tt> if this queue is closed, <tt>false</tt> otherwise. */
    public boolean isClosed();

    /** Closes this queue; elements cannot be added to a closed queue. **/
    public void close();
}

Было бы довольно просто реализовать это с помощью следующего поведения (ср. Методы сводная таблица):

  • Вставлять:

  • Удалять:

  • Исследовать

    Без изменений.

Я сделал это, редактируя источник, найти это в github.com.

В этой ситуации, как правило, вам приходится отказаться от дженериков и сделать объект хранения очереди. Затем вам просто нужно проверить свой объект «Яд», прежде чем поднимать фактический тип.

Я использовал эту систему:

ConsumerClass
private boolean queueIsNotEmpty = true;//with setter
...
do {
    ...
    sharedQueue.drainTo(docs);
    ...
} while (queueIsNotEmpty || sharedQueue.isEmpty());

Когда продюсер заканчивается, я устанавливаю на ConsumerObject, Queueisnotempty Field to false

Сегодня я решил эту проблему, используя объект обертки. Поскольку комплексобъект слишком сложный для подкласса, я обернул комплексобъект в объект ComplexObjectWrapper. Затем использовал ComplexObjectWrapper в качестве общего типа.

public class ComplexObjectWrapper {
ComplexObject obj;
}

public class EndOfQueue extends ComplexObjectWrapper{}

Теперь вместо BlockingQueue<ComplexObject> Я сделал BlockingQueue<ComplexObjectWrapper>

Так как я контролировал как потребителя, так и производителя, это решение сработало для меня.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top