Смещение чтения сообщений в Apache Kafka
-
21-12-2019 - |
Вопрос
Я очень новичок в Kafka
и мы используем Kafka 0.8.1
.
Что мне нужно сделать, так это получить сообщение из темы.Для этого мне придется написать на Java одного потребителя, который будет принимать сообщение из темы, а затем сохранять это сообщение в базе данных.После сохранения сообщения потребителю Java будет отправлено подтверждение.Если подтверждение истинно, то следующее сообщение должно быть использовано из темы.Если подтверждение ложно (что означает, что из-за какого-то сообщения об ошибке, прочитанного из темы, не удалось сохранить в базе данных), то это сообщение снова следует прочитать.
Я думаю, мне нужно использовать Simple Consumer
, чтобы иметь контроль над смещением сообщения и пройти пример Simple Consumer, как указано в этой ссылке. https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.
В этом примере смещение оценивается в методе run как 'readOffset
'.Нужно ли мне с этим играть?Например,я могу использовать LatestTime()
вместо EarliestTime()
и в случае ложного значения я сбросю смещение на то, которое было перед использованием offset - 1
.
Вот как мне следует действовать?
Решение
Я думаю, вы можете обойтись использованием потребителя высокого уровня (http://kafka.apache.org/documentation.html#highlevelconsumerapi), его должно быть проще использовать, чем SimpleConsumer.Я не думаю, что потребителю нужно перечитывать сообщения Kafka при сбое базы данных, поскольку у потребителя уже есть эти сообщения и он может повторно отправить их в БД или сделать что-нибудь еще, что сочтет нужным.
Потребители высокого уровня сохраняют последнее смещение, прочитанное из определенного раздела в Zookeeper (на основе имени группы потребителей), поэтому, когда потребительский процесс завершается и позже перезапускается (возможно, на другом хосте), он может продолжить обработку сообщений там, где он находится. остановился.Это смещение можно автоматически сохранять в Zookeeper периодически (см. потребительские свойства auto.commit.enable и auto.commit.interval.ms) или сохранить его логикой приложения, вызвав ConsumerConnector.commitOffsets
.Смотрите также https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example .
Я предлагаю вам отключить автоматическую фиксацию и самостоятельно фиксировать свои смещения после получения подтверждения от БД.Таким образом, вы можете быть уверены, что необработанные сообщения будут перечитаны из Kafka в случае сбоя потребителя, и все сообщения, переданные в Kafka, в конечном итоге достигнут БД хотя бы один раз (но не «ровно один раз»).