Question

I wrote a high-level Kafka consumer as part of a Java application.

So the core code looks like this:

public void start() {
    ConsumerConnector consumerConnector = conf.getConsumerConnector();
    String topic = conf.getTopic();
    int numOfThreads = conf.getNumOfThreads();

    // ask for numOfThreads streams for this topic
    Map<String, Integer> topicCountMap = ImmutableMap.of(topic, numOfThreads);
    Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(topicCountMap);
    List<KafkaStream<Message>> streams = topicMessageStreams.get(topic);

    // create one thread per stream
    executor = Executors.newFixedThreadPool(numOfThreads);

    // consume the messages in the threads
    for (final KafkaStream<Message> stream : streams) {
        executor.submit(new ConsumerThread(stream));
    }
}

To test my consumer I also created a producer, wrote to Kafka, and then launched my consumer; it works. However, since the threads are only submitted inside a loop in start(), I'm not sure I got things right. I would like my consumer to run forever and keep consuming messages from Kafka.

What is the right way to make it run forever?


Solution

@forhas Thanks for confirming

Basically, the documentation iterates over the consumed messages like this:

    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    while (it.hasNext())
        System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));

It also states: "The interesting part here is the while (it.hasNext()) section. Basically this code reads from Kafka until you stop it."

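So your consumer should keep running until you stop it explicitly. By default, it.hasNext() blocks while waiting for the next message, so each thread stays alive inside its loop, and as soon as a producer publishes a new message it becomes available on the consumer side.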
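
To illustrate why the loop in start() only needs to submit the workers once, here is a minimal, self-contained sketch of the same pattern. It does not use the Kafka API: a BlockingQueue is a stand-in for the KafkaStream, and the class and method names (BlockingConsumerSketch, Worker, runDemo) are hypothetical. The blocking take() call plays the role of it.hasNext(), and shutdownNow() shows one possible way to stop the workers, assuming they react to interruption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingConsumerSketch {

    // Hypothetical stand-in for ConsumerThread: the queue replaces the KafkaStream.
    static class Worker implements Runnable {
        private final BlockingQueue<String> stream;
        private final List<String> consumed;

        Worker(BlockingQueue<String> stream, List<String> consumed) {
            this.stream = stream;
            this.consumed = consumed;
        }

        @Override
        public void run() {
            try {
                // Runs "forever": take() blocks until a message is available,
                // much like it.hasNext() does in the documentation example.
                while (true) {
                    consumed.add(stream.take());
                }
            } catch (InterruptedException e) {
                // shutdownNow() interrupts the blocked thread; restore the flag and exit.
                Thread.currentThread().interrupt();
            }
        }
    }

    // Starts one worker, produces three messages, then shuts the worker down.
    static List<String> runDemo() {
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        List<String> consumed = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.submit(new Worker(stream, consumed));
        try {
            // The "producer" side: messages arrive after the worker is already waiting.
            stream.put("m1");
            stream.put("m2");
            stream.put("m3");

            // Wait (up to 5 seconds) for the worker to pick everything up.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (consumed.size() < 3 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }

            // Explicit stop: without this the worker would keep waiting indefinitely.
            executor.shutdownNow();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(consumed);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

In the real consumer the equivalent of the explicit stop would be whatever shutdown path your application chooses. The point of the sketch is only that the worker threads, once submitted, stay alive on their own because the read call blocks.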

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow