Frage

Ich bin sehr neu in Kafka und wir benutzen Kafka 0.8.1.

Was ich tun muss, ist eine Nachricht aus dem Thema zu konsumieren.Dafür muss ich einen Konsumenten in Java schreiben, der eine Nachricht aus dem Thema konsumiert und diese Nachricht dann in der Datenbank speichert.Nachdem eine Nachricht gespeichert wurde, wird eine Bestätigung an den Java-Verbraucher gesendet.Wenn die Bestätigung wahr ist, sollte die nächste Nachricht aus dem Thema konsumiert werden.Wenn die Bestätigung falsch ist (was bedeutet, dass aufgrund einer Fehlermeldung, die aus dem Thema gelesen wurde, nicht in der Datenbank gespeichert werden konnte), sollte diese Nachricht erneut gelesen werden.

Ich denke, ich muss verwenden Simple Consumer, um die Kontrolle über den Nachrichtenoffset zu haben und das einfache Verbraucherbeispiel wie in diesem Link angegeben durchlaufen zu haben https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.

In diesem Beispiel wird Offset in der Ausführungsmethode als 'ausgewertetreadOffset'.Muss ich damit spielen?Für z.B.Ich kann verwenden LatestTime() statt EarliestTime() und im Falle von false setze ich den Offset auf den vor der Verwendung zurück offset - 1.

Soll ich so vorgehen?

War es hilfreich?

Lösung

Ich denke, Sie können mit dem High-Level-Verbraucher auskommen (http://kafka.apache.org/documentation.html#highlevelconsumerapi), das einfacher zu bedienen sein sollte als der SimpleConsumer.Ich glaube nicht, dass der Verbraucher Nachrichten von Kafka bei einem Datenbankfehler erneut lesen muss, da der Verbraucher diese Nachrichten bereits hat und sie erneut an die Datenbank senden oder alles andere tun kann, was er für richtig hält.

Verbraucher auf hoher Ebene speichern den zuletzt von einer bestimmten Partition gelesenen Offset in Zookeeper (basierend auf dem Namen der Verbrauchergruppe), sodass ein Verbraucherprozess beim Absterben und späteren Neustart (möglicherweise auf einem anderen Host) Nachrichten dort weiterverarbeiten kann, wo er aufgehört hat aus.Es ist möglich, diesen Offset regelmäßig automatisch für Zookeeper zu speichern (siehe die Verbrauchereigenschaften auto.begehen.aktivieren und auto.commit.interval.ms ), oder lassen Sie es von der Anwendungslogik durch Aufrufen speichern ConsumerConnector.commitOffsets .Siehe auch https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example .

Ich schlage vor, dass Sie das automatische Festschreiben deaktivieren und Ihre Offsets selbst festschreiben, sobald Sie eine DB-Bestätigung erhalten haben.Auf diese Weise können Sie sicherstellen, dass unverarbeitete Nachrichten im Falle eines Verbraucherfehlers erneut von Kafka gelesen werden und alle an Kafka festgeschriebenen Nachrichten schließlich mindestens einmal (aber nicht 'genau einmal') die Datenbank erreichen.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top