質問

Kafkaには非常に新しい、Kafka 0.8.1を使用しています。

私がする必要があるのは、トピックからメッセージを消費することです。そのために、私はJavaで1つの消費者を書く必要があります。これは、トピックからメッセージを消費してからそのメッセージをデータベースに保存する必要があります。メッセージが保存されたら、確認応答がJava Consumerに送信されます。確認応答が真の場合、次のメッセージはトピックから消費されるべきです。 acknowldgementがfalseの場合(一部のエラーメッセージがあるため、トピックから読み取り、データベースに保存できませんでした)、そのメッセージを読み込む必要があります。

Simple Consumerを使用して、メッセージオフセットを制御し、このリンクに指定されている単純な消費者の例を制御し、このリンク="https://cwiki.apache.org/confluence/display/kafkaを使用する必要があると思います。 /0.8.0以降のisimpleConsumer+example "rel=" nofollow "> https://cwiki.apache.org/confluence/display/kafka/0.8.0 + simpleConsumer+example 。

この例では、offsetはRunメソッドで 'readOffset'として評価されます。私はそれと遊ぶ必要がありますか?例えば。 LatestTime()の代わりにEarliestTime()を使用することができ、FALSEの場合は、offset - 1を使用する前にオフセットを1にリセットします。

これはどのように進むべきですか?

役に立ちましたか?

解決

高水準消費者( http://kafka.apache.org/documentationと一緒に仲介できると思います。 HTML#highlevelConsumerapi )は、SimpleConsumerよりも使いやすくなるはずです。消費者がすでにそれらのメッセージを持っていて、それらをDBに再送信したり、それを見ることができることを確認することができるため、消費者がデータベース失敗のメッセージを再読み取る必要があるとは思わない。

高水準消費者は、ZooKeeperの特定のパーティションから読み取られた最後のオフセットを(コンシューマグループ名に基づく)、消費者プロセスがダイと後で再起動されると(他のホスト上の潜在的に)、処理を続行することができます。中断したメッセージ。このオフセットを定期的にZookeeperに自動保存することができます(コンシューマプロパティのauth.jp.enableおよびauth.commit.interval.msを参照)、またはConsumerConnector.commitOffsetsを呼び出すことによってアプリケーションロジックによって保存されていることが可能です。関連項目 https://cwiki.apache.org/confluence/display/kafka/consumer +group +例

私はあなたがDBの確認応答を受けた後、オートコミットをオフにしてあなたのオフセットをコミットすることをお勧めします。したがって、消費者の障害が発生した場合、未処理のメッセージがKafkaから再読み取られ、Kafkaにコミットされたすべてのメッセージは最終的に少なくとも1回DBに到達することができます(ただし、「正確には1回は」)。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top