我很新 Kafka 我们正在使用 Kafka 0.8.1.

我需要做的是消费来自主题的消息。为此,我必须用 Java 编写一个消费者,它将使用主题中的一条消息,然后将该消息保存到数据库中。保存消息后,将向 Java 消费者发送一些确认信息。如果确认为真,则应从主题中使用下一条消息。如果确认为 false(这意味着由于从主题读取的某些错误消息,无法将其保存到数据库中),则应再次读取该消息。

我想我需要使用 Simple Consumer,控制消息偏移量并完成此链接中给出的简单消费者示例 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.

在此示例中,在 run 方法中将偏移量计算为 'readOffset'。我需要玩那个吗?例如我可以用 LatestTime() 代替 EarliestTime() 如果为 false,我会将偏移量重置为使用前的偏移量 offset - 1.

我应该这样继续吗?

有帮助吗?

解决方案

我认为您可以使用高级消费者(http://kafka.apache.org/documentation.html#highlevelconsumerapi),这应该比 SimpleConsumer 更容易使用。我认为消费者不需要在数据库故障时重新读取来自 Kafka 的消息,因为消费者已经拥有这些消息,并且可以将它们重新发送到数据库或执行其认为合适的任何其他操作。

高级消费者将从Zookeeper中的特定分区读取的最后一个偏移量存储在Zookeeper中(基于消费者组名称),以便当消费者进程死亡并稍后重新启动(可能在其他主机上)时,它可以继续处理它所在的消息。离开了。可以定期将此偏移量自动保存到 Zookeeper(请参阅使用者属性 auto.commit.enable 和 auto.commit.interval.ms),或者通过调用由应用程序逻辑保存它 ConsumerConnector.commitOffsets 。也可以看看 https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example .

我建议您关闭自动提交,并在收到数据库确认后自行提交偏移量。因此,您可以确保在消费者发生故障时从 Kafka 重新读取未处理的消息,并且提交到 Kafka 的所有消息最终将至少到达数据库一次(但不是“恰好一次”)。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top