Lettura dei messaggi compensati in Apache Kafka
-
21-12-2019 - |
Domanda
Sono molto nuovo Kafka
e stiamo utilizzando Kafka 0.8.1
.
Quello che devo fare è consumare un messaggio dall'argomento.Per questo, dovrò scrivere un consumatore in Java che consumerà un messaggio dall'argomento e quindi salverà quel messaggio nel database.Dopo che un messaggio è stato salvato, verrà inviata una conferma al consumatore Java.Se il riconoscimento è vero, il messaggio successivo dovrebbe essere utilizzato dall'argomento.Se il riconoscimento è falso (il che significa che a causa di qualche messaggio di errore, letto dall'argomento, non è stato possibile salvarlo nel database), allora di nuovo quel messaggio dovrebbe essere letto.
Penso che dovrei usarlo Simple Consumer
, per avere il controllo sull'offset del messaggio e aver seguito l'esempio del consumatore semplice come indicato in questo collegamento https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.
In questo esempio, l'offset viene valutato nel metodo run come 'readOffset
'.Devo giocarci?Per es.posso usare LatestTime()
invece di EarliestTime()
e in caso di falso, ripristinerò l'offset su quello prima dell'uso offset - 1
.
È così che dovrei procedere?
Soluzione
Penso che tu possa andare d'accordo con l'utilizzo del consumatore di alto livello (http://kafka.apache.org/documentation.html#highlevelconsumerapi), che dovrebbe essere più facile da usare rispetto a SimpleConsumer.Non penso che il consumatore abbia bisogno di rileggere i messaggi di Kafka in caso di errore del database, poiché il consumatore ha già quei messaggi e può inviarli nuovamente al DB o fare qualsiasi altra cosa ritenga opportuna.
I consumer di alto livello memorizzano l'ultimo offset letto da una partizione specifica in Zookeeper (in base al nome del gruppo consumer), in modo che quando un processo consumer muore e viene successivamente riavviato (potenzialmente su un altro host), può continuare a elaborare i messaggi dove si trova lasciato fuori.È possibile salvare automaticamente questo offset su Zookeeper periodicamente (vedere le proprietà del consumatore auto.commit.enable e auto.commit.interval.ms) o salvarlo tramite la logica dell'applicazione chiamando ConsumerConnector.commitOffsets
.Guarda anche https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example .
Ti suggerisco di disattivare il commit automatico e di impegnare tu stesso i tuoi offset una volta ricevuto il riconoscimento del DB.Pertanto, puoi assicurarti che i messaggi non elaborati vengano riletti da Kafka in caso di errore del consumatore e tutti i messaggi inviati a Kafka raggiungeranno eventualmente il DB almeno una volta (ma non "esattamente una volta").