Pergunta

Eu sou muito novo em Kafka e estamos usando Kafka 0.8.1.

O que preciso fazer é consumir uma mensagem do tópico.Para isso terei que escrever um consumidor em Java que consumirá uma mensagem do tópico e depois salvará essa mensagem no banco de dados.Depois que uma mensagem for salva, alguma confirmação será enviada ao consumidor Java.Se o reconhecimento for verdadeiro, a próxima mensagem deverá ser consumida do tópico.Se o reconhecimento for falso (o que significa que devido a alguma mensagem de erro, lida no tópico, não pôde ser salva no banco de dados), então novamente essa mensagem deverá ser lida.

acho que preciso usar Simple Consumer, para ter controle sobre o deslocamento da mensagem e seguir o exemplo do Consumidor Simples conforme fornecido neste link https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.

Neste exemplo, o deslocamento é avaliado no método run como 'readOffset'.Preciso brincar com isso?Por exemploEu posso usar LatestTime() em vez de EarliestTime() e em caso de falso, redefinirei o deslocamento para aquele antes de usar offset - 1.

É assim que devo proceder?

Foi útil?

Solução

Acho que você pode se dar bem usando o consumidor de alto nível (http://kafka.apache.org/documentation.html#highlevelconsumerapi), que deve ser mais fácil de usar que o SimpleConsumer.Não acho que o consumidor precise reler as mensagens do Kafka em caso de falha no banco de dados, pois o consumidor já possui essas mensagens e pode reenviá-las para o banco de dados ou fazer qualquer outra coisa que achar adequado.

Os consumidores de alto nível armazenam o último deslocamento lido de uma partição específica no Zookeeper (com base no nome do grupo de consumidores), de modo que quando um processo consumidor morre e é reiniciado posteriormente (potencialmente em outro host), ele pode continuar processando mensagens onde deixado de fora.É possível salvar automaticamente esse deslocamento no Zookeeper periodicamente (consulte as propriedades do consumidor auto.commit.enable e auto.commit.interval.ms) ou salvá-lo pela lógica do aplicativo chamando ConsumerConnector.commitOffsets .Veja também https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example .

Sugiro que você desative o commit automático e confirme suas compensações assim que receber o reconhecimento do banco de dados.Assim, você pode garantir que as mensagens não processadas sejam relidas do Kafka em caso de falha do consumidor e que todas as mensagens confirmadas no Kafka chegarão ao banco de dados pelo menos uma vez (mas não 'exatamente uma vez').

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top