I am facing timeout exception due to various reasons with my Spring Kafka consumer. I am using the following configs to define my consumer factory with error handler.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> createListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setCommonErrorHandler(createErrorHandler());
return factory;
}
private ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100);
return new DefaultKafkaConsumerFactory<>(props);
}
public DefaultErrorHandler createErrorHandler() {
BackOff fixedBackOff = new FixedBackOff(3000, 0);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
log.info("Exception occured: {}", exception);
}, fixedBackOff);
errorHandler.addNotRetryableExceptions(UnknownTopicOrPartitionException.class,
OffsetOutOfRangeException.class, NetworkException.class,
SerializationException.class, DeserializationException.class);
return errorHandler;
}
However, despite having an error handler where I want to handle timeout effectively by sending email or whatever, in my logs I am receiving this error.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Questions:
-
Is TimeoutException a retry exception? Is it safe to retry them?
-
Is it possible to handle the above specified exceptions using error handler?
-
If yes, how to handle these exceptions using error handler (SeekToCurrentErrorHandler or DefaultErrorHandler) efficiently?
-
Why Kafka defines certain exception as Warning instead of Error like the following
2024-06-25T21:42:31.693+05:30 WARN 22880 --- [retry-kafka] [ test-id-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Error while fetching metadata with correlation id 263 : {my-topic=UNKNOWN_TOPIC_OR_PARTITION}
I found a similar question here, but that is part of Kafka producer, my expectation is to handle these exceptions in error handler -
Is it possible to have error handler for producer to retry sending the message on certain exceptions?
I tried defining the exceptions in error handler but TimeoutException or UnknownTopicOrPartitionException are not sent to error handler instead they are just logging in my console.
I am using Java 19, Kafka 3.7.0, Spring-Kafka 3.1.5