Domanda

Invio messaggi di stringa a Kafka V. 0.8 con l'API del produttore Java. Se la dimensione del messaggio è di circa 15 MB ottengo un MessageSizeTooLargeException. Ho provato a impostare message.max.bytesto 40 MB, ma ho ancora l'eccezione.I piccoli messaggi hanno funzionato senza problemi.

(L'eccezione appare nel produttore, non ho un consumatore in questa applicazione.)

Cosa posso fare per sbarazzarsi di questa eccezione?

My ESEMPIO PRODUTER CONFIG

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}
.

errori-log:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
.
È stato utile?

Soluzione

È necessario regolare tre (o quattro) proprietà:

    .
  • Lato dei consumatori: fetch.message.max.bytes: determinerà la dimensione più grande di un messaggio che può essere recuperato dal consumatore.
  • lato broker: replica.fetch.max.bytes: consentirà che le repliche nei broker inviare messaggi all'interno del cluster e accertarsi che i messaggi siano replicati correttamente. Se questo è troppo piccolo, il messaggio non verrà mai replicato, e pertanto, il consumatore non vedrà mai il messaggio perché il messaggio non verrà mai commesso (completamente replicato).
  • lato broker: message.max.bytes: questa è la dimensione più grande del messaggio che può essere ricevuto dal broker da un produttore.
  • lato broker (per argomento): max.message.bytes: questa è la dimensione più grande del messaggio che il broker consentirà di aggiungere all'argomento. Questa dimensione è la pre-compressione convalidata. (Impostazioni predefinite al message.max.bytes del broker.)

Ho scoperto il modo più duro del numero 2 - non ottieni eccezioni, messaggi o avvertimenti da Kafka, quindi assicurati di considerarlo quando si invia messaggi di grandi dimensioni.

Altri suggerimenti

Minori modifiche richieste per Kafka 0.10 e nuovo consumatore Rispetto a Risposta di riding_man :

    .
  • Broker: nessuna modifica, è comunque necessario aumentare le proprietà message.max.bytes e replica.fetch.max.bytes.message.max.bytes deve essere uguale o più piccolo (*) rispetto a replica.fetch.max.bytes.
  • Produttore: Aumenta max.request.size per inviare il messaggio più grande.
  • Consumatore: Aumentare max.partition.fetch.bytes per ricevere messaggi più grandi.

(*) Leggi i commenti per saperne di più su message.max.bytes <= replica.fetch.max.bytes

È necessario scavalcare le seguenti proprietà:

Broker Configs ($ kafka_home / config / server.properties)

    .
  • replica.fetch.max.bytes
  • messaggio.max.bytes

Configura dei consumatori ($ kafka_home / config / consumer.properties)
Questo passo non ha funzionato per me.Lo aggiungo all'app dei consumatori e funzionava bene

    .
  • fetch.message.max.bytes

Riavvia il server.

Guarda questa documentazione per maggiori informazioni: http://kafka.apache.org/08/configuration.html

L'idea è di avere uguali dimensioni del messaggio inviato dal produttore di Kafka al broker di Kafka e poi ricevuto da Kafka Consumer I.e.

Kafka Producer -> Kafka Broker -> Kafka Consumer

Supponiamo se il requisito è di inviare 15 MB di messaggio, il produttore , broker e consumer , tutti e tre, ha bisognoessere sincronizzato

Kafka Producer Invia 15 MB -> Kafka Broker Consente / Memorizza 15 MB -> Kafka Consumer riceve 15 MB

L'impostazione quindi dovrebbe essere:

a) sul broker:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640
.

b) sul consumatore:

fetch.message.max.bytes=15728640
.

Una cosa chiave da ricordare che l'attributo generacodictagcode deve essere in sincronia con la proprietà message.max.bytes del consumatore.La dimensione del recupero deve essere almeno grande quanto la dimensione massima del messaggio altrimenti potrebbe esserci situazione in cui i produttori possono inviare messaggi più grandi del consumatore può consumare / recuperare.Potrebbe valere la pena dargli un'occhiata.
. Quale versione di Kafka stai usando?Fornisci anche alcuni dettagli traccia che stai ottenendo.C'è qualcosa come ... fetch.message.max.bytes in arrivo nel registro?

La risposta da @laughing_man è abbastanza accurata. Ma ancora, volevo dare una raccomandazione che ho imparato da Kafka Expert Stephane Maarek da Quora.

Kafka non è destinato a gestire messaggi di grandi dimensioni.

La tua API dovrebbe utilizzare lo stoccaggio cloud (ex AWS S3) e semplicemente premere a KAFKA o qualsiasi messaggio Broker un riferimento di S3. Devi trovare un posto dove persistere i tuoi dati, forse è un'unità di rete, forse è tutto ciò, ma non dovrebbe essere un broker di messaggi.

Ora, se non vuoi andare con la soluzione sopra

Il messaggio Dimensione massima è 1 MB (l'impostazione dei tuoi broker è chiamata message.max.bytes) Apache Kafka . Se ne avessi davvero bisogno, è possibile aumentare quella dimensione e assicurarti di aumentare i buffer di rete per i tuoi produttori e consumatori.

E se ti interessa davvero dividere il tuo messaggio, assicurati che ogni messaggio diviso abbia lo stesso esatto tasto in modo che venga spinto alla stessa partizione e il tuo contenuto del messaggio dovrebbe segnalare un "ID parte" in modo che il consumatore possa completamente ricostruire il messaggio.

È inoltre possibile esplorare la compressione, se il messaggio è basato su testo (Gzip, Snappy, LZ4 compression) che può ridurre la dimensione dei dati, ma non magicamente.

Ancora una volta, è necessario utilizzare un sistema esterno per memorizzare tali dati e semplicemente spingere un riferimento esterno a Kafka. Questa è un'architettura molto comune, e uno dovresti andare con e ampiamente accettato.

Tienilo a mente Kafka funziona meglio solo se i messaggi sono enormi in quantità ma non di dimensioni.

Fonte: https: / /www.quora.com/how-do--Send-Large-Messaggi-80-MB-in-Kafka

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top