Вопрос

Я очень новичок в Kafka и мы используем Kafka 0.8.1.

Что мне нужно сделать, так это получить сообщение из темы.Для этого мне придется написать на Java одного потребителя, который будет принимать сообщение из темы, а затем сохранять это сообщение в базе данных.После сохранения сообщения потребителю Java будет отправлено подтверждение.Если подтверждение истинно, то следующее сообщение должно быть использовано из темы.Если подтверждение ложно (что означает, что из-за какого-то сообщения об ошибке, прочитанного из темы, не удалось сохранить в базе данных), то это сообщение снова следует прочитать.

Я думаю, мне нужно использовать Simple Consumer, чтобы иметь контроль над смещением сообщения и пройти пример Simple Consumer, как указано в этой ссылке. https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.

В этом примере смещение оценивается в методе run как 'readOffset'.Нужно ли мне с этим играть?Например,я могу использовать LatestTime() вместо EarliestTime() и в случае ложного значения я сбросю смещение на то, которое было перед использованием offset - 1.

Вот как мне следует действовать?

Это было полезно?

Решение

Я думаю, вы можете обойтись использованием потребителя высокого уровня (http://kafka.apache.org/documentation.html#highlevelconsumerapi), его должно быть проще использовать, чем SimpleConsumer.Я не думаю, что потребителю нужно перечитывать сообщения Kafka при сбое базы данных, поскольку у потребителя уже есть эти сообщения и он может повторно отправить их в БД или сделать что-нибудь еще, что сочтет нужным.

Потребители высокого уровня сохраняют последнее смещение, прочитанное из определенного раздела в Zookeeper (на основе имени группы потребителей), поэтому, когда потребительский процесс завершается и позже перезапускается (возможно, на другом хосте), он может продолжить обработку сообщений там, где он находится. остановился.Это смещение можно автоматически сохранять в Zookeeper периодически (см. потребительские свойства auto.commit.enable и auto.commit.interval.ms) или сохранить его логикой приложения, вызвав ConsumerConnector.commitOffsets .Смотрите также https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example .

Я предлагаю вам отключить автоматическую фиксацию и самостоятельно фиксировать свои смещения после получения подтверждения от БД.Таким образом, вы можете быть уверены, что необработанные сообщения будут перечитаны из Kafka в случае сбоя потребителя, и все сообщения, переданные в Kafka, в конечном итоге достигнут БД хотя бы один раз (но не «ровно один раз»).

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top