문제

나는 매우 새로운 Kafka 그리고 우리는 사용하고 있습니다 Kafka 0.8.1.

내가 해야 할 일은 주제에서 메시지를 소비하는 것입니다.이를 위해 주제의 메시지를 소비한 다음 해당 메시지를 데이터베이스에 저장하는 소비자 하나를 Java로 작성해야 합니다.메시지가 저장된 후 일부 승인이 Java 소비자에게 전송됩니다.확인이 true이면 해당 주제에서 다음 메시지를 사용해야 합니다.확인이 false인 경우(일부 오류 메시지로 인해 항목을 읽었으나 데이터베이스에 저장할 수 없음을 의미) 해당 메시지를 다시 읽어야 합니다.

사용해야 할 것 같아요 Simple Consumer,메시지 오프셋을 제어하고 이 링크에 제공된 Simple Consumer 예제를 살펴보았습니다. https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.

이 예에서 오프셋은 run 메소드에서 '로 평가됩니다.readOffset'.그걸 가지고 놀아야 하나요?예를 들어나는 사용할 수 있다 LatestTime() 대신에 EarliestTime() 거짓인 경우 오프셋을 사용하기 전의 오프셋으로 재설정합니다. offset - 1.

이렇게 진행해야 하나요?

도움이 되었습니까?

해결책

높은 수준의 소비자를 사용하면 잘 지낼 수 있다고 생각합니다 (http://kafka.apache.org/documentation.html#highlevelconsumerapi), SimpleConsumer보다 사용하기가 더 쉽습니다.소비자는 이미 해당 메시지를 가지고 있고 이를 DB로 다시 보내거나 적절하다고 판단되는 다른 작업을 수행할 수 있으므로 데이터베이스 오류 시 소비자가 Kafka의 메시지를 다시 읽을 필요가 없다고 생각합니다.

상위 수준 소비자는 소비자 그룹 이름을 기준으로 특정 파티션에서 읽은 마지막 오프셋을 Zookeeper에 저장하므로 소비자 프로세스가 종료되고 나중에 다시 시작될 때(잠재적으로 다른 호스트에서) 메시지를 계속 처리할 수 있습니다. 중단되었습니다.이 오프셋을 주기적으로 Zookeeper에 자동 저장하거나(소비자 속성 auto.commit.enable 및 auto.commit.interval.ms 참조) 호출하여 애플리케이션 로직에 의해 저장할 수 있습니다. ConsumerConnector.commitOffsets .또한보십시오 https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example .

DB 승인을 받은 후 자동 커밋을 끄고 오프셋을 직접 커밋하는 것이 좋습니다.따라서 소비자 오류가 발생할 경우 처리되지 않은 메시지가 Kafka에서 다시 읽히도록 할 수 있으며 Kafka에 커밋된 모든 메시지는 결국 적어도 한 번(그러나 '정확히 한 번'은 아님) DB에 도달하게 됩니다.

라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top