Question

I would appreciate your help on this.

I am building an Apache Kafka consumer to subscribe to another, already running Kafka instance. My problem is that when my producer pushes messages to the server, my consumer does not receive them, and I get the following output in my logs:

13/08/30 18:00:58 INFO producer.SyncProducer: Connected to xx.xx.xx.xx:6667:false for producing
13/08/30 18:00:58 INFO producer.SyncProducer: Disconnecting from xx.xx.xx.xx:6667:false
13/08/30 18:00:58 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1377910855898] Stopping leader finder thread
13/08/30 18:00:58 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1377910855898] Stopping all fetchers
13/08/30 18:00:58 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1377910855898] All connections stopped

I am not sure whether I am missing some important configuration here. Using Wireshark, I can see messages coming from my server, but my consumer is not consuming them.

My code is an exact replica of the sample consumer example: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

UPDATE:

[2013-09-03 00:57:30,146] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2013-09-03 00:57:30,146] INFO Opening socket connection to server /xx.xx.xx.xx:2181 (org.apache.zookeeper.ClientCnxn)
[2013-09-03 00:57:30,235] INFO Connected to xx.xx.xx:6667 for producing (kafka.producer.SyncProducer)
[2013-09-03 00:57:30,299] INFO Socket connection established to 10.224.62.212/10.224.62.212:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2013-09-03 00:57:30,399] INFO Disconnecting from xx.xx.xx.net:6667 (kafka.producer.SyncProducer)
[2013-09-03 00:57:30,400] INFO [ConsumerFetcherManager-1378195030845] Stopping leader finder thread (kafka.consumer.ConsumerFetcherManager)
[2013-09-03 00:57:30,400] INFO [ConsumerFetcherManager-1378195030845] Stopping all fetchers (kafka.consumer.ConsumerFetcherManager)
[2013-09-03 00:57:30,400] INFO [ConsumerFetcherManager-1378195030845] All connections stopped (kafka.consumer.ConsumerFetcherManager)
[2013-09-03 00:57:30,400] INFO [console-consumer-49997_xx.xx.xx-1378195030443-cce6fc51], Cleared all relevant queues for this fetcher (kafka.consumer.ZookeeperConsumerConnector)
[2013-09-03 00:57:30,400] INFO [console-consumer-49997_xx.xx.xx.-1378195030443-cce6fc51], Cleared the data chunks in all the consumer message iterators (kafka.consumer.ZookeeperConsumerConnector)
[2013-09-03 00:57:30,400] INFO [console-consumer-49997_xx.xx.xx.xx-1378195030443-cce6fc51], Committing all offsets after clearing the fetcher queues (kafka.consumer.ZookeeperConsumerConnector)
[2013-09-03 00:57:30,401] ERROR [console-consumer-49997_xx.xx.xx.xx-1378195030443-cce6fc51], zk client is null. Cannot commit offsets (kafka.consumer.ZookeeperConsumerConnector)
[2013-09-03 00:57:30,401] INFO [console-consumer-49997_xx.xx.xx.xx-1378195030443-cce6fc51], Releasing partition ownership (kafka.consumer.ZookeeperConsumerConnector)
[2013-09-03 00:57:30,401] INFO [console-consumer-49997_xx.xx.xx.xx-1378195030443-cce6fc51], exception during rebalance  (kafka.consumer.ZookeeperConsumerConnector)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:185)
at scala.None$.get(Option.scala:183)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:434)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:429)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:429)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:681)
at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:715)
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)

Solution

Can you please provide your producer code sample?

Do you have the latest 0.8 version checked out? There appears to have been a known deadlock issue involving the consumer fetcher, which has been patched in the current version.

You can also try consuming the messages with the console consumer script that ships with Kafka, to make sure your producer is working correctly.
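
If the console script does receive messages, the next thing worth comparing is the configuration your own consumer is started with. Below is a minimal sketch of the properties the 0.8 high-level consumer reads, assuming the 0.8 key names (`zookeeper.connect`, `group.id`, `auto.offset.reset`) used by the linked Consumer Group Example. The class name, the helper `consumerProps`, the host `localhost:2181` and the group name `debug-group` are placeholders of mine, not taken from your setup. It only builds and prints the properties; passing them to Kafka's `ConsumerConfig` is left out so the snippet runs without the Kafka jars.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Builds the settings a 0.8 high-level consumer would be given.
    // Key names follow the 0.8 naming; the values are illustrative only.
    static Properties consumerProps(String zookeeper, String groupId) {
        Properties props = new Properties();
        // ZooKeeper ensemble the consumer registers with (host:port).
        props.put("zookeeper.connect", zookeeper);
        // Consumer group; a fresh group name has no committed offsets yet.
        props.put("group.id", groupId);
        // With no committed offset, start from the earliest available message.
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values; replace with your own ZooKeeper address and group.
        Properties props = consumerProps("localhost:2181", "debug-group");
        props.list(System.out);
    }
}
```

Using a fresh group name together with `auto.offset.reset=smallest` is one way to check whether messages already on the broker can be read at all, independent of when the producer sent them.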

If possible, post some more logs and a code snippet; that should help with further debugging. (It seems I need more reputation to comment, so I had to post this as an answer instead.)

Licensed under: CC-BY-SA with attribution