Question

Je suis très nouveau à Kafka et nous sommes à l'aide de Kafka 0.8.1.

Ce que j'ai à faire est de consommer un message de sujet.Pour cela, je dois écrire un consommateur en Java, ce qui va consommer un message de sujet et puis enregistrez le message de base de données.Après qu'un message est enregistré, certains accusé de réception sera envoyé à Java à la consommation.Si cet accusé de réception est vrai, alors la prochaine message doit être consommé à partir de la rubrique.Si acknowldgement est faux(ce qui signifie en raison de certaines de message d'erreur,lisez à partir de la rubrique, n'a pas pu être enregistré dans la base de données), puis de nouveau ce message doit être lu.

Je pense que j'ai besoin d'utiliser Simple Consumer,avoir le contrôle sur le message de décalage et sont passés par le Simple Consommateur exemple comme indiqué dans ce lien https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.

Dans cet exemple, le décalage est évaluée par la méthode run comme"readOffset'.Ai-je besoin de jouer avec qui?Par exempleJe peux utiliser LatestTime() au lieu de EarliestTime() et en cas de faux, je vais réinitialiser le décalage à un avant de les utiliser offset - 1.

Est-ce la façon dont je dois procéder?

Était-ce utile?

La solution

Je pense que vous pouvez obtenir avec l'aide de son niveau de consommation (http://kafka.apache.org/documentation.html#highlevelconsumerapi), cela devrait être plus facile à utiliser que le SimpleConsumer.Je ne pense pas que le consommateur a besoin de relire les messages de Kafka sur défaillance de base de données, que le consommateur a déjà de ces messages, et peut renvoyer à la DB ou faire tout ce que bon lui semble.

De haut niveau des consommateurs stocker le dernier décalage de lire à partir d'une partition spécifique dans Zookeeper (en fonction du nom du groupe), de sorte que lorsqu'un consommateur processus meurt et est ensuite redémarré (éventuellement sur un autre hôte), il peut continuer le traitement des messages où il l'avait laissé.Il est possible de autosave ce décalage de Zookeeper périodiquement (voir les propriétés de consommation de l'automobile.commettre.activer et auto.commettre.d'intervalle.ms), ou de l'enregistrer par la logique de l'application en appelant ConsumerConnector.commitOffsets .Voir aussi https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example .

Je vous suggère de vous tourner à validation automatique off et valider votre décalages de vous-mêmes, une fois que vous avez reçu DB accusé de réception.Ainsi, vous pouvez assurez-vous des messages non traités sont relues, de Kafka dans le cas de la consommation d'échec et de tous les messages engage à Kafka finira par atteindre la DB au moins une fois (mais pas "exactement" une fois que).

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top