Question

I am using Kafka 0.8.0 and trying to achieve the below mentioned scenario.

JCA API (Acts as a producer and sends data to)-----> Consumer------> HBase

I am sending each message to consumer as soon as I fetch the data using JCA Client. For instance, as soon as producer sends message no.1 , I want to fetch the same from consumer and 'put' in HBase. But my consumer starts fetching the messages after some random n messages . I want to put the producer and consumer in sync so that both of them start working together.

I have used:

1 broker

1 single topic

1 single producer and high level Consumer

Can anyone suggest what do i need to do to achieve the same?

EDITED:

Adding some relevant code snippet.

Consumer.java

public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    PrintWriter pw = null;
    int t = 0;
    StringDecoder kd = new StringDecoder(null);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    Map<String, List<KafkaStream<String, Signal>>> consumerMap;
    KafkaStream<String, Signal> stream;
    ConsumerIterator<String, Signal> it;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

        this.topic = topic;
        topicCountMap.put(topic, new Integer(1));
        consumerMap = consumer.createMessageStreams(topicCountMap, kd, new Serializer(
                new VerifiableProperties()));
        stream = consumerMap.get(topic).get(0);
        it = stream.iterator();

    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.size", "1024");

        return new ConsumerConfig(props);

    }

    synchronized public void run() {

        while (it.hasNext()) {
            t = (it.next().message()).getChannelid();
            System.out.println("In Consumer received msg" + t);
        }
    }
}

producer.java

public class Producer {
    public final kafka.javaapi.producer.Producer<String, Signal> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic)
    {
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Use random partitioner. Don't need the key type. Just set it to Integer.
        // The message is of type userdefined Object .
        producer = new kafka.javaapi.producer.Producer<String,Signal(newProducerConfig(props));
        this.topic = topic;
    }
}

KafkaProperties.java

public interface KafkaProperties {
    final static String zkConnect = "127.0.0.1:2181";
    final static String groupId = "group1";
    final static String topic = "test00";
    final static String kafkaServerURL = "localhost";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 100000;
    final static int reconnectInterval = 10000;
    final static String clientId = "SimpleConsumerDemoClient";
}

This is how the consumer is behaving for the first 10 messages it does not sysout that message received by consumer but from the 11th message onwards it starts functioning correctly.

     producer sending msg1

     producer sending msg2

     producer sending msg3

     producer sending msg4

     producer sending msg5

     producer sending msg6

     producer sending msg7

     producer sending msg8

     producer sending msg9

     producer sending msg10

     producer sending msg11

     producer sending msg12
     In Consumer received msg12

     producer sending msg13
     In Consumer received msg13

     producer sending msg14
     In Consumer received msg14

     producer sending msg15
     In Consumer received msg15

     producer sending msg16
     In Consumer received msg16

     producer sending msg17
     In Consumer received msg17

     producer sending msg18
     In Consumer received msg18

     producer sending msg19
     In Consumer received msg19

     producer sending msg20
     In Consumer received msg20

     producer sending msg21
     In Consumer received msg21

EDITED: adding the listener function where producer is sending messages to consumer. And I am using the default producer config did not overwrite it

public synchronized void onValueChanged(final MonitorEvent event_) {


    // Get the value from the DBR
    try {
        final DBR dbr = event_.getDBR();

        final String[] val = (String[]) dbr.getValue();

        producer1.producer.send(new KeyedMessage<String, Signal>         
                    (KafkaProperties.topic,new Signal(messageNo)));
        System.out.println("producer sending msg"+messageNo);

        messageNo++;


    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
Was it helpful?

Solution

  1. Try to add props.put("request.required.acks", "1") to producer configuration. By default producer doesn't wait for acks and message delivery is not guaranteed. So, if you start broker just before your test, producer may start to send messages before broker is fully initialized and first several messages may be lost.

  2. Try to add props.put("auto.offset.reset", "smallest") to consumer configuration. It is equal to --from-beginning option of kafka-console-consumer.sh. If your consumer starts later than producer and there is no offset data saved in Zookeeper, then by default it will start consuming only new messages (see Consumer configs in docs).

OTHER TIPS

One of possibility is Kafka Lag. May be the consumer is overloaded with too many partitions. Or processing per message is very costly.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top