Question

I have a topology with Kafka spout somewhat like below

SpoutConfig spoutConfig = new SpoutConfig(zkBrokerHosts, "some-topic","", "some-id");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
...

builder.setSpout("kafkaSpout",new KafkaSpout(spoutConfig),1);

And of course it works fine.

If my topology fails and I start it up again, I want KafkaSpout to read from the latest offset of the topic, not from the last offset the consumer had read.

Is there an option for this? I tried

spoutConfig.startOffsetTime=System.currentTimeMillis();

but it doesn't seem to work the way I want, and neither does kafkaConfig.forceStartOffsetTime(-2);

Let me know if you have any ideas.


Solution

Try kafkaConfig.forceStartOffsetTime(-1). Pass -1 for the latest Kafka offset and -2 for the earliest available offset.
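To make the two special values concrete, here is a minimal, hypothetical Java model (not storm-kafka's actual code) of how -1 and -2 might be mapped onto the offsets of a single partition. `StartOffsetResolver`, `resolve`, and the sample offsets 100 and 250 are made up for illustration. The sketch assumes "latest" means the log-end offset, i.e. the offset the next produced message will receive, and it deliberately rejects any other value instead of guessing what the spout would do with it.

```java
/**
 * Hypothetical sketch: how the special values -1 and -2 passed to
 * forceStartOffsetTime(...) could map onto concrete offsets of one partition.
 * Class and method names are illustrative, not part of storm-kafka.
 */
public class StartOffsetResolver {

    // Special values as described in the answer: -1 = latest, -2 = earliest.
    static final long LATEST = -1L;
    static final long EARLIEST = -2L;

    /**
     * @param requested the value handed to forceStartOffsetTime
     * @param earliest  the oldest offset Kafka still retains for the partition
     * @param latest    the log-end offset (offset of the next message to be written)
     * @return the offset the spout would start reading from
     */
    static long resolve(long requested, long earliest, long latest) {
        if (requested == LATEST) {
            return latest;   // skip everything already in the topic
        }
        if (requested == EARLIEST) {
            return earliest; // replay everything Kafka still retains
        }
        // Only the two special values are modelled in this sketch.
        throw new IllegalArgumentException("unsupported start value: " + requested);
    }

    public static void main(String[] args) {
        // A partition that retains offsets 100..249; the next message gets 250.
        System.out.println(resolve(LATEST, 100L, 250L));
        System.out.println(resolve(EARLIEST, 100L, 250L));
    }
}
```

With those sample numbers, -1 resolves to 250 (only new messages are read) and -2 resolves to 100 (all retained messages are replayed).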

EDIT:

Also, you can use the same option to force the spout to start consuming from any desired offset: just pass that numeric offset as the only argument.

Ignore the "Time" in forceStartOffsetTime; the method name is a bit confusing. Offsets in Kafka are plain numbers and have no connection to any concept of time. -1 is just a special value telling the Kafka spout to fetch the latest offset from Kafka itself (likewise, -2 fetches the earliest available offset).
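Presumably the "force" in the method name is what matters after a restart. The following is a hypothetical Java model, assuming (as the question describes) that the spout normally resumes from the offset the consumer last read, and that forcing tells it to ignore that stored position and use the special value instead. `RestartPolicy`, `startOffset`, and the sample offsets are illustrative names and values, not storm-kafka's API.

```java
import java.util.OptionalLong;

/**
 * Hypothetical sketch of the resume-versus-force decision a spout makes on
 * restart. Names are illustrative only; this is not storm-kafka's code.
 */
public class RestartPolicy {

    static final long LATEST = -1L;   // "start at the latest offset"
    static final long EARLIEST = -2L; // "start at the earliest available offset"

    /**
     * @param committed offset the consumer recorded before the crash, if any
     * @param force     true when the start position is forced
     * @param requested -1 or -2, used when forcing or when nothing was recorded
     * @param earliest  oldest retained offset of the partition
     * @param latest    log-end offset of the partition
     */
    static long startOffset(OptionalLong committed, boolean force,
                            long requested, long earliest, long latest) {
        if (!force && committed.isPresent()) {
            // Behaviour described in the question: resume where the consumer left off.
            return committed.getAsLong();
        }
        // Forced (or nothing recorded yet): interpret the special value instead.
        if (requested == LATEST) {
            return latest;
        }
        if (requested == EARLIEST) {
            return earliest;
        }
        throw new IllegalArgumentException("unsupported start value: " + requested);
    }

    public static void main(String[] args) {
        OptionalLong lastRead = OptionalLong.of(180L);
        // Without forcing, the stored position wins even though -1 was requested.
        System.out.println(startOffset(lastRead, false, LATEST, 100L, 250L));
        // With forcing, -1 skips straight to the end of the partition.
        System.out.println(startOffset(lastRead, true, LATEST, 100L, 250L));
    }
}
```

In this model the first call prints 180 and the second prints 250, which mirrors the difference between "resume from the last read offset" and "start from the latest offset" that the question asks about.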

Licensed under: CC-BY-SA with attribution