Question

I am using Kafka 0.8 beta, and I am just trying to mess around with sending different objects, serializing them using my own encoder, and sending them to an existing broker configuration. For now I am trying to get just DefaultEncoder working.

I have the broker and everything setup and working for StringEncoder, but I am not able to get any other data type, including just pure byte[], to be sent and received by the broker.

My code for the Producer is:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Date;
import java.util.Properties;
import java.util.Random;


public class ProducerTest {
    public static void main(String[] args) {
        long events = 5;
        Random rnd = new Random();
        rnd.setSeed(new Date().getTime());
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "localhost:9093,localhost:9094");
        props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
        props.setProperty("partitioner.class", "example.producer.SimplePartitioner");
        props.setProperty("request.required.acks", "1");
        props.setProperty("producer.type", "async");
        props.setProperty("batch.num.messages", "4");

        ProducerConfig config = new ProducerConfig(props);
        Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);
        for (long nEvents = 0; nEvents < events; nEvents++) {
            byte[] a = "Hello".getBytes();
            byte[] b = "There".getBytes();

            KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>("page_visits", a, b);
            producer.send(data);
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producer.close();
    }
}

I used the same SimplePartitioner as in the example given here, and replacing all the byte arrays by Strings and changing the serializer to kafka.serializer.StringEncoder works perfectly.

For reference, SimplePartitioner:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner<String> {
    public SimplePartitioner (VerifiableProperties props) {

    }

    public int partition(String key, int a_numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  }

}

What am I doing wrong?

Was it helpful?

Solution

The answer is that the partitioning class SimplePartitioner is applicable only for Strings. When I try to run the Producer asynchronously, it creates a separate thread that handles the encoding and partitioning before sending to the broker. This thread hits a roadblock when it realizes that SimplePartitioner works only for Strings, but because it's a separate thread, no Exceptions are thrown, and so the thread just exits without any indication of wrongdoing.

If we change the SimplePartitioner to accept byte[], for instance:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner<byte[]> {
    public SimplePartitioner (VerifiableProperties props) {

    }

    public int partition(byte[] key, int a_numPartitions) {
        int partition = 0;
        return partition;
    }

}

This works perfectly now.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top