Question

I need to get the messages produced in Kafka hour by hour throughout the day. Every hour I will launch a job to consume the messages produced one hour earlier. For example, if the current time is 20:12, I will consume the messages produced between 19:00:00 and 19:59:59. That means I need to find the start offset for 19:00:00 and the end offset for 19:59:59. I used SimpleConsumer.getOffsetsBefore as shown in the "0.8.0 SimpleConsumer Example". The problem is that the returned offset does not match the timestamp given as a parameter: when I pass the timestamp 19:00:00, I get messages produced at 16:38:00.


Solution 2

In Kafka there is currently no way to get an offset that corresponds to a particular timestamp - this is by design. As described near the top of Jay Kreps's Log article, the offset provides a sort of timestamp for the log that is decoupled from wall-clock time. With the offset as your notion of time, you can tell whether any two systems are in a consistent state just by knowing what offset each has read up to. There is never any confusion about different clock times on different servers, leap years, daylight saving time, time zones, etc. It's kinda nice...

NOW... all that said, if you know your server went down at some time X, then practically speaking you would really like to know the corresponding offset. You can get close. The log files on the Kafka machines are named according to the time that they started writing, and there exists a Kafka tool (that I can't find right now) that lets you know which offsets are associated with these files. If you want to know the exact timestamp, though, then you must encode the timestamp in the messages that you're sending to Kafka (as sketched below).
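For illustration only, here is a minimal sketch of that last idea, assuming a simple "epoch-millis|payload" encoding, a local broker, and a topic named "my-topic"; it uses the modern producer API for brevity, but the same idea applies to 0.8 producers:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TimestampedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String payload = "some business data";
            // Encode the wall-clock production time into the value itself.
            String value = System.currentTimeMillis() + "|" + payload;
            producer.send(new ProducerRecord<>("my-topic", value));
        }

        // Consumer side: recover the timestamp by splitting on the first '|', e.g.
        // long producedAt = Long.parseLong(value.substring(0, value.indexOf('|')));
    }
}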

OTHER TIPS

The Kafka consumer API method offsetsForTimes() (shown below) can be used for this; it is available from version 0.10.0 onwards. See the JavaDoc.

/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 *
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
 * will be returned for that partition.
 *
 * Notice that this method may block indefinitely if the partition does not exist.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
 *         such message.
 * @throws IllegalArgumentException if the target timestamp is negative.
 */
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
    for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
        // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
        // OffsetAndTimestamp is always positive.
        if (entry.getValue() < 0)
            throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                    entry.getValue() + ". The target time cannot be negative.");
    }
    return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
}

As the other replies note, older versions of Kafka had only an approximate way of mapping times to offsets. However, since Kafka 0.10.0 (released in May 2016), Kafka maintains a time index for each topic. This will allow you to efficiently get from times to exact offsets. You can use the KafkaConsumer#offsetsForTimes method to access this information.

There are more details about how the time-based index is implemented on the KIP-33 design discussion page.

Here is the code:

import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public static Map<TopicPartition, OffsetAndTimestamp> getOffsetAndTimestampAtTime(String kafkaServers, String topic, long time) {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
    kafkaParams.put(GROUP_ID_CONFIG, "consumerGroupId");
    kafkaParams.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(AUTO_OFFSET_RESET_CONFIG, "latest");
    kafkaParams.put(ENABLE_AUTO_COMMIT_CONFIG, false);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaParams);

    // All partitions of the topic.
    List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);

    List<TopicPartition> topicPartitions = partitionInfos
            .stream()
            .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
            .collect(Collectors.toList());

    // Ask for the same timestamp on every partition.
    Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream()
            .collect(Collectors.toMap(tp -> tp, tp -> time));

    // For each partition: the earliest offset whose timestamp is >= the given time,
    // or null if the partition has no such message.
    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(topicPartitionToTimestampMap);
    consumer.close();
    return result;
}
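As a usage sketch for the hourly job in the question (the broker address, topic, and 19:00 window start below are placeholders, and the consumer is assumed to be configured like the helper above), you could look up the offsets for the start of the hour and seek each partition there before polling:

// Hypothetical usage: position a consumer at the first message at or after 19:00 today.
public static void seekToWindowStart(KafkaConsumer<String, String> consumer, String topic) {
    long windowStart = java.time.LocalDate.now()
            .atTime(19, 0)
            .atZone(java.time.ZoneId.systemDefault())
            .toInstant()
            .toEpochMilli();

    Map<TopicPartition, OffsetAndTimestamp> startOffsets =
            getOffsetAndTimestampAtTime("localhost:9092", topic, windowStart);

    consumer.assign(startOffsets.keySet());
    startOffsets.forEach((tp, offsetAndTimestamp) -> {
        // A null value means the partition has no message at or after windowStart.
        if (offsetAndTimestamp != null) {
            consumer.seek(tp, offsetAndTimestamp.offset());
        }
    });
}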

Kafka 0.10 does support timestamps, although it will still be a little challenging to use them to do what you want to do. But if you know from which timestamp you want to read, and until which timestamp, then you can just poll messages until that time and stop consuming (see the sketch below).
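A minimal sketch of that stop condition, assuming a consumer already positioned at the start of the window (as above) and the newer poll(Duration) overload; process() is a placeholder for whatever the job actually does:

// Hypothetical: read records until their timestamps pass the end of the window
// (e.g. the epoch millis for 19:59:59.999). Requires org.apache.kafka.clients.consumer imports.
public static void consumeUntil(KafkaConsumer<String, String> consumer, long windowEnd) {
    boolean done = false;
    while (!done) {
        ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofSeconds(1));
        if (records.isEmpty()) {
            break; // caught up for now; a real job might keep retrying until windowEnd has safely passed
        }
        for (ConsumerRecord<String, String> record : records) {
            if (record.timestamp() > windowEnd) {
                done = true; // past the window; with several partitions you would track this per partition
                break;
            }
            process(record); // placeholder for the actual work
        }
    }
}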

Licensed under: CC-BY-SA with attribution