How to get an offset from a ByteBufferMessageSet returned by the Kafka SimpleConsumer in Java?

StackOverflow https://stackoverflow.com/questions/17808826

  •  03-06-2022

Question

Consider the following code:

    public static long Offset = 0L;
    FetchRequest req = new FetchRequest(KafkaProperties.topic, 0, Offset, 10485760);
    ByteBufferMessageSet messageSet = simpleConsumer.fetch(req);

The question is: how do I get the last offset and write it back to the Offset variable, so that the next fetch reads the next batch of data from Kafka?


Update: when I print the data like this:

    for (MessageAndOffset messageAndOffset : messageSet) {
        System.out.println(messageAndOffset);
    }

The output is as follows:

MessageAndOffset(message(magic = 1, attributes = 0, crc = 2000130375, payload = java.nio.HeapByteBuffer[pos=0 lim=176 cap=176]),296215)
MessageAndOffset(message(magic = 1, attributes = 0, crc = 956398356, payload = java.nio.HeapByteBuffer[pos=0 lim=196 cap=196]),298144)
....
....
MessageAndOffset(message(magic = 1, attributes = 0, crc = 396743887, payload = java.nio.HeapByteBuffer[pos=0 lim=179 cap=179]),299136)

The docs say the last number is the offset:

MessageAndOffset(message: Message, offset: Long)

That is, in the above case the last offset I read is 299136.


Solution

Does something like this help? One drawback is that it loops forever, since nothing ever breaks out of the while loop.

    long offset = 0;

    while (true) {
        FetchRequest fetchrequest = new FetchRequest(topicName, 0, offset, 10485760);

        ByteBufferMessageSet messages = consumer.fetch(fetchrequest);
        for (MessageAndOffset msg : messages) {
            System.out.println("consumed: " + Utils.toString(msg.message().payload(), "UTF-8"));
            offset = msg.offset();
        }

    }
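If the endless loop is a problem, one option is to stop as soon as a fetch comes back empty. The following is only a rough sketch of that idea and does not use the Kafka client at all: `Msg`, `fetch` and `consumeAll` are hypothetical stand-ins for `MessageAndOffset`, `simpleConsumer.fetch` and the loop above, and the in-memory list plays the role of the partition. It also assumes each message can report the offset to resume from.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetLoopSketch {

    /** Hypothetical stand-in for Kafka's MessageAndOffset. */
    static final class Msg {
        final long offset;      // position of this message
        final long nextOffset;  // position the next fetch should start from
        final String payload;

        Msg(long offset, long nextOffset, String payload) {
            this.offset = offset;
            this.nextOffset = nextOffset;
            this.payload = payload;
        }
    }

    /** Hypothetical stand-in for a fetch: up to maxMessages messages at or after fromOffset. */
    static List<Msg> fetch(List<Msg> log, long fromOffset, int maxMessages) {
        List<Msg> batch = new ArrayList<>();
        for (Msg m : log) {
            if (m.offset >= fromOffset && batch.size() < maxMessages) {
                batch.add(m);
            }
        }
        return batch;
    }

    /** Reads the log batch by batch and returns the offset to resume from. */
    static long consumeAll(List<Msg> log, long startOffset, int batchSize) {
        long offset = startOffset;
        while (true) {
            List<Msg> batch = fetch(log, offset, batchSize);
            if (batch.isEmpty()) {
                break; // nothing new came back: stop instead of spinning forever
            }
            for (Msg m : batch) {
                System.out.println("consumed: " + m.payload);
                offset = m.nextOffset; // remember where the next fetch should start
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        List<Msg> log = new ArrayList<>();
        for (long i = 0; i < 5; i++) {
            log.add(new Msg(i, i + 1, "message-" + i));
        }
        System.out.println("resume from: " + consumeAll(log, 0L, 2));
    }
}
```

In a real consumer you would probably sleep and retry on an empty fetch rather than exit, since new messages may arrive later. Breaking out is just the simplest way to show the offset being carried from one fetch to the next.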

Also, the Kafka 0.8 SimpleConsumer example has something like the following:

    long numRead = 0;
    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
          long currentOffset = messageAndOffset.offset();
          if (currentOffset < readOffset) {
             System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
             continue;
          }
          readOffset = messageAndOffset.nextOffset();
          ByteBuffer payload = messageAndOffset.message().payload();

          byte[] bytes = new byte[payload.limit()];
          payload.get(bytes);
          System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
          numRead++;
          a_maxReads--;
    }

They also mention that the application expects the a_maxReads parameter (the maximum number of messages to read) to be passed as an argument, so that it doesn't loop forever. I am new to Kafka and not sure if this is what you are looking for.
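To make the two guards in that loop easier to see (skipping offsets older than the one requested, and stopping after a maximum number of reads), here is a small sketch without any Kafka dependency. Everything in it is hypothetical: `readBlock` and `ReadResult` are names I made up, the `long[]` stands in for the offsets of a fetched message set, and it assumes 0.8-style offsets that increase by one per message. As far as I understand the 0.8 documentation, older offsets can show up when the broker returns a whole compressed block that starts before the requested offset.

```java
public class BoundedReadSketch {

    /** Result of one pass: how many messages were read and where to resume. */
    static final class ReadResult {
        final long numRead;
        final long nextOffset;

        ReadResult(long numRead, long nextOffset) {
            this.numRead = numRead;
            this.nextOffset = nextOffset;
        }
    }

    /**
     * Walks the offsets of a fetched block (hypothetical stand-in for a message set),
     * skipping offsets older than readOffset and stopping after maxReads messages.
     */
    static ReadResult readBlock(long[] blockOffsets, long readOffset, long maxReads) {
        long numRead = 0;
        for (long currentOffset : blockOffsets) {
            if (numRead >= maxReads) {
                break; // read budget used up
            }
            if (currentOffset < readOffset) {
                continue; // older than what was requested: skip it
            }
            // Assumption: the next message sits at currentOffset + 1.
            readOffset = currentOffset + 1;
            numRead++;
        }
        return new ReadResult(numRead, readOffset);
    }

    public static void main(String[] args) {
        long[] block = {10, 11, 12, 13, 14}; // block starts before the requested offset
        ReadResult r = readBlock(block, 12L, 2L);
        System.out.println("read " + r.numRead + " messages, resume from " + r.nextOffset);
    }
}
```

With the block above, offsets 10 and 11 are skipped, 12 and 13 are read, and the loop stops before 14 because the budget of two reads is used up.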

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow