Como posso enviar mensagens de grandes dimensões com Kafka (mais de 15MB)?
-
21-12-2019 - |
Pergunta
Eu enviar Cadeia de mensagens para Kafka V.0.8 com o Java Produtor API.Se o tamanho da mensagem é de cerca de 15 MB, eu recebo um MessageSizeTooLargeException
.Eu tentei definir message.max.bytes
40 MB, mas eu ainda recebo a exceção.Pequenas mensagens funcionou sem problemas.
(A exceção aparecer no produtor, não um consumidor neste aplicativo.)
O que posso fazer para livrar-se da presente exceção?
Meu exemplo de produtores de configuração
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
Log De Erros:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
Solução
Você precisa ajustar três (ou quatro) propriedades:
- Consumidor:
fetch.message.max.bytes
- isso vai determinar o tamanho máximo de uma mensagem que pode ser obtida pelo consumidor. - Corretor lado:
replica.fetch.max.bytes
- isso vai permitir que as réplicas no corretores para enviar mensagens dentro do cluster e certifique-se de que as mensagens são replicadas corretamente.Se esta for muito pequena, em seguida, a mensagem nunca será replicado, e, portanto, o consumidor nunca vai ver a mensagem, pois a mensagem nunca será confirmada (totalmente replicado). - Corretor lado:
message.max.bytes
- este é o maior tamanho de mensagem que pode ser recebida pelo corretor de um produtor. - Corretor de lado (por tema):
max.message.bytes
- este é o maior tamanho da mensagem, o corretor irá permitir a ser anexado ao tópico.Este tamanho é validado de pré-compressão.(A predefinição do corretormessage.max.bytes
.)
Eu descobri da maneira mais difícil sobre número 2 - você não receba exceções, mensagens, avisos ou de Kafka, por isso certifique-se de levar isso em consideração quando estiver a enviar mensagens grandes.
Outras dicas
Pequenas alterações necessárias para Kafka 0.10 e o novo consumidor comparado com laughing_man resposta:
- Broker:Sem alterações, você ainda precisa para aumentar as propriedades
message.max.bytes
ereplica.fetch.max.bytes
.message.max.bytes
tem que ser igual ou menor(*) quereplica.fetch.max.bytes
. - Produtor:Aumentar
max.request.size
para enviar a mensagem maior. - Consumidor:Aumentar
max.partition.fetch.bytes
para receber mensagens maiores.
(*) Leia os comentários, para saber mais sobre message.max.bytes
<=replica.fetch.max.bytes
Você precisa substituir as seguintes propriedades:
corretor configs ($ kafka_home / config / server.properties)
- réplica.fetch.max.bytes
- message.max.bytes
CONFIGS ($ KAFKA_HOME / CONFIG / Consumer.properties)
Esta etapa não funcionou para mim.Eu adiciono ao aplicativo do consumidor e estava funcionando bem
- fetch.message.max.bytes
Reinicie o servidor.
Olhe para esta documentação para mais informações: http://kafka.apache.org/08/configuration.html
A ideia é ter o mesmo tamanho da mensagem a ser enviada a partir de Kafka Produtor de Kafka Corretor e, em seguida, recebido por Kafka Consumidor i.e.
Kafka produtor --> Kafka Corretor --> Kafka Consumidor
Suponha-se o requisito é enviar 15MB de mensagem e, em seguida, o Produtor, o Corretor e o Consumidor, todos os três, precisa estar em sincronia.
Kafka Produtor envia 15 MB --> Kafka Corretor Permite que/Lojas 15 MB --> Kafka Consumidor recebe 15 MB
A definição, portanto, deve ser:
a) Corretor:
message.max.bytes=15728640
replica.fetch.max.bytes=15728640
b) Consumidor:
fetch.message.max.bytes=15728640
Uma coisa importante a lembrar que message.max.bytes
atributo deve ser em sincronia com o consumidor fetch.message.max.bytes
propriedade.o tamanho da busca deve ser pelo menos tão grande quanto o tamanho máximo de mensagem de outra forma não poderia haver situação em que os produtores podem enviar mensagens maiores do que o consumidor pode consumir/busca.Talvez vale a pena dar uma olhada nele.
Qual a versão do Kafka você está usando?Também disponibilizamos mais alguns detalhes de rastreamento que você está recebendo.existe alguma coisa como ... payload size of xxxx larger
than 1000000
chegando no log?
A resposta do @laughing_man é bastante precisa.Mas ainda assim, eu queria dar uma recomendação que eu aprendi a partir de Kafka especialista Stephane Maarek a partir do Quora.
Kafka não pretende processar mensagens grandes.
Sua API deve usar armazenamento em nuvem (Ex AWS S3), e apenas empurrar para Kafka ou qualquer message broker uma referência de S3.Você deve encontrar um lugar para persistir seus dados, talvez seja uma unidade de rede, talvez seja o que for, mas ele não deve ser message broker.
Agora, se você não quer ir com a solução acima
A mensagem de tamanho máximo é de 1 mb (a configuração no seu corretores é chamado de message.max.bytes
) Apache Kafka.Se você realmente precisava mal, você pode aumentar o tamanho e certifique-se de aumentar os buffers de rede para os seus produtores e consumidores.
E se você realmente se preocupam com a divisão de sua mensagem, certifique-se de que cada mensagem dividida tem a mesma tecla de forma que ela é empurrado para a mesma partição, e o conteúdo da mensagem deve relatar uma parte "id" para que o consumidor possa reconstruir totalmente a mensagem.
Você também pode explorar compressão, se a sua mensagem é baseado em texto (gzip, mal-humorado, lz4 de compressão), o que pode reduzir o tamanho dos dados, mas não magicamente.
Novamente, você tem que usar um sistema externo para armazenar os dados e apenas empurrar uma referência externa para Kafka.Que é muito mais comum do que a arquitetura, e que você deve ir com e amplamente aceito.
Tenha isso em mente, Kafka funciona melhor se as mensagens são enormes em quantidade, mas não em tamanho.
Fonte: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka