I have a batch listener:
    @KafkaListener(
            id = "${spring.kafka.listener.cancel-auth-linkage.id}",
            topics = "${spring.kafka.listener.cancel-auth-linkage.topic.linkage}",
            autoStartup = "false",
            batch = "true",
            groupId = "cushion")
    public void listen(List<Message<CancelAuthorizationLinkageResource>> messages) {
        ...
    }
I need to handle ConversionException. If a ConversionException occurs, I want to send the failing record to a dead-letter queue (DLQ). To do that, I defined a DefaultErrorHandler and a DeadLetterPublishingRecoverer:
    @Bean
    public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<Object, Object> kafkaTemplate,
            ListenerPropertiesServiceInterface listenerPropertiesServiceInterface) {
        return new DeadLetterPublishingRecoverer(kafkaTemplate,
                (r, e) -> {
                    if (e.getCause() instanceof ConversionException) {
                        if (r.key().equals(
                                Objects.requireNonNull(((ConversionException) e.getCause()).getRecord()).key())) {
                            return new TopicPartition(listenerPropertiesServiceInterface.getDlqTopic(), -1);
                        } else {
                            // Even though the non-failing messages are sent back to the original topic,
                            // they are not being consumed by the listener again.
                            return new TopicPartition(r.topic(), -1);
                        }
                    } else {
                        throw new RuntimeException(e);
                    }
                });
    }

    @Bean
    public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) {
        return new DefaultErrorHandler(recoverer);
    }
However, I noticed that when a single poll returns 10 messages and only one of them causes a ConversionException, my DeadLetterPublishingRecoverer is still invoked for all 10 messages (is the complete batch being retried and then recovered?). I want only the records that caused the error to be sent to the DLQ or handled by the error handler. The records without errors should still be processed normally by the listener.
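To make the outcome I am after concrete, here is a minimal plain-Java sketch of it. It deliberately uses no Spring or Kafka classes. `BatchOutcomeSketch`, `Outcome`, and `split` are hypothetical names I made up for illustration, and the "conversion" is just a function that may throw. It only models the result I want for one batch, not how the container or error handler would produce it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of the desired per-record outcome for one batch.
// All names here are hypothetical; nothing below is Spring Kafka API.
final class BatchOutcomeSketch {

    /** The two groups a fetched batch should end up in. */
    static final class Outcome<T, R> {
        final List<R> processed = new ArrayList<>();     // converted fine, handled by the listener
        final List<T> deadLettered = new ArrayList<>();  // failed conversion, should go to the DLQ
    }

    /**
     * Converts every raw record independently. A failure on one record
     * routes only that record to the dead-letter group; the remaining
     * records are still converted and kept in their original order.
     */
    static <T, R> Outcome<T, R> split(List<T> batch, Function<T, R> converter) {
        Outcome<T, R> outcome = new Outcome<>();
        for (T raw : batch) {
            try {
                outcome.processed.add(converter.apply(raw));
            } catch (RuntimeException conversionFailure) {
                // Stand-in for a ConversionException on this single record.
                outcome.deadLettered.add(raw);
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        Function<String, Integer> parse = Integer::parseInt;
        Outcome<String, Integer> outcome = split(List.of("1", "oops", "3"), parse);
        System.out.println("processed=" + outcome.processed);       // processed=[1, 3]
        System.out.println("deadLettered=" + outcome.deadLettered); // deadLettered=[oops]
    }
}
```

In other words, for a batch of three where the second record cannot be converted, I expect the listener to see records one and three and the DLQ to receive only record two.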
How can I achieve this?