I need to endlessly retry any exception that might happen while consuming from a retry topic (including deserialization errors, which in our case may be caused by temporary schema registry issues) until the message is handled successfully, accepting that the affected partition will be blocked until that message is processed. Messages are consumed in the following way:
@KafkaListener(topics = RETRY_EVENT_TOPIC, containerFactory = "customEventFactory")
public void handleRetryEvent(ConsumerRecord<String, CustomEvent> record) {
...
}
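For context, here is a minimal sketch of how the container factory referenced as customEventFactory might be wired; the injected ConsumerFactory bean and its deserializer configuration are assumptions, not part of the original setup:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, CustomEvent> customEventFactory(
        ConsumerFactory<String, CustomEvent> consumerFactory) { // assumed ConsumerFactory bean
    var factory = new ConcurrentKafkaListenerContainerFactory<String, CustomEvent>();
    factory.setConsumerFactory(consumerFactory);
    // the DefaultErrorHandler from approach #1 or #2 below would be plugged in here:
    // factory.setCommonErrorHandler(errorHandler);
    return factory;
}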
As far as I can see, there are two possible approaches (other approaches probably exist as well; for example, it used to be possible to use SeekToCurrentErrorHandler for this, but it is already deprecated):
Approach #1. Use an endless backoff retry with DefaultErrorHandler, without specifying a recoverer:
// No recoverer; the default ExponentialBackOff has no maxElapsedTime limit, so retries never stop
var errorHandler = new DefaultErrorHandler(new ExponentialBackOff());
// Treat every exception as retryable
errorHandler.setClassifications(Collections.singletonMap(Exception.class, true), true);
var factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(errorHandler);
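If relying on the defaults feels too implicit, the same intent can be expressed with explicit backoff settings; the concrete numbers below are illustrative assumptions, not values from the original configuration:
// Assumed values: initial interval 1 s, multiplier 2
var backOff = new ExponentialBackOff(1_000L, 2.0);
// Cap the delay between attempts at 30 s
backOff.setMaxInterval(30_000L);
// maxElapsedTime is deliberately left unlimited, so the handler retries forever
var errorHandler = new DefaultErrorHandler(backOff);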
Approach #2. Implement a custom ConsumerRecordRecoverer that just logs and rethrows the exception, and use it with a DefaultErrorHandler configured with a fixed number of retries (as opposed to approach #1 with endless backoff retries):
public class ThrowingConsumerRecordRecoverer implements ConsumerRecordRecoverer {

    @Override
    public void accept(ConsumerRecord<?, ?> consumerRecord, Exception ex) {
        // Throwing here prevents the offset commit, so the consumer stays on the record and retries start over
        log.info("Exhausted recoverer");
        throw new RuntimeException(ex);
    }
}
// Retry up to 5 times, 100 ms apart; after that the recoverer is invoked
var errorHandler = new DefaultErrorHandler(new ThrowingConsumerRecordRecoverer(), new FixedBackOff(100, 5));
// Treat every exception as retryable
errorHandler.setClassifications(Collections.singletonMap(Exception.class, true), true);
var factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(errorHandler);
With approach #2, the recoverer will be executed endlessly, once after every exhausted cycle of fixed retries, until the record is finally processed successfully. In my understanding, if the ConsumerRecordRecoverer implementation does not throw an exception, the offset is committed and the consumer moves forward, but if it does throw, the consumer stays on the same offset and the retry cycle starts again.
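For contrast, here is a minimal sketch of the non-throwing counterpart described above, which would commit the offset and skip the record instead of restarting the retry cycle (a log-and-skip recoverer, not something from the original setup):
public class SkippingConsumerRecordRecoverer implements ConsumerRecordRecoverer {

    @Override
    public void accept(ConsumerRecord<?, ?> consumerRecord, Exception ex) {
        // No exception is thrown, so the error handler considers the record recovered:
        // the offset is committed and the consumer moves on to the next record
        log.warn("Skipping record at offset {} after retries were exhausted", consumerRecord.offset(), ex);
    }
}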
What should be considered when selecting one approach over the other? Are they equivalent, or are there concerns to take into account? With approach #1 we retry endlessly in memory (until the application is shut down), and my main concern is whether we should periodically give up and hand the record to a recoverer, or whether that is unnecessary?