Kafka(15MB以上)で大きなメッセージを送信するにはどうすればよいですか?

StackOverflow https://stackoverflow.com//questions/21020347

  •  21-12-2019
  •  | 
  •  

質問

文字列メッセージをKafka Vに送信します。Java Producer APIを使用した0.8。メッセージサイズが約15MBの場合、私は次のようになります MessageSizeTooLargeException.私は設定しようとしました message.max.bytes40MBに、私はまだ例外を取得します。小さなメッセージは問題なく機能しました。

(例外はプロデューサーに表示されますが、このアプリケーションにはコンシューマーがありません。)

この例外を取り除くために私は何ができますか?

私のプロデューサー設定の例

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

エラーログ:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
役に立ちましたか?

解決

3(または4つのプロパティ)を調整する必要があります。

  • コンシューマ側:fetch.message.max.bytes - これは、コンシューマによってフェッチできるメッセージの最大サイズを決定します。
  • ブローカー側:replica.fetch.max.bytes - ブローカー内のレプリカがクラスタ内でメッセージを送信し、メッセージが正しく複製されていることを確認することができます。これが小さすぎる場合、メッセージはレプリケートされることはありません。したがって、メッセージはコミットされない(完全に複製されている)ため、消費者はメッセージを表示しません(完全に複製されます)。
  • ブローカー側:message.max.bytes - これはプロデューサーからブローカーによって受信できるメッセージの最大サイズです。
  • ブローカー側(トピックごと):max.message.bytes - これは、ブローカーがトピックに追加できるメッセージの最大サイズです。このサイズは予約されているプリ圧縮を検証します。 (デフォルトはブローカーのmessage.max.bytesです。)

私はNumber 2についての難しい方法を見つけました - カフカからの例外、メッセージ、または警告を受けませんので、大きなメッセージを送信しているときは必ず検討してください。

他のヒント

のために必要なマイナーな変更 カフカ0.10 そして、 新しい消費者 に比べて laughing_manの答え:

  • ブローカー:変更はありませんが、プロパティを増やす必要があります message.max.bytesreplica.fetch.max.bytes. message.max.bytes 等しいか、またはより小さい(*)でなければなりません replica.fetch.max.bytes.
  • プロデューサー:増加 max.request.size より大きなメッセージを送信します。
  • 消費者:増加 max.partition.fetch.bytes より大きなメッセージを受信する。

(*)についての詳細を学ぶためにコメントを読んでください message.max.bytes<=replica.fetch.max.bytes

以下のプロパティを上書きする必要があります。

ブローカー設定($ KAFKA_HOME / config / server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

コンシューマ設定($ kafka_home / config / consuceer.properties)
このステップは私のためにうまくいきませんでした。私はそれを消費者アプリに追加し、それはうまく機能していました

  • fetch.message.max.bytes

サーバを再起動します。

この資料を探す詳細については、次のように説明しています。 http://fafka.apache.org/08/configuration.html

このアイデアは、同じサイズのメッセージが Kafka プロデューサから Kafka ブローカーに送信され、その後 Kafka Consumer によって受信されるようにすることです。

Kafka プロデューサー --> Kafka ブローカー --> Kafka コンシューマー

要件が 15MB のメッセージを送信することであると仮定します。 プロデューサー, 、 ブローカ そしてその 消費者, 、3 つすべてが同期している必要があります。

カフカプロデューサー 15MBを送信します --> カフカブローカー 15 MB を許可/保存 --> Kafka コンシューマ 15MBを受信します

したがって、設定は次のようにする必要があります。

a) ブローカーについて:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

b) 消費者に関して:

fetch.message.max.bytes=15728640

それを覚えておくべき一つの重要なこと message.max.bytes 属性が必要です 同期中 消費者の fetch.message.max.bytes プロパティ。フェッチサイズは少なくとも最大メッセージサイズと同じ大きさでなければなりません。そうしないと、プロデューサがコンシューマが消費/フェッチできるよりも大きなメッセージを送信できる状況がある可能性があります。それはそれを見てみる価値があるかもしれません。
どのバージョンのKafkaを使用していますか?また、あなたが得ているいくつかの詳細トレースを提供します。いくつかのようなものがあります。.. payload size of xxxx larger than 1000000 ログに来る?

@ roghing_manからの答えは非常に正確です。それでも、私がカフカエキスパートから学んだ勧告をQuoraから stephane maarek から学んだ勧告を与えたかった。

カフカは大きなメッセージを処理することを意味していません。

あなたのAPIはクラウドストレージ(EX AWS S3)を使用し、S3の参照を参照してKAFKAまたは任意のメッセージブローカーにプッシュするだけです。あなたはあなたのデータを永続的に持続させるためにどこかに見つけられなければなりません、それはネットワークドライブです、それは何でもそれはメッセージブローカーではありません。

上記の解決策に行きたくない場合

メッセージの最大サイズは1MBです(ブローカーの設定はmessage.max.bytesと呼ばれます) Apache Kafka 。あなたが本当にそれがひどく必要とされているならば、あなたはそのサイズを増やし、あなたのプロデューサと消費者のためのネットワークバッファを増やすことができます。

あなたが本当にあなたのメッセージを分割することを気にかけているならば、それぞれのメッセージ分割が同じパーティションにプッシュされるように、あなたの消費者が完全にできるようにあなたのメッセージの内容を報告する必要があることを確認してください。メッセージを再構築してください。

データサイズを小さくすることができるが魔法のように短くなる可能性があるテキストベース(gzip、snappy、lz4圧縮)の場合は、圧縮を調べることもできます。

繰り返しますが、そのデータを保存するために外部システムを使用し、KAFKAへの外部参照を押すだけです。それは非常に一般的なアーキテクチャであり、あなたが一緒に行くべきそして広く受け入れられるべきです。

念頭に置いて、Kafkaは、メッセージが大きくなるがサイズが大きい場合にのみ機能しています。

ソース: https:/ /www.quora.com/how-do-i-send-large-messages-80-mb-in-kafka

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top