My application polls records from an MSK Kafka cluster. The application tracks the offset of each partition itself and therefore has auto-commit disabled. It never commits offsets to Kafka at all, since it persists offsets to an internal data store.
In production, we observed duplicate records. The duplicates don’t stop until we restart the instance running the zombie consumer (one that has been evicted from the group but keeps polling). A consumer turns into a zombie when it fails to send heartbeats. In our case this typically happens due to IAM authentication issues in MSK, which sometimes last quite a while.
On further digging, I found that the partitions assigned to the zombie consumer get reassigned to other active consumers, yet the zombie consumer’s poll continues to return records.
The question is: should a zombie consumer still receive records on poll?
I have been able to reproduce it locally. Here is my local setup with the issue reproduced:
- A single broker (Docker image) with three different external ports.
- One topic with two partitions.
- Toxiproxy proxies these ports.
- Two consumers (subscribed to the topic), each connecting through one of the proxied ports, with the session timeout set to 10 seconds.
- Introduce latency on one of the proxies to evict one of the consumers from the group.
- Produce some messages to each partition.
- Both consumers keep receiving the messages.
Kafka broker
docker run -d --name kafka-container \
  -e KAFKA_ENABLE_KRAFT=yes \
  -e KAFKA_CFG_NODE_ID=1 \
  -e KAFKA_CFG_PROCESS_ROLES=broker,controller \
  -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9094 \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT,EXTERNAL1:PLAINTEXT,EXTERNAL2:PLAINTEXT \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,EXTERNAL://:9093,EXTERNAL1://:29093,EXTERNAL2://:39093,CONTROLLER://:9094 \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=EXTERNAL://localhost:9092,EXTERNAL1://localhost:29092,EXTERNAL2://localhost:39092,PLAINTEXT://$CONTAINER_NAME:9092 \
  -e ALLOW_PLAINTEXT_LISTENER=yes \
  -e KAFKA_KRAFT_CLUSTER_ID=5e-ZWJELQjCqW2n3bwMTWg \
  -e KAFKA_AUTO_CREATE_TOPICS_ENABLE=false \
  -p "9092:9093" \
  -p "29093:29093" \
  -p "39093:39093" \
  bitnami/kafka:latest
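For completeness, the topic with two partitions can be created like this (the exact command I used is not in my notes; the topic name matches the consumer commands below):

```shell
# Create the test topic with two partitions (replication factor 1, single broker)
kafka-topics --create --topic test --partitions 2 --replication-factor 1 \
  --bootstrap-server localhost:9092
```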
Toxiproxy proxies
toxiproxy-cli create -l localhost:29092 -u localhost:29093 zombie_consumer_scenario_healthy_proxy
toxiproxy-cli create -l localhost:39092 -u localhost:39093 zombie_consumer_scenario_unhealthy_proxy
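Before starting the consumers, the proxies can be sanity-checked (output format may vary by Toxiproxy version):

```shell
# List configured proxies with their listen/upstream addresses and enabled state
toxiproxy-cli list
```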
Kafka Console consumer
kafka-console-consumer --group test-cg --topic test --consumer-property enable.auto.commit=false --consumer-property session.timeout.ms=10000 --bootstrap-server localhost:29092
kafka-console-consumer --group test-cg --topic test --consumer-property enable.auto.commit=false --consumer-property session.timeout.ms=10000 --bootstrap-server localhost:39092
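For the produce step, I used the console producer with keyed messages so both partitions receive data (reconstructed from memory; the parse.key/key.separator properties are an assumption, any keys that hash to different partitions will do):

```shell
# Produce keyed messages, e.g. type "a:hello" and "b:world" so records
# are spread across both partitions of the test topic
kafka-console-producer --topic test --bootstrap-server localhost:9092 \
  --property parse.key=true --property key.separator=:
```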
Latency Toxic
toxiproxy-cli toxic add -t latency -n myToxic -a latency=10000 -a jitter=50 zombie_consumer_scenario_unhealthy_proxy
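Once the latency toxic is in place and the session timeout has elapsed, the eviction can be confirmed from the broker's side; only the healthy consumer should remain a member, now owning both partitions:

```shell
# Describe the group: after eviction, only one member should be listed
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group test-cg --members
```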
As a solution, I am thinking of taking control of partition assignment in my application. However, I believe a Kafka consumer's poll should not return records after the consumer has been evicted from the group.