I’m trying to understand at-least-once semantics in Kafka, using the Java KafkaConsumer implementation, and all possible cases in which duplicate messages may occur.
Simple test setup:
one topic – foo
one partition in topic foo
one consumer with group.id – test
For simplicity, the example code is taken from the javadocs – section Manual Offset Control.
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("max.poll.records", "50"); // !!!!!!!
props.setProperty("auto.offset.reset", "earliest"); // start from the beginning or from the last committed offset
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo")); // only one topic

final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer); // placeholder sink, as in the javadoc example
        consumer.commitSync();
        buffer.clear();
    }
}
Known results:
The consumer connects to the broker and, after a successful rebalance when the group becomes STABLE, starts consuming messages from the beginning of the partition. Let’s assume the first four poll() calls were successful, so the buffer is now full and the first 200 messages have been stored in the DB. Then something bad happened and the application crashed, or just threw an exception, before it was able to commit the offset to the broker.
Case 1:
After the application restarts, this consumer will check the last committed offset in the broker, will not find any, and so will start from the beginning according to auto.offset.reset=earliest. As a result, the same 200 messages would be processed again and saved to the DB as duplicates.
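To make case 1 concrete, here is a small sketch of mine (not from the javadocs) that asks the broker, after the restart, what is committed for group test. It assumes imports for org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.OffsetAndMetadata, and uses the single-partition committed() variant (newer clients also offer committed(Set<TopicPartition>)):

TopicPartition tp = new TopicPartition("foo", 0);
OffsetAndMetadata committed = consumer.committed(tp); // what the broker has stored for group "test"

if (committed == null) {
    // nothing was committed before the crash -> auto.offset.reset=earliest applies,
    // so the next poll() starts again from offset 0 and the first 200 messages are re-read
    System.out.println("no committed offset, starting from earliest");
} else {
    System.out.println("resuming from committed offset " + committed.offset());
}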
Case 2:
Assume the setup changes a bit and we now have 2 consumers in the same group. The same crash or exception occurs in the consuming consumer before it is able to commit, a rebalance is performed, and the second consumer begins to consume. Again it will ask the broker for the last committed offset and will start from the beginning; as a result, the same 200 messages would be processed again and saved to the DB as duplicates.
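For completeness, the only change for case 2 is a second consumer created with the same group.id (a sketch; props is the same Properties object as above):

// Second consumer in the same group "test"; after the first consumer dies,
// the single partition of "foo" is reassigned to it during the rebalance,
// and it resumes from the last committed offset (none yet -> earliest).
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
consumer2.subscribe(Arrays.asList("foo"));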
To sum up:
The consumer always goes back to the last committed offset. If a consumer processes messages but fails before committing them, those messages will be reprocessed by another consumer.
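Related to this summary: the second example in the same Manual Offset Control javadoc section commits per partition right after the records are persisted, which makes the replay window explicit. Sketched below with the same insertIntoDb placeholder (additionally needs imports for TopicPartition, OffsetAndMetadata and java.util.Collections):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
        insertIntoDb(partitionRecords); // processed...
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        consumer.commitSync(Collections.singletonMap(
                partition, new OffsetAndMetadata(lastOffset + 1))); // ...then committed (next offset to read)
        // a crash between insertIntoDb() and commitSync() replays at most this one batch
    }
}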
- I’m wondering: are there any other possible cases in which duplicates may occur?
- Is there any possibility that the buffer will contain duplicates?
TLDR;
Is there any chance that, for example, 2 subsequent poll() calls return duplicated messages without any exception being thrown? Is this case even possible?

consumer.poll() // returns messages 0-50
// no exception, no crash, normal processing
consumer.poll() // returns messages 25-75
I know that the consumer internally (in memory) maintains its position (the offset of the next record to fetch, exposed via position()). The next fetch from the broker is requested based on it, and offsets are committed to the broker based on it. Is this position always sequential for one single healthy consumer?
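To illustrate the question, here is a small sketch of mine (not from the docs) that observes the in-memory position moving between two polls; it assumes the single partition of foo is already assigned to this consumer, plus an import for org.apache.kafka.common.TopicPartition:

TopicPartition tp = new TopicPartition("foo", 0);

long before = consumer.position(tp); // next offset to fetch, tracked in memory only
ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
long after = consumer.position(tp);

// With max.poll.records=50 and no seek()/rebalance in between, I would expect
// after == before + batch.count(), i.e. strictly sequential offsets.
System.out.printf("position moved from %d to %d (%d records)%n", before, after, batch.count());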
I assume that even if something bad happens internally during poll(), an exception will always be thrown, or, if poll() can handle the problem itself, it will always return the next batch of messages without overlapping the previous poll(). So the while(true) loop will either break on any problem with poll(), or otherwise always process messages in a sequential manner (sequential offsets).
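A sketch of how I read that assumption in code: any exception escaping poll() or commitSync() leaves the loop, so after a restart the consumer resumes from the last committed offset and the only possible duplicates are the processed-but-uncommitted batch (KafkaException, the common parent of the client’s unchecked exceptions, lives in org.apache.kafka.common):

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }
        if (buffer.size() >= minBatchSize) {
            insertIntoDb(buffer);
            consumer.commitSync();
            buffer.clear();
        }
    }
} catch (KafkaException e) {
    // poll() or commitSync() signalled a problem -> stop instead of guessing;
    // on restart we re-read everything after the last committed offset
} finally {
    consumer.close();
}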