Pergunta

Eu enviar Cadeia de mensagens para Kafka V.0.8 com o Java Produtor API.Se o tamanho da mensagem é de cerca de 15 MB, eu recebo um MessageSizeTooLargeException.Eu tentei definir message.max.bytes40 MB, mas eu ainda recebo a exceção.Pequenas mensagens funcionou sem problemas.

(A exceção aparecer no produtor, não um consumidor neste aplicativo.)

O que posso fazer para livrar-se da presente exceção?

Meu exemplo de produtores de configuração

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Log De Erros:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
Foi útil?

Solução

Você precisa ajustar três (ou quatro) propriedades:

  • Consumidor:fetch.message.max.bytes - isso vai determinar o tamanho máximo de uma mensagem que pode ser obtida pelo consumidor.
  • Corretor lado: replica.fetch.max.bytes - isso vai permitir que as réplicas no corretores para enviar mensagens dentro do cluster e certifique-se de que as mensagens são replicadas corretamente.Se esta for muito pequena, em seguida, a mensagem nunca será replicado, e, portanto, o consumidor nunca vai ver a mensagem, pois a mensagem nunca será confirmada (totalmente replicado).
  • Corretor lado: message.max.bytes - este é o maior tamanho de mensagem que pode ser recebida pelo corretor de um produtor.
  • Corretor de lado (por tema): max.message.bytes - este é o maior tamanho da mensagem, o corretor irá permitir a ser anexado ao tópico.Este tamanho é validado de pré-compressão.(A predefinição do corretor message.max.bytes.)

Eu descobri da maneira mais difícil sobre número 2 - você não receba exceções, mensagens, avisos ou de Kafka, por isso certifique-se de levar isso em consideração quando estiver a enviar mensagens grandes.

Outras dicas

Pequenas alterações necessárias para Kafka 0.10 e o novo consumidor comparado com laughing_man resposta:

  • Broker:Sem alterações, você ainda precisa para aumentar as propriedades message.max.bytes e replica.fetch.max.bytes. message.max.bytes tem que ser igual ou menor(*) que replica.fetch.max.bytes.
  • Produtor:Aumentar max.request.size para enviar a mensagem maior.
  • Consumidor:Aumentar max.partition.fetch.bytes para receber mensagens maiores.

(*) Leia os comentários, para saber mais sobre message.max.bytes<=replica.fetch.max.bytes

Você precisa substituir as seguintes propriedades:

corretor configs ($ kafka_home / config / server.properties)

  • réplica.fetch.max.bytes
  • message.max.bytes

CONFIGS ($ KAFKA_HOME / CONFIG / Consumer.properties)
Esta etapa não funcionou para mim.Eu adiciono ao aplicativo do consumidor e estava funcionando bem

  • fetch.message.max.bytes

Reinicie o servidor.

Olhe para esta documentação para mais informações: http://kafka.apache.org/08/configuration.html

A ideia é ter o mesmo tamanho da mensagem a ser enviada a partir de Kafka Produtor de Kafka Corretor e, em seguida, recebido por Kafka Consumidor i.e.

Kafka produtor --> Kafka Corretor --> Kafka Consumidor

Suponha-se o requisito é enviar 15MB de mensagem e, em seguida, o Produtor, o Corretor e o Consumidor, todos os três, precisa estar em sincronia.

Kafka Produtor envia 15 MB --> Kafka Corretor Permite que/Lojas 15 MB --> Kafka Consumidor recebe 15 MB

A definição, portanto, deve ser:

a) Corretor:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

b) Consumidor:

fetch.message.max.bytes=15728640

Uma coisa importante a lembrar que message.max.bytes atributo deve ser em sincronia com o consumidor fetch.message.max.bytes propriedade.o tamanho da busca deve ser pelo menos tão grande quanto o tamanho máximo de mensagem de outra forma não poderia haver situação em que os produtores podem enviar mensagens maiores do que o consumidor pode consumir/busca.Talvez vale a pena dar uma olhada nele.
Qual a versão do Kafka você está usando?Também disponibilizamos mais alguns detalhes de rastreamento que você está recebendo.existe alguma coisa como ... payload size of xxxx larger than 1000000 chegando no log?

A resposta do @laughing_man é bastante precisa.Mas ainda assim, eu queria dar uma recomendação que eu aprendi a partir de Kafka especialista Stephane Maarek a partir do Quora.

Kafka não pretende processar mensagens grandes.

Sua API deve usar armazenamento em nuvem (Ex AWS S3), e apenas empurrar para Kafka ou qualquer message broker uma referência de S3.Você deve encontrar um lugar para persistir seus dados, talvez seja uma unidade de rede, talvez seja o que for, mas ele não deve ser message broker.

Agora, se você não quer ir com a solução acima

A mensagem de tamanho máximo é de 1 mb (a configuração no seu corretores é chamado de message.max.bytes) Apache Kafka.Se você realmente precisava mal, você pode aumentar o tamanho e certifique-se de aumentar os buffers de rede para os seus produtores e consumidores.

E se você realmente se preocupam com a divisão de sua mensagem, certifique-se de que cada mensagem dividida tem a mesma tecla de forma que ela é empurrado para a mesma partição, e o conteúdo da mensagem deve relatar uma parte "id" para que o consumidor possa reconstruir totalmente a mensagem.

Você também pode explorar compressão, se a sua mensagem é baseado em texto (gzip, mal-humorado, lz4 de compressão), o que pode reduzir o tamanho dos dados, mas não magicamente.

Novamente, você tem que usar um sistema externo para armazenar os dados e apenas empurrar uma referência externa para Kafka.Que é muito mais comum do que a arquitetura, e que você deve ir com e amplamente aceito.

Tenha isso em mente, Kafka funciona melhor se as mensagens são enormes em quantidade, mas não em tamanho.

Fonte: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top