@Bean
public RecordMessageConverter converter() {
    // return new JsonMessageConverter();
    JsonMessageConverter jsonMessageConverter = new JsonMessageConverter();
    jsonMessageConverter.setHeaderMapper(new CustomKafkaHeaderMapper());
    return jsonMessageConverter;
}

@Bean
public BatchMessagingMessageConverter batchConverter() {
    // return new BatchMessagingMessageConverter(converter());
    BatchMessagingMessageConverter batchMessagingMessageConverter = new BatchMessagingMessageConverter(converter());
    batchMessagingMessageConverter.setHeaderMapper(new CustomKafkaHeaderMapper());
    return batchMessagingMessageConverter;
}
public class CustomKafkaHeaderMapper extends DefaultKafkaHeaderMapper {

    @Override
    public void toHeaders(Headers source, Map<String, Object> target) {
        super.toHeaders(source, target);
        // Consumer side: default the retry counter to 0 when the header is missing
        if (!target.containsKey("cushion_next_retry_attempt")) {
            target.put("cushion_next_retry_attempt", 0);
        }
    }

    @Override
    public void fromHeaders(MessageHeaders headers, Headers target) {
        super.fromHeaders(headers, target);
        // Producer side: write the retry counter as a raw single-byte header
        if (headers.containsKey("cushion_next_retry_attempt")) {
            Integer nextRetryAttempt = headers.get("cushion_next_retry_attempt", Integer.class);
            target.add("cushion_next_retry_attempt", new byte[] {nextRetryAttempt.byteValue()});
        }
    }
}
@KafkaListener(id = "${spring.kafka.listener.cancel-auth-linkage.id}",
        topics = "${spring.kafka.listener.cancel-auth-linkage.topic.linkage}",
        autoStartup = "false",
        batch = "true",
        groupId = "cushion",
        concurrency = "4")
@Transactional("kafkaTransactionManager")
public void listen(List<CancelAuthorizationLinkage> messages,
        @Header(KafkaHeaders.RECEIVED_KEY) List<String> keys,
        @Header(value = "cushion_next_retry_attempt", required = false) List<Integer> nextRetryAttempts)
        throws InvalidValueException {

    for (int i = 0; i < messages.size(); i++) {
        CancelAuthorizationLinkage message = messages.get(i);
        String key = keys.get(i);
        int nextRetryAttempt = nextRetryAttempts.get(i);
        try {
            processor.process(message);
        } catch (Exception e) {
            CompletableFuture<SendResult<String, CancelAuthorizationLinkage>> handle =
                    retryHandler.handle(message, listenerPropertiesService, e, nextRetryAttempt, key);
        }
    }
}
I’m using a Spring Kafka batch listener. Not all of my messages contain the cushion_next_retry_attempt header. This header is added only when consuming a message fails and it is re-sent to the topic, so that retry attempts can be tracked and further retries decided. However, I find that the cushion_next_retry_attempt header is always null in the listener. How can I configure it correctly so that it defaults to 0 if the header is missing?
You cannot extract custom headers for the batch like that:

@Header(value = "cushion_next_retry_attempt", required = false)
List<Integer> nextRetryAttempts
You have to look into a dedicated header for that purpose: KafkaHeaders.BATCH_CONVERTED_HEADERS, bound as a List<Map<String, Object>>.
See more info in the docs: https://docs.spring.io/spring-kafka/reference/kafka/headers.html

With the batch converter, the converted headers are available in the KafkaHeaders.BATCH_CONVERTED_HEADERS header as a List<Map<String, Object>>, where the map in a given position of the list corresponds to the data position in the payload.
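For example, the listener could be rewritten roughly like this (a minimal sketch reusing the types and collaborators from your snippet; the exact Java type the mapped header value arrives as depends on your header mapper, so the sketch checks for both a Number and a raw byte[]):

@KafkaListener(id = "${spring.kafka.listener.cancel-auth-linkage.id}",
        topics = "${spring.kafka.listener.cancel-auth-linkage.topic.linkage}",
        batch = "true",
        groupId = "cushion")
public void listen(List<CancelAuthorizationLinkage> messages,
        @Header(KafkaHeaders.RECEIVED_KEY) List<String> keys,
        @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> convertedHeaders) {

    for (int i = 0; i < messages.size(); i++) {
        CancelAuthorizationLinkage message = messages.get(i);
        String key = keys.get(i);

        // The map at position i holds the converted headers of the record at position i.
        Map<String, Object> recordHeaders = convertedHeaders.get(i);
        Object raw = recordHeaders.get("cushion_next_retry_attempt");

        int nextRetryAttempt;
        if (raw == null) {
            nextRetryAttempt = 0;                         // header absent: first delivery
        } else if (raw instanceof Number) {
            nextRetryAttempt = ((Number) raw).intValue();
        } else {
            nextRetryAttempt = ((byte[]) raw)[0];         // raw single-byte value, as written by your fromHeaders()
        }

        try {
            processor.process(message);
        } catch (Exception e) {
            retryHandler.handle(message, listenerPropertiesService, e, nextRetryAttempt, key);
        }
    }
}

Records that never carried the header simply have no entry in their map, so they fall back to 0 in the listener itself, and the default no longer has to be injected by a custom header mapper.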