Kafka(15MB以上)で大きなメッセージを送信するにはどうすればよいですか?
-
21-12-2019 - |
質問
文字列メッセージをKafka Vに送信します。Java Producer APIを使用した0.8。メッセージサイズが約15MBの場合、私は次のようになります MessageSizeTooLargeException
.私は設定しようとしました message.max.bytes
40MBに、私はまだ例外を取得します。小さなメッセージは問題なく機能しました。
(例外はプロデューサーに表示されますが、このアプリケーションにはコンシューマーがありません。)
この例外を取り除くために私は何ができますか?
私のプロデューサー設定の例
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
エラーログ:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
解決
3(または4つのプロパティ)を調整する必要があります。
- コンシューマ側:
fetch.message.max.bytes
- これは、コンシューマによってフェッチできるメッセージの最大サイズを決定します。 - ブローカー側:
replica.fetch.max.bytes
- ブローカー内のレプリカがクラスタ内でメッセージを送信し、メッセージが正しく複製されていることを確認することができます。これが小さすぎる場合、メッセージはレプリケートされることはありません。したがって、メッセージはコミットされない(完全に複製されている)ため、消費者はメッセージを表示しません(完全に複製されます)。 - ブローカー側:
message.max.bytes
- これはプロデューサーからブローカーによって受信できるメッセージの最大サイズです。 - ブローカー側(トピックごと):
max.message.bytes
- これは、ブローカーがトピックに追加できるメッセージの最大サイズです。このサイズは予約されているプリ圧縮を検証します。 (デフォルトはブローカーのmessage.max.bytes
です。)
私はNumber 2についての難しい方法を見つけました - カフカからの例外、メッセージ、または警告を受けませんので、大きなメッセージを送信しているときは必ず検討してください。
他のヒント
のために必要なマイナーな変更 カフカ0.10 そして、 新しい消費者 に比べて laughing_manの答え:
- ブローカー:変更はありませんが、プロパティを増やす必要があります
message.max.bytes
とreplica.fetch.max.bytes
.message.max.bytes
等しいか、またはより小さい(*)でなければなりませんreplica.fetch.max.bytes
. - プロデューサー:増加
max.request.size
より大きなメッセージを送信します。 - 消費者:増加
max.partition.fetch.bytes
より大きなメッセージを受信する。
(*)についての詳細を学ぶためにコメントを読んでください message.max.bytes
<=replica.fetch.max.bytes
以下のプロパティを上書きする必要があります。
ブローカー設定($ KAFKA_HOME / config / server.properties)
- replica.fetch.max.bytes
- message.max.bytes
コンシューマ設定($ kafka_home / config / consuceer.properties)
このステップは私のためにうまくいきませんでした。私はそれを消費者アプリに追加し、それはうまく機能していました
- fetch.message.max.bytes
サーバを再起動します。
この資料を探す詳細については、次のように説明しています。 http://fafka.apache.org/08/configuration.html
このアイデアは、同じサイズのメッセージが Kafka プロデューサから Kafka ブローカーに送信され、その後 Kafka Consumer によって受信されるようにすることです。
Kafka プロデューサー --> Kafka ブローカー --> Kafka コンシューマー
要件が 15MB のメッセージを送信することであると仮定します。 プロデューサー, 、 ブローカ そしてその 消費者, 、3 つすべてが同期している必要があります。
カフカプロデューサー 15MBを送信します --> カフカブローカー 15 MB を許可/保存 --> Kafka コンシューマ 15MBを受信します
したがって、設定は次のようにする必要があります。
a) ブローカーについて:
message.max.bytes=15728640
replica.fetch.max.bytes=15728640
b) 消費者に関して:
fetch.message.max.bytes=15728640
それを覚えておくべき一つの重要なこと message.max.bytes
属性が必要です 同期中 消費者の fetch.message.max.bytes
プロパティ。フェッチサイズは少なくとも最大メッセージサイズと同じ大きさでなければなりません。そうしないと、プロデューサがコンシューマが消費/フェッチできるよりも大きなメッセージを送信できる状況がある可能性があります。それはそれを見てみる価値があるかもしれません。
どのバージョンのKafkaを使用していますか?また、あなたが得ているいくつかの詳細トレースを提供します。いくつかのようなものがあります。.. payload size of xxxx larger
than 1000000
ログに来る?
@ roghing_manからの答えは非常に正確です。それでも、私がカフカエキスパートから学んだ勧告をQuoraから
カフカは大きなメッセージを処理することを意味していません。
あなたのAPIはクラウドストレージ(EX AWS S3)を使用し、S3の参照を参照してKAFKAまたは任意のメッセージブローカーにプッシュするだけです。あなたはあなたのデータを永続的に持続させるためにどこかに見つけられなければなりません、それはネットワークドライブです、それは何でもそれはメッセージブローカーではありません。
上記の解決策に行きたくない場合
メッセージの最大サイズは1MBです(ブローカーの設定はmessage.max.bytes
と呼ばれます) Apache Kafka 。あなたが本当にそれがひどく必要とされているならば、あなたはそのサイズを増やし、あなたのプロデューサと消費者のためのネットワークバッファを増やすことができます。
データサイズを小さくすることができるが魔法のように短くなる可能性があるテキストベース(gzip、snappy、lz4圧縮)の場合は、圧縮を調べることもできます。
繰り返しますが、そのデータを保存するために外部システムを使用し、KAFKAへの外部参照を押すだけです。それは非常に一般的なアーキテクチャであり、あなたが一緒に行くべきそして広く受け入れられるべきです。
念頭に置いて、Kafkaは、メッセージが大きくなるがサイズが大きい場合にのみ機能しています。
ソース: https:/ /www.quora.com/how-do-i-send-large-messages-80-mb-in-kafka