Comment puis-je envoyer des messages volumineux avec Kafka (plus de 15 MO)?
-
21-12-2019 - |
Question
J'envoie la Chaîne de messages à Kafka V.0.8 Java avec le Producteur de l'API.Si la taille du message est d'environ 15 MO-je obtenir un MessageSizeTooLargeException
.J'ai essayé de mettre message.max.bytes
à 40 MO, mais j'ai toujours l'exception.Des petits messages a fonctionné sans problèmes.
(À l'exception d'apparaître dans le producteur, je n'ai pas de consommation dans la présente demande.)
Que puis-je faire pour me débarrasser de cette exception?
Mon exemple producteur config
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
Erreur De Fichier Journal:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
La solution
Vous avez besoin d'ajuster les trois (ou quatre) propriétés:
- Secondaires du consommateur:
fetch.message.max.bytes
- cela permettra de déterminer la taille maximale d'un message qui peut être récupérée par le consommateur. - Courtier côté:
replica.fetch.max.bytes
pour les répliques dans les courtiers pour envoyer des messages au sein du cluster et de s'assurer que les messages sont correctement reproduites.Si c'est trop petit, le message ne sera jamais répliqué, et par conséquent, le consommateur ne verra jamais le message parce que le message ne sera jamais commis (à reproduire). - Courtier côté:
message.max.bytes
- c'est la plus grande taille du message, qui peut être reçu par le courtier auprès d'un producteur. - Courtier côté (par sujet):
max.message.bytes
- c'est la plus grande taille du message, le courtier va permettre d'être ajouté à la rubrique.Cette taille est validée pré-compression.(Valeur par défaut du courtiermessage.max.bytes
.)
J'ai découvert la manière dure environ 2 - vous ne pas faire des exceptions, des messages, ou des avertissements de Kafka, donc soyez sûr d'envisager cette option lorsque vous envoyez des messages volumineux.
Autres conseils
Des modifications mineures nécessaires pour Kafka 0.10 et la de nouveaux consommateurs par rapport à laughing_man réponse:
- Courtier:Aucun changement, vous avez encore le besoin d'augmenter les propriétés
message.max.bytes
etreplica.fetch.max.bytes
.message.max.bytes
doit être égale ou plus petite(*) quereplica.fetch.max.bytes
. - Producteur:Augmentation
max.request.size
pour envoyer le message supérieure. - Consommateur:Augmentation
max.partition.fetch.bytes
pour recevoir les messages les plus grands.
(*) Lire les commentaires pour en savoir plus sur message.max.bytes
<=replica.fetch.max.bytes
Vous avez besoin de redéfinir les propriétés suivantes:
Courtier Configs($KAFKA_HOME/config/serveur.les propriétés)
- la réplique.fetch.max.octets
- message.max.octets
La consommation Configs($KAFKA_HOME/config/consommateur.les propriétés)
Cette étape ne fonctionne pas pour moi.Je l'ajoute à la consommation app et ça marchait très bien
- fetch.message.max.octets
Redémarrez le serveur.
regardez cette documentation pour plus d'info:http://kafka.apache.org/08/configuration.html
L'idée est de taille égale de message envoyé à partir de Kafka Producteur de Kafka Courtier et a ensuite reçu par Kafka à la Consommation c'est à dire
Kafka producteur --> Kafka Courtier --> Kafka Consommateurs
Supposons que si le besoin est pour envoyer de 15 mo de message, puis la Producteur, le Courtier et la Consommateurs, tous les trois, doit être en harmonie.
Kafka Producteur envoie 15 MO --> Kafka Courtier Permet de Magasins et 15 MO --> Kafka Consommateurs reçoit 15 MO
Le paramètre doit donc être:
a) sur Courtier:
message.max.bytes=15728640
replica.fetch.max.bytes=15728640
b) sur la Consommation:
fetch.message.max.bytes=15728640
Un truc important à retenir que message.max.bytes
attribut doit être la synchronisation le consommateur fetch.message.max.bytes
de la propriété.la taille de l'extraction doit être au moins aussi grand que la taille maximale de message sinon il pourrait y avoir une situation où les producteurs peuvent envoyer des messages plus longs que le consommateur peut consommer/fetch.Il pourrait la peine de prendre un coup d'oeil.
La version de Kafka que vous utilisez?Également fournir plus de détails trace de qui vous obtenez.est-il quelque chose comme ... payload size of xxxx larger
than 1000000
à venir dans le journal?
La réponse de @laughing_man est tout à fait exacte.Mais encore, j'ai voulu donner une recommandation que j'ai appris de Kafka expert Stéphane Maarek de Quora.
Kafka n'est pas destiné à gérer les messages volumineux.
Votre API doit utiliser le cloud de stockage (Ex AWS S3), et il suffit de pousser à Kafka ou de tout message broker une référence de S3.Vous devez trouver un endroit pour persister vos données, c'est peut-être un lecteur réseau, c'est peut-être que ce soit, mais il ne devrait pas être courtier de message.
Maintenant, si vous ne voulez pas aller avec la solution ci-dessus
Le message que la taille maximale est de 1 mo (le paramètre de votre courtier est appelé message.max.bytes
) Apache Kafka.Si vous avez vraiment nécessaire, vous pouvez augmenter cette taille et assurez-vous d'augmenter le réseau de zones tampons pour votre les producteurs et les consommateurs.
Et si vous vous souciez vraiment de la division de votre message, assurez-vous que chaque message split a exactement la même clé, de sorte qu'il est poussé à la même partition, et le contenu de votre message doit faire état d'un “id de partie”, de sorte que votre consommateur peut reconstruire entièrement le message.
Vous pouvez aussi explorer la compression, si votre message est basé sur du texte (gzip, snappy, compression lz4), ce qui peut réduire la taille des données, mais pas par magie.
Encore une fois, vous devez utiliser un système externe pour stocker des données et il suffit de pousser une référence externe à Kafka.C'est un très commun de l'architecture, et vous devez aller avec et largement acceptée.
Gardez cela à l'esprit Kafka fonctionne mieux si les messages sont énormes dans la quantité, mais pas en taille.
Source: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka