Two independent Storm topologies running on different clusters and reading from the same Kafka topic (using the Kafka spout) fail with an error

StackOverflow https://stackoverflow.com/questions/23096663

04-07-2023

Question

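// Imports assume the storm-contrib storm-kafka module (Kafka 0.7 era),
// which provides StaticHosts.fromHostString and forceStartOffsetTime.
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;

import storm.kafka.KafkaConfig.BrokerHosts;
import storm.kafka.KafkaConfig.StaticHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;

public class WordCountTopology {
    private static final String FILTER_BOLT_ID = "filter-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    static final PropertiesConfiguration CONFIGURATION = new PropertiesConfiguration();

    // Load config/topo.properties from the classpath once, when the class is loaded.
    static {
        try {
            CONFIGURATION.load(ClassLoader.getSystemResourceAsStream("config/topo.properties"));
        } catch (ConfigurationException e) {
            // Keep the original exception as the cause so the real problem stays visible.
            throw new RuntimeException("Failed to load topo properties", e);
        }
    }

    // Builds a KafkaSpout for KAFKA_TOPIC. The spout stores its consumer offsets
    // in ZooKeeper under ZOOKEEPER_STORAGE_PATH, using ZOOKEEPER_STORAGE_ID as its id.
    private static KafkaSpout createKafkaSpout() {
        // Turn "host1,host2" plus KAFKA_PORT into "host1:port", "host2:port".
        List<String> hostList = new ArrayList<String>();
        String kafkaPortStr = CONFIGURATION.getString("KAFKA_PORT");
        String[] kafkaHosts = CONFIGURATION.getString("KAFKA_HOSTS").split(",");
        for (String kafkaHost : kafkaHosts) {
            hostList.add(kafkaHost + ":" + kafkaPortStr);
        }
        BrokerHosts hosts = StaticHosts.fromHostString(
            hostList, CONFIGURATION.getInt("NUM_KAFKA_PARTITIONS"));

        SpoutConfig spoutConfig = new SpoutConfig(
            hosts,
            CONFIGURATION.getString("KAFKA_TOPIC"),
            CONFIGURATION.getString("ZOOKEEPER_STORAGE_PATH"),
            CONFIGURATION.getString("ZOOKEEPER_STORAGE_ID"));
        // -1 = start from the latest offset, i.e. read only new messages.
        spoutConfig.forceStartOffsetTime(-1);

        return new KafkaSpout(spoutConfig);
    }

    // The rest of the topology (main(), bolt wiring) is not part of the question's snippet.
}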


topo.properties (not full file):

Topology1
KAFKA_TOPIC=varnish
ZOOKEEPER_STORAGE_PATH=/kafkastorm
ZOOKEEPER_STORAGE_ID=discovery

Topology2
KAFKA_TOPIC=varnish
ZOOKEEPER_STORAGE_PATH=/kafkastorm8
ZOOKEEPER_STORAGE_ID=discovery8
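Both topologies read KAFKA_TOPIC=varnish but point their spouts at different ZooKeeper locations. Assuming the spout keeps its consumer offsets beneath <ZOOKEEPER_STORAGE_PATH>/<ZOOKEEPER_STORAGE_ID> (the child-node layout below that prefix varies by storm-kafka version), this minimal sketch only builds the two prefixes to show that the topologies keep separate offset state. offsetPrefix is a hypothetical helper, not part of storm-kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: ASSUMES each spout's offsets live under
// <ZOOKEEPER_STORAGE_PATH>/<ZOOKEEPER_STORAGE_ID>; not storm-kafka code.
public class OffsetNamespaces {

    // Joins the storage root and spout id into the ZooKeeper prefix for offsets.
    static String offsetPrefix(String zkRoot, String id) {
        return zkRoot + "/" + id;
    }

    public static void main(String[] args) {
        Map<String, String> prefixes = new LinkedHashMap<>();
        prefixes.put("Topology1", offsetPrefix("/kafkastorm", "discovery"));
        prefixes.put("Topology2", offsetPrefix("/kafkastorm8", "discovery8"));

        // Same topic, different prefixes: each topology tracks its own offsets.
        prefixes.forEach((name, prefix) -> System.out.println(name + " -> " + prefix));
    }
}
```

Separate prefixes mean that resetting one topology's stored offsets does not touch the other's.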

I am trying to run two topologies that consume data from the same Kafka topic. Topology 1 works fine with code similar to the above, but topology 2 fails with an error.

I am getting the following error from the Kafka spout:

kafka.common.OffsetOutOfRangeException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl


Solution 2

The fix was to connect with the ZooKeeper CLI, delete topology 2's ZooKeeper storage root path /kafkastorm8 (its ZOOKEEPER_STORAGE_PATH, where the spout keeps its offsets), and resubmit the topology. After that it worked fine.
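In the ZooKeeper 3.4 command-line client (zkCli.sh), a node and all of its children are removed with rmr /kafkastorm8; ZooKeeper 3.5 and later name that command deleteall. The Java sketch below does not talk to ZooKeeper. It is an in-memory model of that recursive delete, with hypothetical child node names, showing why removing /kafkastorm8 leaves topology 1's /kafkastorm tree intact even though "/kafkastorm" is a string prefix of "/kafkastorm8".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// In-memory model of a recursive ZooKeeper delete (what rmr / deleteall does).
// The child node names are hypothetical placeholders, not the spout's real layout.
public class RecursiveDeleteModel {

    // True if node is root itself or lies beneath it. Matching on root + "/"
    // keeps "/kafkastorm" from being treated as an ancestor of "/kafkastorm8".
    static boolean isUnder(String node, String root) {
        return node.equals(root) || node.startsWith(root + "/");
    }

    // Removes root and all of its descendants; returns the removed paths in order.
    static List<String> deleteRecursive(TreeSet<String> nodes, String root) {
        List<String> removed = new ArrayList<>();
        for (String node : nodes) {
            if (isUnder(node, root)) {
                removed.add(node);
            }
        }
        nodes.removeAll(removed);
        return removed;
    }

    public static void main(String[] args) {
        TreeSet<String> nodes = new TreeSet<>(Arrays.asList(
            "/kafkastorm",
            "/kafkastorm/discovery",
            "/kafkastorm/discovery/partition_0",
            "/kafkastorm8",
            "/kafkastorm8/discovery8",
            "/kafkastorm8/discovery8/partition_0"));

        System.out.println("removed:   " + deleteRecursive(nodes, "/kafkastorm8"));
        System.out.println("remaining: " + nodes);
    }
}
```

Running main removes the three /kafkastorm8 nodes and leaves the three /kafkastorm nodes in place.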

OTHER TIPS

Could you try changing
spoutConfig.forceStartOffsetTime(-1);
to
spoutConfig.forceStartOffsetTime(-2);
and see if it works? With -1 the spout starts from the latest offset; with -2 it starts from the earliest offset the broker still retains, effectively rewinding it. For more information you can read this page.
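The values passed to forceStartOffsetTime follow Kafka's offset-request convention: -2 (kafka.api.OffsetRequest.EarliestTime) asks for the earliest offset still retained, -1 (kafka.api.OffsetRequest.LatestTime) for the next offset to be written, and other values are timestamps in milliseconds. The sketch below is a hypothetical, self-contained model of that choice and of a simplified range check of the kind OffsetOutOfRangeException reports; it does not call the Kafka API, and the earliest/latest numbers are invented. One plausible reading of the accepted fix is that an offset stored under /kafkastorm8 had fallen outside the retained range.

```java
// Hypothetical model of picking a start offset from the sentinels -2 / -1,
// plus a simplified out-of-range check. No Kafka calls; inputs are invented.
public class StartOffsetModel {

    static final long EARLIEST_TIME = -2L; // same value as kafka.api.OffsetRequest.EarliestTime
    static final long LATEST_TIME = -1L;   // same value as kafka.api.OffsetRequest.LatestTime

    // Offset a consumer would start from, given the partition's retained range.
    static long resolveStartOffset(long startOffsetTime, long earliest, long latest) {
        if (startOffsetTime == EARLIEST_TIME) {
            return earliest; // replay everything the broker still keeps
        }
        if (startOffsetTime == LATEST_TIME) {
            return latest;   // skip history, read only new messages
        }
        throw new IllegalArgumentException("timestamp lookups are not modeled here");
    }

    // Simplified: out of range if the data is no longer retained (below earliest)
    // or has not been written yet (above latest).
    static boolean isOutOfRange(long offset, long earliest, long latest) {
        return offset < earliest || offset > latest;
    }

    public static void main(String[] args) {
        long earliest = 5_000L; // hypothetical: older segments removed by retention
        long latest = 9_000L;

        System.out.println(resolveStartOffset(EARLIEST_TIME, earliest, latest)); // 5000
        System.out.println(resolveStartOffset(LATEST_TIME, earliest, latest));   // 9000
        // A hypothetical stale offset left over from an earlier run:
        System.out.println(isOutOfRange(1_200L, earliest, latest));             // true
    }
}
```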

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow