We are upgrading from Spring Boot 2 / spring-kafka 2.8.4 to Spring Boot 3 / spring-kafka 3.1.2 and have had to move from SeekToCurrentErrorHandler to CommonErrorHandler. Our first attempt was to replace the method that created the SeekToCurrentErrorHandler with one that creates a DefaultErrorHandler, using the constructor that takes a ConsumerRecordRecoverer and a BackOff. The BiFunction passed to the ConsumerRecordRecoverer is never invoked.
What are we doing wrong on this upgrade?
We have also tried simply creating a DefaultErrorHandler bean with this setup, with the same results.
Here is our setup, all in a @Configuration class with the @EnableKafka annotation:
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> consumerFactory,
            KafkaTemplate<Object, Object> kafkaTemplate) {
        final var factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory);
        factory.setCommonErrorHandler(
                feedsErrorHandler(kafkaTemplate, retryProperties.getMainTopic()));
        return factory;
    }

    private DefaultErrorHandler feedsErrorHandler(
            KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate, RetrySettings retrySettings) {
        final var backOffWithMaxRetries = createBackOffWithMaxRetries(retrySettings);
        return createCommonErrorHandler(kafkaTemplate, backOffWithMaxRetries);
    }

    private ExponentialBackOffWithMaxRetries createBackOffWithMaxRetries(
            RetrySettings retrySettings) {
        final var backOffWithMaxRetries =
                new ExponentialBackOffWithMaxRetries(retrySettings.getMaxRetries());
        backOffWithMaxRetries.setInitialInterval(retrySettings.getInitialInterval());
        backOffWithMaxRetries.setMultiplier(retrySettings.getMultiplier());
        backOffWithMaxRetries.setMaxInterval(retrySettings.getMaxInterval());
        return backOffWithMaxRetries;
    }

    private DefaultErrorHandler createCommonErrorHandler(
            KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate,
            ExponentialBackOffWithMaxRetries backOffWithMaxRetries) {
        final var defaultErrorHandler =
                new DefaultErrorHandler(
                        deadLetterPublishingRecoverer(kafkaTemplate), backOffWithMaxRetries);
        // Setup Retryable Exceptions here. And add them to the classifications map.
        // All other exceptions default to non-retryable.
        final var classified = new HashMap<Class<? extends Throwable>, Boolean>();
        retryProperties.getRetryableExceptions().forEach(aClass -> classified.put(aClass, true));
        defaultErrorHandler.setClassifications(classified, false);
        defaultErrorHandler.setCommitRecovered(true);
        return defaultErrorHandler;
    }

    private DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate) {
        return new DeadLetterPublishingRecoverer(
                kafkaTemplate,
                (consumerRecord, e) -> {
                    log.error("Error while processing [{}]", consumerRecord, e);
                    final var topic = determineTopic(consumerRecord, e.getCause());
                    log.error("Moving [{}] to [{}]", consumerRecord, topic);
                    return new TopicPartition(topic, -1);
                });
    }
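To make the intent of the classification map explicit, here is a simplified plain-Java sketch of the behaviour we expect from setClassifications(classified, false). It is not spring-kafka's implementation: RetryClassificationSketch and isRetryable are hypothetical names used only for illustration, and walking the cause chain is our assumption about how a wrapped listener exception gets matched.

```java
import java.io.IOException;
import java.util.Set;

public class RetryClassificationSketch {

    // Hypothetical stand-in for the classification step (not the library's code):
    // an exception counts as retryable if it, or any exception in its cause chain,
    // is an instance of a listed type. Everything else defaults to non-retryable.
    static boolean isRetryable(Throwable thrown, Set<Class<? extends Throwable>> retryable) {
        for (Throwable t = thrown; t != null; t = t.getCause()) {
            for (Class<? extends Throwable> type : retryable) {
                if (type.isInstance(t)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Assumed example configuration: only IOException is listed as retryable.
        Set<Class<? extends Throwable>> retryable = Set.of(IOException.class);

        // A listed exception wrapped in another exception is still matched via its cause.
        Throwable wrapped = new RuntimeException(new IOException("broker unavailable"));
        System.out.println(isRetryable(wrapped, retryable)); // prints true

        // An unlisted exception falls through to the default (non-retryable).
        System.out.println(isRetryable(new IllegalStateException("bad payload"), retryable)); // prints false
    }
}
```

Under this reading, an exception outside the retryable list should skip the back-off and go straight to the recoverer, which is where we expect the BiFunction to run.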