Does something like this help? One drawback is that it will loop forever, because the while (true) loop has no exit condition.
long offset = 0;
// No exit condition: this keeps fetching until the process is stopped.
while (true) {
    FetchRequest fetchrequest = new FetchRequest(topicName, 0, offset, 10485760);
    ByteBufferMessageSet messages = consumer.fetch(fetchrequest);
    for (MessageAndOffset msg : messages) {
        System.out.println("consumed: " + Utils.toString(msg.message().payload(), "UTF-8"));
        // Remember the position so the next fetch resumes from here.
        offset = msg.offset();
    }
}
Also, the 0.8 Kafka SimpleConsumer example has something like the following:
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
    a_maxReads--;
}
They also mention that the application expects a_maxReads (the maximum number of messages to read) to be passed as an argument, so that it doesn't loop forever. I am new to Kafka and not sure if this is what you are looking for.
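
In case it helps, here is a rough, self-contained sketch of how a max-reads counter can bound the loop. It does not use the Kafka client at all: an in-memory list stands in for the partition, and every name in it (BoundedReadSketch, Entry, fetch, sampleLog, readUpTo) is made up for illustration. The fake fetch deliberately starts one entry early so that the "old offset" check has something to skip.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: an in-memory list replaces the real Kafka consumer.
final class BoundedReadSketch {

    /** Stand-in for a fetched message: its offset and its payload. */
    static final class Entry {
        final long offset;
        final String payload;

        Entry(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }

        long nextOffset() {
            return offset + 1;
        }
    }

    /** Builds a fake log whose entry at index i has offset i and payload "m" + i. */
    static List<Entry> sampleLog(int size) {
        List<Entry> log = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            log.add(new Entry(i, "m" + i));
        }
        return log;
    }

    /**
     * Stand-in for a fetch call. It starts one entry before fromOffset on
     * purpose, to imitate a batch that also contains already-seen messages.
     */
    static List<Entry> fetch(List<Entry> log, long fromOffset, int batchSize) {
        int start = (int) Math.max(0, fromOffset - 1);
        int end = Math.min(log.size(), start + batchSize);
        if (start >= end) {
            return new ArrayList<>();
        }
        return new ArrayList<>(log.subList(start, end));
    }

    /** Reads at most maxReads messages starting at startOffset, then stops. */
    static List<String> readUpTo(List<Entry> log, long startOffset, int maxReads) {
        List<String> consumed = new ArrayList<>();
        long readOffset = startOffset;
        while (maxReads > 0) {
            long numRead = 0;
            for (Entry entry : fetch(log, readOffset, 3)) {
                if (entry.offset < readOffset) {
                    continue; // old offset, already handled
                }
                if (maxReads == 0) {
                    break; // budget used up in the middle of a batch
                }
                readOffset = entry.nextOffset();
                consumed.add(entry.offset + ": " + entry.payload);
                numRead++;
                maxReads--;
            }
            if (numRead == 0) {
                break; // nothing new; a real consumer might sleep and retry instead
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        // prints [0: m0, 1: m1, 2: m2, 3: m3]
        System.out.println(readUpTo(sampleLog(5), 0, 4));
    }
}
```

The loop ends either when the budget reaches zero or when a fetch returns nothing new. Whether stopping on an empty fetch is right for you depends on whether you want to wait for more messages to arrive.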