Java BlockingQueue не имеет блокировки?
-
20-09-2019 - |
Вопрос
У меня есть блокировка очереди объектов.
Я хочу написать поток, который блокирует, пока в очереди нет объекта. Аналогично функциональности, предоставленной blockqueue.take ().
Однако, поскольку я не знаю, смогу ли я успешно обработать объект, я хочу просто заглянуть () и не удалять объект. Я хочу удалить объект, только если смогу успешно обработать его.
Итак, я бы хотел блокировать функцию Peek (). В настоящее время Peek () просто возвращается, если очередь пуст в соответствии с Javadocs.
Я что-то упускаю? Есть ли другой способ достичь этой функциональности?
РЕДАКТИРОВАТЬ:
Любые мысли о том, использовал ли я просто безопасную очередь нити, заглядывал и спал и спал?
public void run() {
while (!exit) {
while (queue.size() != 0) {
Object o = queue.peek();
if (o != null) {
if (consume(o) == true) {
queue.remove();
} else {
Thread.sleep(10000); //need to backoff (60s) and try again
}
}
}
Thread.sleep(1000); //wait 1s for object on queue
}
}
Обратите внимание, что у меня есть только один потребительский поток и один (отдельный) поток производителя. Я предполагаю, что это не так эффективно, как использование блокировки ... любые комментарии ценят.
Решение
Вы можете использовать LinkedBlockingDeque и физически удалить элемент из очереди (используя takeLast()
) но замените его снова на конец очереди Если обработка не удается с помощью putLast(E e)
. Анкет Тем временем ваши «производители» добавят элементы к фронт очереди с использованием putFirst(E e)
.
Вы всегда можете заключить это поведение в свое Queue
реализация и предоставление blockingPeek()
метод, который работает takeLast()
с последующим putLast()
за кулисами на основе LinkedBlockingDeque
. Анкет Следовательно, с точки зрения вызывающего клиента элемент никогда не удаляется из вашей очереди.
Другие советы
Однако, поскольку я не знаю, смогу ли я успешно обработать объект, я хочу просто заглянуть () и не удалять объект. Я хочу удалить объект, только если смогу успешно обработать его.
В общем, это не безопасно. Что если после тебя peek()
и определить, что объект может быть успешно обработан, но перед вами take()
Это для удаления и обработки, другой поток берет этот объект?
Единственное, что я знаю, это делает это, это Блокинг -буффер в Apache Commons Collections:
Если либо получить, либо удалить вызовы в пустом буфере, вызовный поток ожидает уведомления о том, что операция ADD или ADDALL завершена.
get()
эквивалентно peek()
, и а Buffer
можно сделать, чтобы вести себя как BlockingQueue
декорировав а Unboundedfifobuffer с BlockingBuffer
Не могли бы вы также просто добавить очередь слушателя событий в свою очередь блокировки, а затем, когда что -то добавляется в (блокирующую) очередь, отправьте событие с ваших слушателей? Вы могли бы иметь блок потока, пока его не будет вызван метод действий.
Быстрый ответ в том, что на самом деле нет способа блокировки, стержня, внедряющая блокирующую очередь с блокирующей Peek () самостоятельно.
Я что-то упускаю?
peek () может быть неприятным с параллелизмом -
- Если вы не можете обработать свое сообщение Peek () 'D - оно останется в очереди, если у вас нет нескольких потребителей.
- Кто собирается вытащить этот объект из очереди, если вы не можете его обработать?
- Если у вас есть несколько потребителей, вы получаете условия гонки между вами Peek (), а другой поток также обрабатывает элементы, что приводит к дублирующей обработке или хуже.
Похоже, вам может быть лучше, на самом деле снять элемент и обработать его, используяЦепочка ответов
РЕДАКТИРОВАТЬ: Re: Ваш последний пример: если у вас есть только 1 потребитель, вы никогда не избавитесь от объекта в очереди - если только он не обновлен в то время - в этом случае вам лучше быть очень осторожным в отношении безопасности потоков и Вероятно, все равно не следовало бы помещать элемент в очередь.
Похоже, что сама Blockqueue не имеет той функциональности, которую вы указываете.
Я мог бы попытаться немного перепродать проблему, хотя: что бы вы сделали с объектами, которые не можете «обработать правильно»? Если вы просто оставите их в очереди, вам придется вытащить их в какой -то момент и разобраться с ними. Я бы рекомендовал либо выяснить, как их обрабатывать (обычно, если queue.get () дает какую -либо недействительную или плохую ценность, вы, вероятно, в порядке, просто бросить его на пол), либо выбрать другую структуру данных, чем FIFO.
Не ответ как таковой, но: JDK-6653412 Претензии это не является действительным вариантом использования.
«Простейшее» решение
Не обрабатывать следующий элемент до предыдущий Элемент обрабатывается успешно.
public void run() {
Object lastSuccessfullyProcessedElement = null;
while (!exit) {
Object obj = lastSuccessfullyProcessedElement == null ? queue.take() : lastSuccessfullyProcessedElement; // blocking
boolean successful = process(obj);
if(!successful) {
lastSuccessfullyProcessedElement = obj;
} else {
lastSuccessfullyProcessedElement = null;
}
}
}
- Вызов
peek()
и проверить, является ли значение нулевым не эффективным.
Я видел использование процессора, которое достигло 10% в моей системе, когда очередь пуста для следующей программы.
while (true) {
Object o = queue.peek();
if(o == null) continue;
// omitted for the sake of brevity
}
Добавление
sleep()
добавляет медлительность.Добавление его обратно в очередь с помощью
putLast
будет нарушать заказ. Более того, это операция блокировки, которая требует замков.