Lendo mensagens deslocadas no Apache Kafka
-
21-12-2019 - |
Pergunta
Eu sou muito novo em Kafka
e estamos usando Kafka 0.8.1
.
O que preciso fazer é consumir uma mensagem do tópico.Para isso terei que escrever um consumidor em Java que consumirá uma mensagem do tópico e depois salvará essa mensagem no banco de dados.Depois que uma mensagem for salva, alguma confirmação será enviada ao consumidor Java.Se o reconhecimento for verdadeiro, a próxima mensagem deverá ser consumida do tópico.Se o reconhecimento for falso (o que significa que devido a alguma mensagem de erro, lida no tópico, não pôde ser salva no banco de dados), então novamente essa mensagem deverá ser lida.
acho que preciso usar Simple Consumer
, para ter controle sobre o deslocamento da mensagem e seguir o exemplo do Consumidor Simples conforme fornecido neste link https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.
Neste exemplo, o deslocamento é avaliado no método run como 'readOffset
'.Preciso brincar com isso?Por exemploEu posso usar LatestTime()
em vez de EarliestTime()
e em caso de falso, redefinirei o deslocamento para aquele antes de usar offset - 1
.
É assim que devo proceder?
Solução
Acho que você pode se dar bem usando o consumidor de alto nível (http://kafka.apache.org/documentation.html#highlevelconsumerapi), que deve ser mais fácil de usar que o SimpleConsumer.Não acho que o consumidor precise reler as mensagens do Kafka em caso de falha no banco de dados, pois o consumidor já possui essas mensagens e pode reenviá-las para o banco de dados ou fazer qualquer outra coisa que achar adequado.
Os consumidores de alto nível armazenam o último deslocamento lido de uma partição específica no Zookeeper (com base no nome do grupo de consumidores), de modo que quando um processo consumidor morre e é reiniciado posteriormente (potencialmente em outro host), ele pode continuar processando mensagens onde deixado de fora.É possível salvar automaticamente esse deslocamento no Zookeeper periodicamente (consulte as propriedades do consumidor auto.commit.enable e auto.commit.interval.ms) ou salvá-lo pela lógica do aplicativo chamando ConsumerConnector.commitOffsets
.Veja também https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example .
Sugiro que você desative o commit automático e confirme suas compensações assim que receber o reconhecimento do banco de dados.Assim, você pode garantir que as mensagens não processadas sejam relidas do Kafka em caso de falha do consumidor e que todas as mensagens confirmadas no Kafka chegarão ao banco de dados pelo menos uma vez (mas não 'exatamente uma vez').