I’m using Apache Kafka (AWS MSK) with the IBM Sarama Go library to run a Kafka consumer group. I’ve encountered an issue where my consumers stop consuming messages under certain conditions, even though they still seem to fetch topic metadata.
Issue Details:
When a consumer remains idle for too long (no messages to consume), it stops consuming altogether. I’m not sure whether a rebalance is occurring, but from the logs the consumer can still fetch topic metadata; it just doesn’t consume any messages.
From my observations, this behavior occurs in the following cases:
Case 1: Controller Shift Between Brokers
When the active controller shifts from one broker to another within the same Kafka cluster, all of my microservices stop consuming messages. However, they continue to fetch the metadata for the topics. The consumers don’t recover automatically and remain idle until I intervene.
Case 2: Idle Topic with No Messages for Extended Periods
When a topic has no messages for an extended period (e.g., 5 to 7 days), the consumers stop consuming new messages even when new data is added to the topic later. The consumers still fetch topic metadata but do not process the messages.
Temporary Solution:
The only way I’ve found to resolve this issue is to manually redeploy the microservices or restart the consumer pods. After doing this, the consumers pick up all the messages and process them correctly.
My Questions:
Why are my consumers stopping after the active controller shifts or when the topic has been idle for an extended period?
How can I prevent consumers from stopping, and ensure they resume consuming messages even after long idle periods or broker changes?
Are there specific configurations or properties in the IBM Sarama package that could address this issue (e.g., auto-reconnect or better handling of broker changes)? The snippet after these questions shows the kinds of settings I mean.
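To make that last question concrete, these are the kinds of connection/metadata settings I suspect might be relevant (purely my guesses, with illustrative values; I haven’t verified that any of them actually help):

    // Guesses only: fields I suspect might affect reconnect/metadata behavior.
    config.Metadata.Retry.Max = 5                      // retries for metadata requests
    config.Metadata.Retry.Backoff = 250 * time.Millisecond
    config.Metadata.RefreshFrequency = 1 * time.Minute // background metadata refresh interval
    config.Net.KeepAlive = 30 * time.Second            // TCP keep-alive on broker connections
    config.Consumer.Retry.Backoff = 2 * time.Second    // wait before retrying a failed partition read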
Additional Information:
Kafka version: 3.5.1
Using the IBM Sarama Go library for the consumers.
import (
    "fmt"
    "log"
    "os"
    "time"

    "github.com/IBM/sarama"
)

func CreateConsumerGroup(brokers []string, group string) (sarama.ConsumerGroup, error) {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    // Client ID is unique per instance: group name plus startup timestamp.
    config.ClientID = fmt.Sprintf("%s-%d", group, time.Now().UnixMilli())
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    config.Metadata.RefreshFrequency = 5 * time.Minute
    config.Consumer.Group.Heartbeat.Interval = 2 * time.Second
    config.Consumer.Group.Session.Timeout = 30 * time.Second
    sarama.Logger = log.New(os.Stdout, "[Sarama-Kafka] ", log.LstdFlags)
    return sarama.NewConsumerGroup(brokers, group, config)
}
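For context, the consume side looks roughly like the sketch below (simplified: RunConsumer and the handler are placeholders, not my exact code; it uses the same imports as above plus context). Consume is called in a loop because it returns whenever a rebalance happens:

    func RunConsumer(ctx context.Context, cg sarama.ConsumerGroup, topics []string, handler sarama.ConsumerGroupHandler) {
        // Drain the Errors() channel, since Consumer.Return.Errors is enabled above.
        go func() {
            for err := range cg.Errors() {
                log.Printf("consumer group error: %v", err)
            }
        }()
        for {
            // Consume returns after every server-side rebalance, so it has to be
            // called again to rejoin the group and pick up the new claims.
            if err := cg.Consume(ctx, topics, handler); err != nil {
                log.Printf("consume error: %v", err)
            }
            if ctx.Err() != nil {
                return
            }
        }
    }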
I have tried several approaches to diagnose and resolve this issue:
Restarting the microservices: Whenever the consumers stop consuming messages (either after a controller shift or prolonged idleness), I manually restart the microservices or kill the pods. After doing this, the consumers resume and process all the queued messages as expected. However, I was hoping to avoid having to do this manually every time.
Kafka configuration adjustments: I’ve experimented with various settings related to session timeouts, heartbeat intervals, and rebalance strategies, trying to make sure the consumers aren’t kicked out of the consumer group prematurely (the fields I varied are sketched after this list). Despite these changes, the consumers still stop processing messages after a controller shift or prolonged idleness.
Monitoring logs: I have been closely monitoring the consumer logs, which indicate that the consumers are still able to fetch metadata from the Kafka brokers, even though they don’t actually consume any messages. This led me to believe the issue might not be network-related, but rather a problem with how the consumer behaves after a period of idleness or a controller change.
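For reference, the rebalance- and session-related fields I varied look like this (illustrative values, not my exact settings):

    // Illustrative values for the settings I experimented with.
    config.Consumer.Group.Session.Timeout = 45 * time.Second
    config.Consumer.Group.Heartbeat.Interval = 3 * time.Second // kept well below the session timeout
    config.Consumer.Group.Rebalance.Timeout = 60 * time.Second
    config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
        sarama.NewBalanceStrategyRoundRobin(),
    }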
What I was expecting:
I expected the consumers to automatically reconnect to the brokers and resume consuming messages without manual intervention, especially after a controller shift or an extended period of inactivity.
I expected the consumer group to handle rebalancing gracefully, ensuring that consumers remain active and continue consuming messages even after the Kafka cluster undergoes changes.