Question

Kafka 0.8 works great. I am able to use CLI as well as write my own producers/consumers!

Checking Zookeeper... and I see all the topics and partitions created successfully for 0.8.

Kafka 0.7 does not work!

Why Kafka 0.7? I am using Kafka Spout from Storm which is made for Kafka 0.7.

First I just want to run the CLI-based producer/consumer for Kafka 0.7, which I am unable to do. I carry out the following steps:

  1. I delete all the topics/partitions etc. in Zookeeper that were created from my Kafka 0.8
  2. I change the dataDir in zoo.cfg to point to a different location.
  3. Now I start the Kafka 0.7 server. It starts successfully. However, I don’t know why it again registers the broker topics I deleted.
  4. Now I start the Kafka Producer :

    bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic topicime

  & it starts successfully:

    [2013-06-28 14:06:05,521] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
    [2013-06-28 14:06:05,606] INFO Creating async producer for broker id = 0 at 0:0 (kafka.producer.ProducerPool)

  5. Time to send some messages & oops I get this error:

    [2013-06-28 14:07:19,650] INFO Disconnecting from 0:0 (kafka.producer.SyncProducer)
    [2013-06-28 14:07:19,653] ERROR Connection attempt to 0:0 failed, next attempt in 1 ms (kafka.producer.SyncProducer)
    java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:364)
        at sun.nio.ch.Net.connect(Net.java:356)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:623)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
        at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
        at kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)

Note that Zookeeper is already running.

Any help would really be appreciated.

EDIT:

I don't even see the topic being created in zookeeper. I am running the following command:

    bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic topicime

After the command everything looks fine & I get the following messages:

    [2013-06-28 14:30:17,614] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x13f805c6673004b, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
    [2013-06-28 14:30:17,615] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
    [2013-06-28 14:30:17,700] INFO Creating async producer for broker id = 0 at 0:0 (kafka.producer.ProducerPool)

However, now when I type a string to send, I get the above error (Connection refused!)


Solution

INFO Disconnecting from 0:0 (kafka.producer.SyncProducer) 

The error is hidden in the line above: 0:0 is not a valid host and port, so the producer has nothing it can connect to. The solution is to explicitly set the host IP that the broker registers in Zookeeper, by setting the "hostname" property in server.properties.
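For illustration, here is a rough sketch of what the relevant lines in the broker's server.properties might look like. The address is only a placeholder (use whatever IP or hostname your producers can actually reach), and the port line is an assumption based on the usual Kafka default, so check it against your own file rather than copying it blindly.

```properties
# Address the broker registers in Zookeeper.
# 127.0.0.1 is a placeholder for a single-machine setup; replace it with
# an address that producers and consumers can resolve and connect to.
hostname=127.0.0.1

# Port the broker listens on (assumed default; keep whatever your file already has).
port=9092
```

The broker has to be restarted after the change so that it registers the new address, and the producer should then report a real host and port instead of 0:0.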

OTHER TIPS

Consider checking out the storm-kafka fork for Kafka 0.8, available at https://github.com/wurstmeister/storm-kafka-0.8-plus, so you don't need to run Kafka 0.7 at all.

I'm installing it right now for our servers =).

Licensed under: CC-BY-SA with attribution