Question

I'm trying to retrieve data from my Kafka 0.8.1 cluster. I create an instance of ZookeeperConsumerConnector and then call createMessageStreams on it. However, no matter what I do, createMessageStreams just hangs and never returns, even when it is the only Kafka call I make.

From the mailing lists, it seems this can happen for a few different reasons, but as far as I can tell none of them apply to my setup.

I should also point out that I'm actually doing this in Clojure using clj-kafka, but I suspect clj-kafka is not the issue, because the problem occurs even when I run just this code:

(.createMessageStreams
  (clj-kafka.consumer.zk/consumer {"zookeeper.connect" "127.0.0.1:2181"
                                   "group.id" "my.consumer"
                                   "auto.offset.reset" "smallest"
                                   "auto.commit.enable" "false"})
  {"mytopic" (int 1)})

clj-kafka.consumer.zk/consumer simply calls Consumer.createJavaConsumerConnector to create a ZookeeperConsumerConnector; it doesn't do anything unusual.

Also, there are definitely messages in "mytopic" because from the command line I can run the following and get back everything I've already sent to the topic:

% kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic mytopic --from-beginning

So it's also not that the topic is empty.

Feeling stumped at this point. Ideas?

ETA: By "hang" I mean that it seems to spin up a thread and then stay stuck in it without ever doing anything. When I run this code from the REPL, I can break out by pressing Ctrl-C, and then I get this error:

IllegalMonitorStateException   java.util.concurrent.locks.ReentrantLock$Sync.tryRelease (ReentrantLock.java:155)

Solution

I was experiencing the same issue, with the same exception when interrupting the REPL. The hang comes from the lazy-iterate function in the consumer.zk namespace. Messages are read from a LinkedBlockingQueue, and the call to .hasNext in lazy-iterate ends up calling .take on that queue. take acquires the queue's internal take lock and then blocks until an element is available to remove. Unless the consumer is shut down, .hasNext therefore never returns false, so the iteration performed by lazy-iterate never actually finishes. lazy-iterate is called by the messages function, and if you don't limit how much you consume with something like

(take 2 (messages "mytopic" some-consumer))

then the call to messages never returns and hangs indefinitely. In my opinion this is a bug (or design flaw) in clj-kafka. To confirm that this is what's happening, try setting "consumer.timeout.ms" "0" in your consumer config: instead of blocking, the iteration will throw a ConsumerTimeoutException and return control to the REPL.
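The behavior described above can be reproduced without Kafka at all. Below is a minimal sketch in plain Java, assuming a hypothetical QueueIterator that mimics the described pattern: hasNext() calls take() on a LinkedBlockingQueue, or, when a timeout is configured, poll() with that timeout and throws on expiry (loosely analogous to "consumer.timeout.ms"). QueueIterator, QueueTimeoutException and take are made-up names for illustration; this is not clj-kafka's or Kafka's actual code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingIteratorDemo {

    /** Hypothetical stand-in for Kafka's ConsumerTimeoutException. */
    static class QueueTimeoutException extends RuntimeException {
        QueueTimeoutException() { super("no message within timeout"); }
    }

    /**
     * Hypothetical iterator over a queue that never reports "end of stream".
     * timeoutMs < 0 : hasNext() waits forever via take()
     * timeoutMs >= 0: hasNext() waits at most timeoutMs via poll(), then throws
     */
    static class QueueIterator<T> implements Iterator<T> {
        private final LinkedBlockingQueue<T> queue;
        private final long timeoutMs;
        private T buffered; // element fetched by hasNext() but not yet returned

        QueueIterator(LinkedBlockingQueue<T> queue, long timeoutMs) {
            this.queue = queue;
            this.timeoutMs = timeoutMs;
        }

        @Override
        public boolean hasNext() {
            if (buffered != null) return true;
            try {
                if (timeoutMs < 0) {
                    buffered = queue.take(); // blocks until something is enqueued
                } else {
                    buffered = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
                    if (buffered == null) throw new QueueTimeoutException();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            return true; // never false: the stream has no natural end
        }

        @Override
        public T next() {
            if (!hasNext()) throw new NoSuchElementException();
            T result = buffered;
            buffered = null;
            return result;
        }
    }

    /** Like Clojure's (take n ...): stops after n elements, so it can return. */
    static <T> List<T> take(Iterator<T> it, int n) {
        List<T> out = new ArrayList<>();
        while (out.size() < n && it.hasNext()) out.add(it.next());
        return out;
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("m1", "m2", "m3"));

        // Bounded consumption returns even though the iterator itself never ends.
        System.out.println(take(new QueueIterator<>(queue, -1), 2)); // [m1, m2]

        // Draining everything with timeoutMs = 0 throws instead of hanging.
        Iterator<String> it = new QueueIterator<>(queue, 0);
        try {
            while (it.hasNext()) System.out.println(it.next()); // prints m3
        } catch (QueueTimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

Replacing the timed-out branch in main with timeoutMs = -1 would make the while loop block forever after printing m3, which is the same shape of hang as the unbounded messages call.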

This also causes a problem with the with-resource macro. The macro takes a binding to a consumer, a shutdown function, and a body; it evaluates the body and then calls the shutdown function. If the body calls messages, the body never returns, so the shutdown function is never called. The messages call WOULD return if shutdown were called, because shutdown puts a message on the queue that signals the consumer to clean up its resources and threads in preparation for GC. The macro therefore leaves the application in a state where the only way out of the main loop is to kill the application (or the thread running it). The library certainly has a ways to go before it's ready for a production environment.
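The shutdown deadlock can also be sketched in plain Java. This is a minimal illustration, assuming a hypothetical ToyConsumer whose shutdown() enqueues a sentinel object (the same idea as the shutdown message described above, but not Kafka's actual implementation). The read loop only returns once the sentinel arrives, so shutdown has to be triggered from another thread; code that runs after the loop, as in a body-then-shutdown macro, is never reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class ShutdownSentinelDemo {

    /** Hypothetical marker object; the reader stops when it dequeues this. */
    static final Object SHUTDOWN = new Object();

    /** Hypothetical consumer: not Kafka's API, just the queue-plus-sentinel pattern. */
    static class ToyConsumer {
        final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();

        /** Enqueue the sentinel so a thread blocked in take() wakes up and stops. */
        void shutdown() {
            queue.add(SHUTDOWN);
        }

        /** Reads messages until the sentinel arrives; blocks while the queue is empty. */
        List<String> readUntilShutdown() throws InterruptedException {
            List<String> out = new ArrayList<>();
            while (true) {
                Object item = queue.take();
                if (item == SHUTDOWN) return out;
                out.add((String) item);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ToyConsumer consumer = new ToyConsumer();
        consumer.queue.add("m1");
        consumer.queue.add("m2");

        // Calling consumer.shutdown() on this thread *after* readUntilShutdown()
        // would never happen, because readUntilShutdown() blocks once the queue
        // is empty. So shutdown is triggered from a separate thread instead.
        Thread stopper = new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            consumer.shutdown();
        });
        stopper.start();

        List<String> received = consumer.readUntilShutdown();
        stopper.join();
        System.out.println(received); // [m1, m2]
    }
}
```

The design point is that whoever calls shutdown must not be waiting on the same blocked read; where that call should live in a real application (another thread, a shutdown hook, a timeout) is a separate decision.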

Licensed under: CC-BY-SA with attribution