Question

There is following consumer code:

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

kafka = KafkaClient("localhost", 9092)

consumer = SimpleConsumer(kafka, "my-group", "my-topic")
consumer.seek(0, 2)
for message in consumer:
  print message

kafka.close()

Then I produce message with script:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

The thing is that when I start consumers as two different processes then I receive new messages in each process. However I want it to be sent to only one consumer, not broadcasted.

In documentation of Kafka (https://kafka.apache.org/documentation.html) there is written:

If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

I see that group for these consumers is the same - my-group.

How to make it so that new message is read by exactly one consumer instead of broadcasting it?

Was it helpful?

Solution

the consumer-group API was not officially supported until kafka v. 0.8.1 (released Mar 12, 2014). For server versions prior, consumer groups do not work correctly. And as of this post the kafka-python library does not currently attempt to send group offset data:

https://github.com/mumrah/kafka-python/blob/c9d9d0aad2447bb8bad0e62c97365e5101001e4b/kafka/consumer.py#L108-L115

OTHER TIPS

Its hard to tell from the example above what your Zookeeper configuration is or if there's one at all. You'll need a Zookeeper cluster for the consumer group information to be persisted WRT what consumer within each group has consumed to a given offset.

A solid example is here: Official Kafka documentation - Consumer Group Example

This should not happen - make sure that both of the consumers are being registered under the same consumer group in the zookeeper znodes. Each message to a topic should be consumed by a consumer group exactly once, so one consumer out of everyone in the group should receive the message, not what you are experiencing. What version of Kafka are you using?

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top