Fila simultânea e bloqueadora em java
-
06-07-2019 - |
Pergunta
Eu tenho o problema clássico de um thread empurrando eventos para a fila de um segundo tópico. Só que desta vez, estou muito interessado no desempenho. O que eu quero alcançar é:
- Quero acesso simultâneo à fila, o produtor empurrando, o receptor estourando.
- Quando a fila está vazia, quero que o consumidor bloqueie a fila, esperando o produtor.
Minha primeira ideia foi usar um LinkedBlockingQueue
, mas logo percebi que não é simultâneo e o desempenho sofreu. Por outro lado, agora uso um ConcurrentLinkedQueue
, mas ainda estou pagando o custo de wait()
/ notify()
em cada publicação. Como o consumidor, ao encontrar uma fila vazia, não bloqueia, eu tenho que sincronizar e wait()
em uma fechadura. Por outro lado, o produtor tem que conseguir esse bloqueio e notify()
em cada publicação. O resultado geral é que estou pagando o custo desycnhronized (lock) {lock.notify()}
Em todas as publicações, mesmo quando não é necessário.
O que eu acho que é necessário aqui é uma fila que é bloqueadora e simultânea. Eu imagino um push()
operação para trabalhar como em ConcurrentLinkedQueue
, com um extra notify()
para o objeto quando o elemento empurrado é o primeiro da lista. Tal cheque que considero já existir no ConcurrentLinkedQueue
, como empurrar requer a conexão com o próximo elemento. Assim, isso seria muito mais rápido do que sincronizar todas as vezes na trava externa.
É algo assim disponível/razoável?
Solução
Eu acho que você pode seguir java.util.concurrent.LinkedBlockingQueue
independentemente de suas dúvidas. É simultâneo. No entanto, não tenho idéia do seu desempenho. Provavelmente, outra implementação de BlockingQueue
vai se adequar a você melhor. Não há muitos deles; portanto, faça testes de desempenho e meça.
Outras dicas
Semelhante a esta resposta https://stackoverflow.com/a/1212515/1102730 mas um pouco diferente .. acabei usando um ExecutorService
. Você pode instanciar um usando Executors.newSingleThreadExecutor()
. Eu precisava de uma fila simultânea para ler/escrever BufferImages para arquivos, bem como atomicidade com leituras e gravações. Eu só preciso de um único thread porque o arquivo IO é ordens de magnitude mais rápido que a fonte, net io. Além disso, eu estava mais preocupado com a atomicidade de ações e correção do que o desempenho, mas essa abordagem também pode ser feita com vários threads no pool para acelerar as coisas.
Para obter uma imagem (omitida por trinta-fina-final):
Future<BufferedImage> futureImage = executorService.submit(new Callable<BufferedImage>() {
@Override
public BufferedImage call() throws Exception {
ImageInputStream is = new FileImageInputStream(file);
return ImageIO.read(is);
}
})
image = futureImage.get();
Para salvar uma imagem (omitida por trinta-fina-final):
Future<Boolean> futureWrite = executorService.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
FileOutputStream os = new FileOutputStream(file);
return ImageIO.write(image, getFileFormat(), os);
}
});
boolean wasWritten = futureWrite.get();
É importante observar que você deve liberar e fechar os fluxos em um bloco finalmente. Não sei como ele funciona em comparação com outras soluções, mas é bastante versátil.
Eu sugiro que você olhe para ThreadpoolExecutor NewsingleThreadExecutor. Ele vai lidar com suas tarefas encomendadas para você e se você enviar Callables Para o seu executor, você poderá obter o comportamento de bloqueio que está procurando também.
Você pode tentar o LinkedTransferqueue do JSR166: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166y/
Ele atende aos seus requisitos e tem menos despesas gerais para operações de oferta/pesquisa. Como posso ver no código, quando a fila não estiver vazia, ele usa operações atômicas para elementos de votação. E quando a fila estiver vazia, ela gira por algum tempo e estacione o tópico, se malsucedido. Eu acho que pode ajudar no seu caso.
Eu uso o ArrayBlockingQueue sempre que preciso passar dados de um thread para outro. Usando os métodos Put and Take (que serão bloqueados se forem cheios/vazios).
Aqui está um Lista de classes implementando BlockingQueue
.
Eu recomendaria verificar SynchronousQueue
.
Como @Rorick mencionou em seu comentário, acredito que todas essas implementações são simultâneas. Eu acho que suas preocupações com LinkedBlockingQueue
pode estar fora do lugar.