I am using node-rdkafka and trying to find out whether I can reset the offsets of all partitions of the topic my consumer reads.
We are currently testing our Kafka consumer and want to skip the messages of a certain topic to keep different topics in sync. I know we can set auto.offset.reset to “latest”, but that only takes effect when no offsets have been committed for the group, and we are already in the middle of the topic. From the documentation, it looks like I may be able to do the following: fetch the metadata, find our topic of interest and its partitions, loop through the partitions calling consumer.queryWatermarkOffsets to get the current low and high offsets, and then commit the high offset for each partition. Unfortunately, although I can send the requests to commit the offsets, nothing seems to change: the number of messages the group is behind stays the same afterwards. Maybe I am doing something wrong?
In my consumer, inside the on-ready event handler, I have the following code:
  const opts = { topic: this.consumerTopicName, timeout: 10000 };
  consumer.getMetadata(opts, (err, metadata) => {
    if (err) {
      logger.err('Error getting metadata');
    } else {
      // Find the topic of interest in the returned metadata.
      const topics = metadata.topics.filter((topic) => topic.name === this.consumerTopicName);
      logger.info("NumOfTopics: " + topics.length);
      let numOfPartitions = 0;
      if (topics.length > 0) {
        numOfPartitions = topics[0].partitions.length;
      }
      logger.info("numOfPartitions: " + numOfPartitions);
      if (numOfPartitions > 0) {
        // Assumes partition ids run from 0 to numOfPartitions - 1.
        for (let p = 0; p < numOfPartitions; p++) {
          consumer.queryWatermarkOffsets(this.consumerTopicName, p, 5000, (err, offsets) => {
            if (!err) {
              const high = offsets.highOffset;
              const low = offsets.lowOffset;
              logger.info("Committing offset: " + high + " for partition " + p);
              // Asynchronous commit: the result is not checked here.
              consumer.commit({ topic: this.consumerTopicName, partition: p, offset: high });
            }
          });
        }
      }
    }
  });
}
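For comparison, here is a minimal sketch of the same idea that I could try instead. It collects the high watermark of every partition first and then commits synchronously, so a failed commit should surface as a thrown error rather than pass silently. The helper names queryHighOffset and skipToEnd are hypothetical. The sketch assumes that consumer is a connected node-rdkafka KafkaConsumer and that commitSync accepts the same topic/partition/offset object I pass to commit above. I have not confirmed that this changes the outcome.

```javascript
// Sketch only: `consumer` is assumed to be a connected node-rdkafka
// KafkaConsumer, or any object exposing the two methods used below.

// Hypothetical helper: wrap the callback-style watermark query in a Promise
// that resolves to the topic/partition/offset object to commit.
function queryHighOffset(consumer, topic, partition, timeoutMs) {
  return new Promise((resolve, reject) => {
    consumer.queryWatermarkOffsets(topic, partition, timeoutMs, (err, offsets) => {
      if (err) {
        reject(err);
      } else {
        resolve({ topic, partition, offset: offsets.highOffset });
      }
    });
  });
}

// Hypothetical helper: look up the high watermark of every listed partition,
// then commit each one synchronously. Resolves to the committed targets.
async function skipToEnd(consumer, topic, partitionIds, timeoutMs = 5000) {
  const targets = await Promise.all(
    partitionIds.map((p) => queryHighOffset(consumer, topic, p, timeoutMs))
  );
  for (const target of targets) {
    // Assumption: commitSync takes the same object shape as commit.
    consumer.commitSync(target);
  }
  return targets;
}
```

Inside the on-ready handler this would be called as skipToEnd(consumer, this.consumerTopicName, partitionIds), where partitionIds is the list of partition ids taken from the metadata, with a .catch that logs the error.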