Domanda

Sono molto nuovo Kafka e stiamo utilizzando Kafka 0.8.1.

Quello che devo fare è consumare un messaggio dall'argomento.Per questo, dovrò scrivere un consumatore in Java che consumerà un messaggio dall'argomento e quindi salverà quel messaggio nel database.Dopo che un messaggio è stato salvato, verrà inviata una conferma al consumatore Java.Se il riconoscimento è vero, il messaggio successivo dovrebbe essere utilizzato dall'argomento.Se il riconoscimento è falso (il che significa che a causa di qualche messaggio di errore, letto dall'argomento, non è stato possibile salvarlo nel database), allora di nuovo quel messaggio dovrebbe essere letto.

Penso che dovrei usarlo Simple Consumer, per avere il controllo sull'offset del messaggio e aver seguito l'esempio del consumatore semplice come indicato in questo collegamento https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.

In questo esempio, l'offset viene valutato nel metodo run come 'readOffset'.Devo giocarci?Per es.posso usare LatestTime() invece di EarliestTime() e in caso di falso, ripristinerò l'offset su quello prima dell'uso offset - 1.

È così che dovrei procedere?

È stato utile?

Soluzione

Penso che tu possa andare d'accordo con l'utilizzo del consumatore di alto livello (http://kafka.apache.org/documentation.html#highlevelconsumerapi), che dovrebbe essere più facile da usare rispetto a SimpleConsumer.Non penso che il consumatore abbia bisogno di rileggere i messaggi di Kafka in caso di errore del database, poiché il consumatore ha già quei messaggi e può inviarli nuovamente al DB o fare qualsiasi altra cosa ritenga opportuna.

I consumer di alto livello memorizzano l'ultimo offset letto da una partizione specifica in Zookeeper (in base al nome del gruppo consumer), in modo che quando un processo consumer muore e viene successivamente riavviato (potenzialmente su un altro host), può continuare a elaborare i messaggi dove si trova lasciato fuori.È possibile salvare automaticamente questo offset su Zookeeper periodicamente (vedere le proprietà del consumatore auto.commit.enable e auto.commit.interval.ms) o salvarlo tramite la logica dell'applicazione chiamando ConsumerConnector.commitOffsets .Guarda anche https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example .

Ti suggerisco di disattivare il commit automatico e di impegnare tu stesso i tuoi offset una volta ricevuto il riconoscimento del DB.Pertanto, puoi assicurarti che i messaggi non elaborati vengano riletti da Kafka in caso di errore del consumatore e tutti i messaggi inviati a Kafka raggiungeranno eventualmente il DB almeno una volta (ma non "esattamente una volta").

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top