Question

J'envoie la Chaîne de messages à Kafka V.0.8 Java avec le Producteur de l'API.Si la taille du message est d'environ 15 MO-je obtenir un MessageSizeTooLargeException.J'ai essayé de mettre message.max.bytesà 40 MO, mais j'ai toujours l'exception.Des petits messages a fonctionné sans problèmes.

(À l'exception d'apparaître dans le producteur, je n'ai pas de consommation dans la présente demande.)

Que puis-je faire pour me débarrasser de cette exception?

Mon exemple producteur config

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Erreur De Fichier Journal:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
Était-ce utile?

La solution

Vous avez besoin d'ajuster les trois (ou quatre) propriétés:

  • Secondaires du consommateur:fetch.message.max.bytes - cela permettra de déterminer la taille maximale d'un message qui peut être récupérée par le consommateur.
  • Courtier côté: replica.fetch.max.bytes pour les répliques dans les courtiers pour envoyer des messages au sein du cluster et de s'assurer que les messages sont correctement reproduites.Si c'est trop petit, le message ne sera jamais répliqué, et par conséquent, le consommateur ne verra jamais le message parce que le message ne sera jamais commis (à reproduire).
  • Courtier côté: message.max.bytes - c'est la plus grande taille du message, qui peut être reçu par le courtier auprès d'un producteur.
  • Courtier côté (par sujet): max.message.bytes - c'est la plus grande taille du message, le courtier va permettre d'être ajouté à la rubrique.Cette taille est validée pré-compression.(Valeur par défaut du courtier message.max.bytes.)

J'ai découvert la manière dure environ 2 - vous ne pas faire des exceptions, des messages, ou des avertissements de Kafka, donc soyez sûr d'envisager cette option lorsque vous envoyez des messages volumineux.

Autres conseils

Des modifications mineures nécessaires pour Kafka 0.10 et la de nouveaux consommateurs par rapport à laughing_man réponse:

  • Courtier:Aucun changement, vous avez encore le besoin d'augmenter les propriétés message.max.bytes et replica.fetch.max.bytes. message.max.bytes doit être égale ou plus petite(*) que replica.fetch.max.bytes.
  • Producteur:Augmentation max.request.size pour envoyer le message supérieure.
  • Consommateur:Augmentation max.partition.fetch.bytes pour recevoir les messages les plus grands.

(*) Lire les commentaires pour en savoir plus sur message.max.bytes<=replica.fetch.max.bytes

Vous avez besoin de redéfinir les propriétés suivantes:

Courtier Configs($KAFKA_HOME/config/serveur.les propriétés)

  • la réplique.fetch.max.octets
  • message.max.octets

La consommation Configs($KAFKA_HOME/config/consommateur.les propriétés)
Cette étape ne fonctionne pas pour moi.Je l'ajoute à la consommation app et ça marchait très bien

  • fetch.message.max.octets

Redémarrez le serveur.

regardez cette documentation pour plus d'info:http://kafka.apache.org/08/configuration.html

L'idée est de taille égale de message envoyé à partir de Kafka Producteur de Kafka Courtier et a ensuite reçu par Kafka à la Consommation c'est à dire

Kafka producteur --> Kafka Courtier --> Kafka Consommateurs

Supposons que si le besoin est pour envoyer de 15 mo de message, puis la Producteur, le Courtier et la Consommateurs, tous les trois, doit être en harmonie.

Kafka Producteur envoie 15 MO --> Kafka Courtier Permet de Magasins et 15 MO --> Kafka Consommateurs reçoit 15 MO

Le paramètre doit donc être:

a) sur Courtier:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

b) sur la Consommation:

fetch.message.max.bytes=15728640

Un truc important à retenir que message.max.bytes attribut doit être la synchronisation le consommateur fetch.message.max.bytes de la propriété.la taille de l'extraction doit être au moins aussi grand que la taille maximale de message sinon il pourrait y avoir une situation où les producteurs peuvent envoyer des messages plus longs que le consommateur peut consommer/fetch.Il pourrait la peine de prendre un coup d'oeil.
La version de Kafka que vous utilisez?Également fournir plus de détails trace de qui vous obtenez.est-il quelque chose comme ... payload size of xxxx larger than 1000000 à venir dans le journal?

La réponse de @laughing_man est tout à fait exacte.Mais encore, j'ai voulu donner une recommandation que j'ai appris de Kafka expert Stéphane Maarek de Quora.

Kafka n'est pas destiné à gérer les messages volumineux.

Votre API doit utiliser le cloud de stockage (Ex AWS S3), et il suffit de pousser à Kafka ou de tout message broker une référence de S3.Vous devez trouver un endroit pour persister vos données, c'est peut-être un lecteur réseau, c'est peut-être que ce soit, mais il ne devrait pas être courtier de message.

Maintenant, si vous ne voulez pas aller avec la solution ci-dessus

Le message que la taille maximale est de 1 mo (le paramètre de votre courtier est appelé message.max.bytes) Apache Kafka.Si vous avez vraiment nécessaire, vous pouvez augmenter cette taille et assurez-vous d'augmenter le réseau de zones tampons pour votre les producteurs et les consommateurs.

Et si vous vous souciez vraiment de la division de votre message, assurez-vous que chaque message split a exactement la même clé, de sorte qu'il est poussé à la même partition, et le contenu de votre message doit faire état d'un “id de partie”, de sorte que votre consommateur peut reconstruire entièrement le message.

Vous pouvez aussi explorer la compression, si votre message est basé sur du texte (gzip, snappy, compression lz4), ce qui peut réduire la taille des données, mais pas par magie.

Encore une fois, vous devez utiliser un système externe pour stocker des données et il suffit de pousser une référence externe à Kafka.C'est un très commun de l'architecture, et vous devez aller avec et largement acceptée.

Gardez cela à l'esprit Kafka fonctionne mieux si les messages sont énormes dans la quantité, mais pas en taille.

Source: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top