Параллельная и блокирующая очередь в Java
-
06-07-2019 - |
Вопрос
У меня классическая проблема с потоком, отправляющим события во входящую очередь второго потока.Только на этот раз меня очень интересует производительность.Чего я хочу достичь, так это:
- Я хочу одновременный доступ к очереди, чтобы производитель нажимал, а получатель выскакивал.
- Когда очередь пуста, я хочу, чтобы потребитель заблокировался в очереди, ожидая производителя.
Моя первая идея состояла в том, чтобы использовать LinkedBlockingQueue
, но вскоре я понял, что это не одновременно, и производительность пострадала.С другой стороны, теперь я использую ConcurrentLinkedQueue
, но все равно я плачу за wait()
/ notify()
о каждой публикации.Поскольку потребитель, обнаружив пустую очередь, не блокируется, я должен синхронизировать и wait()
на замке.С другой стороны, производитель должен получить эту блокировку и notify()
после каждой отдельной публикации.Общий результат заключается в том, что я оплачиваю стоимость
sycnhronized (lock) {lock.notify()}
в каждой отдельной публикации, даже когда в этом нет необходимости.
То, что, я думаю, здесь необходимо, - это очередь, которая является одновременно блокирующей и параллельной.Я представляю себе push()
операция для работы как в ConcurrentLinkedQueue
, с дополнительным notify()
к объекту, когда перемещаемый элемент является первым в списке.Такая проверка, я считаю, уже существует в ConcurrentLinkedQueue
, так как нажатие требует соединения со следующим элементом.Таким образом, это было бы намного быстрее, чем каждый раз синхронизировать внешнюю блокировку.
Является ли что-то подобное доступным / разумным?
Решение
Я думаю, вы можете придерживаться java.util.concurrent.LinkedBlockingQueue
независимо от ваших сомнений.Это происходит одновременно.Хотя я понятия не имею о его производительности.Вероятно, другая реализация BlockingQueue
вам больше подойдет.Их не так уж много, поэтому проведите тесты производительности и измерьте.
Другие советы
Похоже на этот ответ https://stackoverflow.com/a/1212515/1102730 но немного по-другому..В итоге я использовал ExecutorService
.Вы можете создать один из них, используя Executors.newSingleThreadExecutor()
.Мне нужна была параллельная очередь для чтения / записи буферных изображений в файлы, а также атомарность при чтении и записи.Мне нужен только один поток, потому что файловый ввод-вывод на порядки быстрее, чем исходный, сетевой ввод-вывод.Кроме того, меня больше беспокоила атомарность действий и корректность, чем производительность, но этот подход также может быть реализован с несколькими потоками в пуле, чтобы ускорить процесс.
Чтобы получить изображение (Try-Catch-Окончательно опущено):
Future<BufferedImage> futureImage = executorService.submit(new Callable<BufferedImage>() {
@Override
public BufferedImage call() throws Exception {
ImageInputStream is = new FileImageInputStream(file);
return ImageIO.read(is);
}
})
image = futureImage.get();
Чтобы сохранить изображение (Try-Catch-Окончательно опущено):
Future<Boolean> futureWrite = executorService.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
FileOutputStream os = new FileOutputStream(file);
return ImageIO.write(image, getFileFormat(), os);
}
});
boolean wasWritten = futureWrite.get();
Важно отметить, что вы должны очищать и закрывать свои потоки в блоке finally.Я не знаю, как это работает по сравнению с другими решениями, но оно довольно универсально.
Я бы посоветовал вам взглянуть на Потоковый пул - исполнитель newSingleThreadExecutor - новостной редактор.Он будет обрабатывать упорядочивание ваших задач за вас, и если вы отправите Вызываемые объекты вашему исполнителю вы также сможете передать блокирующее поведение, которое вы ищете.
Вы можете попробовать LinkedTransferQueue из jsr166: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166y/
Это соответствует вашим требованиям и требует меньших накладных расходов на операции предложения / опроса.Как я могу видеть из кода, когда очередь не пуста, она использует атомарные операции для опроса элементов.И когда очередь пуста, она вращается некоторое время и в случае неудачи останавливает поток.Я думаю, это может помочь в вашем случае.
Я использую ArrayBlockingQueue всякий раз, когда мне нужно передать данные из одного потока в другой.Используя методы put и take (которые будут блокироваться, если они заполнены / пусты).
Вот такой список классов, реализующих BlockingQueue
.
Я бы рекомендовал проверить SynchronousQueue
.
Как и @Rorick, упомянутый в его комментарии, я считаю, что все эти реализации параллельны.Я думаю, что ваши опасения по поводу LinkedBlockingQueue
может быть, это неуместно.