I need to read messages in a given time range from a Kafka topic. The docs say we can use offsetsForTimes
to get, per partition, the offset of the earliest message with timestamp >= the input timestamp, and start reading from there.
Below is the implementation I wrote based on that understanding:
<code>import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;

private void readInRange(long startTime, long endTime) {
    // Discover all partitions of the topic.
    List<PartitionInfo> partitionInfos = consumer.partitionsFor(failedOrderTopic);
    List<TopicPartition> topicPartitions = new ArrayList<>();
    for (PartitionInfo partitionInfo : partitionInfos) {
        topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    }
    // For each partition, look up the offset of the earliest record with timestamp >= startTime.
    Map<TopicPartition, Long> partitionTimestampMap = new HashMap<>();
    for (TopicPartition topicPartition : topicPartitions) {
        partitionTimestampMap.put(topicPartition, startTime);
    }
    Map<TopicPartition, OffsetAndTimestamp> partitionOffsetMap = consumer.offsetsForTimes(partitionTimestampMap);
    // Read each partition sequentially with the single consumer.
    for (TopicPartition topicPartition : topicPartitions) {
        OffsetAndTimestamp startOffset = partitionOffsetMap.get(topicPartition);
        if (startOffset == null) {
            continue; // no record in this partition has timestamp >= startTime
        }
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.seek(topicPartition, startOffset.offset());
        boolean running = true;
        while (running) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(2000));
            if (consumerRecords.isEmpty()) { // poll() never returns null
                break;
            }
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                if (consumerRecord.timestamp() > endTime) {
                    running = false;
                    break;
                }
                // processing of records
            }
        }
    }
}

@Bean
public KafkaConsumer<String, String> kafkaConsumer() {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "myConsumerGroupId");
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    return new KafkaConsumer<>(properties);
}
</code>
Approach:
- traverse each partition of the topic
- find the start offset to read from
- keep reading, and exit once recordTimestamp > endTime
Processing time per record is on the order of milliseconds; the consumer.poll calls take more time than processing the records themselves.
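If the per-poll latency is what dominates, batch sizes per round trip can be tuned through the standard consumer configs; the values below are purely illustrative, not tuned recommendations:
<code>// Illustrative values only, added to the same Properties as above.
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");  // max records returned per poll()
properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "65536");  // let the broker accumulate bigger batches
properties.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");  // but don't wait longer than this
</code>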
The topic has 4 partitions, but we still consume as if there were only one, because a single KafkaConsumer reads them all in turn; we get no scaling benefit from consuming multiple partitions at once.
How can we improve this so that multiple consumers (one per partition) spin up and poll all partitions in parallel, processing records independently? Once all records from every partition have been consumed, we can club the results and process them further. Is there a way to do this with the apache-kafka library?
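What I have in mind is roughly the sketch below: one task per partition, each with its own KafkaConsumer (KafkaConsumer is not thread-safe, so instances cannot be shared across threads), and an ExecutorService collecting the per-partition results through Futures. This is untested; createConsumer() is a hypothetical factory building a consumer from the same Properties as kafkaConsumer() above, and collecting record values stands in for the real processing.
<code>// Untested sketch. Additional imports on top of those above:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

private List<String> readInRangeParallel(long startTime, long endTime) throws Exception {
    // Fetch partition metadata once, with a short-lived consumer.
    List<PartitionInfo> partitionInfos;
    try (KafkaConsumer<String, String> metadataConsumer = createConsumer()) {
        partitionInfos = metadataConsumer.partitionsFor(failedOrderTopic);
    }
    ExecutorService pool = Executors.newFixedThreadPool(partitionInfos.size());
    List<Future<List<String>>> futures = new ArrayList<>();
    for (PartitionInfo partitionInfo : partitionInfos) {
        TopicPartition tp = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
        // One task per partition; each task owns its own consumer instance.
        futures.add(pool.submit(() -> {
            try (KafkaConsumer<String, String> consumer = createConsumer()) {
                List<String> results = new ArrayList<>();
                OffsetAndTimestamp start = consumer
                        .offsetsForTimes(Collections.singletonMap(tp, startTime))
                        .get(tp);
                if (start == null) {
                    return results; // nothing at or after startTime in this partition
                }
                consumer.assign(Collections.singletonList(tp));
                consumer.seek(tp, start.offset());
                boolean running = true;
                while (running) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
                    if (records.isEmpty()) {
                        break;
                    }
                    for (ConsumerRecord<String, String> record : records) {
                        if (record.timestamp() > endTime) {
                            running = false;
                            break;
                        }
                        results.add(record.value()); // stand-in for real processing
                    }
                }
                return results;
            }
        }));
    }
    // Club the per-partition results once every task has finished.
    List<String> combined = new ArrayList<>();
    for (Future<List<String>> future : futures) {
        combined.addAll(future.get());
    }
    pool.shutdown();
    return combined;
}
</code>
If the plain client library offers a more idiomatic mechanism than managing the threads manually like this, that would be preferable.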