Frage

Ich sende String-Nachrichten an Kafka V.0.8 mit der Java Producer API.Wenn die Nachrichtengröße ungefähr 15 MB beträgt, erhalte ich eine MessageSizeTooLargeException.Ich habe versucht zu setzen message.max.bytesbis 40 MB, aber ich bekomme immer noch die Ausnahme.Kleine Nachrichten funktionierten ohne Probleme.

(Die Ausnahme erscheint im Hersteller, ich habe keinen Verbraucher in dieser Anwendung.)

Was kann ich tun, um diese Ausnahme zu beseitigen?

Meine Beispielproduzentenkonfiguration

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Fehlerprotokoll:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
War es hilfreich?

Lösung

Sie müssen drei (oder vier) Eigenschaften anpassen:

  • Verbraucherseite:fetch.message.max.bytes - dies bestimmt die größte Größe einer Nachricht, die vom Verbraucher abgerufen werden kann.
  • Maklerseite: replica.fetch.max.bytes - dadurch können die Replikate in den Brokern Nachrichten innerhalb des Clusters senden und sicherstellen, dass die Nachrichten korrekt repliziert werden.Wenn dies zu klein ist, wird die Nachricht niemals repliziert, und daher wird der Verbraucher die Nachricht niemals sehen, da die Nachricht niemals festgeschrieben (vollständig repliziert) wird.
  • Maklerseite: message.max.bytes - dies ist die größte Größe der Nachricht, die der Broker von einem Produzenten empfangen kann.
  • Brokerseite (pro Thema): max.message.bytes - dies ist die größte Größe der Nachricht, die der Broker an das Thema anhängen darf.Diese Größe ist eine validierte Vorkomprimierung.(Standardmäßig Broker message.max.bytes.)

Ich habe Nummer 2 auf die harte Tour herausgefunden - Sie erhalten keine Ausnahmen, Nachrichten oder Warnungen von Kafka, also denken Sie daran, dies zu berücksichtigen, wenn Sie große Nachrichten senden.

Andere Tipps

Geringfügige Änderungen erforderlich für Kafka 0.10 und die neuer Verbraucher im Vergleich zu antwort von laughing_man:

  • Broker:Keine Änderungen, Sie müssen die Eigenschaften noch erhöhen message.max.bytes und replica.fetch.max.bytes. message.max.bytes muss gleich oder kleiner(*) sein als replica.fetch.max.bytes.
  • Produzent:Erhöhen max.request.size um die größere Nachricht zu senden.
  • Verbraucher:Erhöhen max.partition.fetch.bytes um größere Nachrichten zu empfangen.

(*) Lesen Sie die Kommentare, um mehr zu erfahren über message.max.bytes<=replica.fetch.max.bytes

Sie müssen die folgenden Eigenschaften überschreiben:

broker configs ($ kafka_home / config / server.properties)

    .
  • replica.fetch.max.bytes
  • message.max.bytes

consumer configs ($ kafka_home / config / consumer.properties)
Dieser Schritt funktionierte nicht für mich.Ich füge es der Consumer-App hinzu und es funktionierte gut

    .
  • fech.message.max.bytes

Starten Sie den Server neu.

Schauen Sie sich diese Dokumentation für weitere Informationen an: http://kafka.apache.org/08/configuration.html

Die Idee ist, eine gleich große Nachricht zu haben, die vom Kafka-Produzenten an den Kafka-Broker gesendet und anschließend von Kafka Consumer I.e erhalten wird.

kafka produzent -> kafka broker -> kafka consumer

Angenommen, wenn die Anforderung darin besteht, 15 MB Nachricht zu senden, dann der -Produzent , der -makler , der consumer , alle drei, müssensynchron sein.

kafka produzent sendet 15 MB -> kafka broker Erlaubt / speichert 15 MB -> Kafka Consumer erhält 15 MB

Die Einstellung sollte daher sein:

a) beim Broker:

generasacodicetagpre.

b) auf den Verbraucher:

generasacodicetagpre.

Eine wichtige Sache, an die man sich erinnern sollte message.max.bytes attribut muss sein synchron mit dem Verbraucher fetch.message.max.bytes Eigenschaft.die Abrufgröße muss mindestens so groß sein wie die maximale Nachrichtengröße, andernfalls kann es vorkommen, dass Produzenten Nachrichten senden können, die größer sind, als der Konsument konsumieren / abrufen kann.Es könnte sich lohnen, einen Blick darauf zu werfen.
Welche Version von Kafka verwenden Sie?Geben Sie auch einige weitere Details an, die Sie erhalten.gibt es so etwas wie ... payload size of xxxx larger than 1000000 kommt im Protokoll nach oben?

Die Antwort von @Laughing_Man ist ziemlich genau. Aber trotzdem wollte ich eine Empfehlung geben, die ich aus dem Kafka-Experten gelernt habe Stephane Maarek von Quora.

kafka ist nicht gedacht, um große Nachrichten zu behandeln.

Ihre API sollte den Cloud-Speicher (EX AWS S3) verwenden, und drücken Sie einfach in Kafka oder einem beliebigen Nachrichten-Broker eine Referenz von S3. Sie müssen irgendwo finden, um Ihre Daten anzutreten, vielleicht ist es ein Netzlaufwerk, vielleicht ist es unabhängig, aber es sollte kein Message Broker sein.

Jetzt, wenn Sie nicht mit der obigen Lösung gehen möchten

Die Meldung Max-Größe beträgt 1 MB (die Einstellung in Ihren Brokern wird genannt genannt asg a>. Wenn Sie es wirklich schlecht gebraucht haben, können Sie diese Größe erhöhen und die Netzwerkpuffer für Ihre Hersteller und Verbraucher erhöhen.

und wenn Sie sich wirklich darum kümmern, Ihre Nachricht aufzuteilen, stellen Sie sicher, dass jede Meldungsspaltung den gleichen Schlüssel hat, sodass er auf dieselbe Partition gedrückt wird, und Ihre Nachrichteninhalte sollten eine "Teil-ID" melden, damit Ihr Verbraucher vollständig ist rekonstruieren Sie die Nachricht.

Sie können auch die Komprimierung erforschen, wenn Ihre Nachricht textbasiert ist (GZIP, SNAPY, LZ4-Komprimierung), die die Datengröße reduzieren kann, jedoch nicht magisch.

Wieder müssen Sie ein externes System verwenden, um diese Daten zu speichern, und drücken Sie einfach einen externen Hinweis auf Kafka. Das ist eine sehr häufige Architektur, und eines, mit dem Sie gehen und weit akzeptieren sollten.

halten Sie das in Sinn, dass Kafka am besten funktioniert, wenn die Botschaften in der Menge riesig sind, aber nicht in der Größe.

Quelle: https: / /www.quora.com/how-do-i-send-Large-Messages-80-mb-in-kafka

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top