I am using Spring Boot 3.1.10.
Below is my Kafka consumer code:
@RetryableTopic(attempts = "10",
        backoff = @Backoff(delayExpression = "#{#currentAttempt < 3 ? 60000 : (900000 * (2^(#currentAttempt - 3)))}", maxDelayExpression = "86400000"),
        autoCreateTopics = "true",
        retryTopicSuffix = "-retry",
        dltTopicSuffix = "-dlt",
        topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
        dltStrategy = DltStrategy.FAIL_ON_ERROR,
        timeout = "7200000",
        kafkaTemplate = "domainEventProducerKafkaTemplate")
@KafkaListener(topics = "${spring.kafka.consumer.audit-event-topic}",
        containerFactory = "auditEventKafkaListenerContainerFactory")
public void consumeCoreEvent(ConsumerRecord<String, AuditEvent> consumerRecord,
        @Header(name = RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, defaultValue = "1") Integer deliveryAttempt,
        @Header(name = KafkaHeaders.RECEIVED_TOPIC) String topicName,
        @Header(name = KafkaHeaders.GROUP_ID) String groupId) {
    // Always fail so that the retry flow is exercised
    throw new RuntimeException("Testing");
}
I am using non-blocking retries. My requirement is that the first 3 retries complete within 10 minutes, and the next 7 retries do not go beyond 24 hours.
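To make the requirement concrete, here is a minimal sketch of one delay schedule that would satisfy it, assuming the 24-hour limit applies to the cumulative delay of all retries rather than to each individual delay. The class `RetryDelaySchedule`, its constants, and its methods are hypothetical names used only to work through the arithmetic; they are not part of Spring Kafka or Spring Retry, and the 1-minute and 10-minute values are assumptions.

```java
// Hypothetical helper (not a Spring API): works out a per-retry delay schedule.
final class RetryDelaySchedule {

    static final int FAST_RETRIES = 3;          // retries that must finish within 10 minutes
    static final long FAST_DELAY_MS = 60_000L;  // assumed: 1 minute before each fast retry
    static final long SLOW_BASE_MS = 600_000L;  // assumed: 10 minutes, doubled for each later retry

    /** Delay in milliseconds before the given retry (1-based). */
    static long delayMillis(int retry) {
        if (retry < 1) {
            throw new IllegalArgumentException("retry must be >= 1");
        }
        if (retry <= FAST_RETRIES) {
            return FAST_DELAY_MS;
        }
        // Retry 4 waits SLOW_BASE_MS, retry 5 waits twice that, and so on.
        return SLOW_BASE_MS << (retry - FAST_RETRIES - 1);
    }

    /** Sum of the delays for retries fromRetry..toRetry (inclusive). */
    static long totalMillis(int fromRetry, int toRetry) {
        long total = 0;
        for (int r = fromRetry; r <= toRetry; r++) {
            total += delayMillis(r);
        }
        return total;
    }

    public static void main(String[] args) {
        for (int r = 1; r <= 10; r++) {
            System.out.printf("retry %2d: %d ms%n", r, delayMillis(r));
        }
        System.out.println("total: " + totalMillis(1, 10) + " ms");
    }
}
```

With these assumed values the first 3 retries take 3 minutes in total, and all 10 retries take 76380000 ms (about 21.2 hours). By comparison, a 900000 ms base doubled over 7 retries adds up to 900000 × 127 = 114300000 ms (about 31.75 hours), which would exceed 24 hours if the limit is cumulative.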
I tried the @RetryableTopic configuration above, but it does not work as expected. The message is sent to the first retry topic, but it is never consumed.
I also tried this expression:
#{ #currentAttempt * 1000}
But it throws an exception:
EL1030E: The operator 'MULTIPLY' is not supported between objects of type 'null' and 'java.lang.Integer'
I tried a custom back-off policy as well, but no luck:
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.BackOffContext;
import org.springframework.retry.backoff.BackOffInterruptedException;
import org.springframework.retry.backoff.BackOffPolicy;

public class CustomBackOffPolicy implements BackOffPolicy {

    @Override
    public BackOffContext start(RetryContext retryContext) {
        return new CustomBackOffContext();
    }

    @Override
    public void backOff(BackOffContext backOffContext) throws BackOffInterruptedException {
        CustomBackOffContext context = (CustomBackOffContext) backOffContext;
        int retryCount = context.getRetryCount();
        // Increment retry count
        context.incrementRetryCount();
        // Define backoff strategy based on retry count
        long backOffPeriod;
        if (retryCount < 3) {
            // First 3 retries within 5 minutes (300,000 ms)
            backOffPeriod = 100000; // ~1.67 minutes per retry
        } else if (retryCount < 10) {
            // Next 7 retries spread over 24 hours (86,400,000 ms)
            backOffPeriod = (long) Math.pow(2, retryCount - 3) * 60000; // Exponential backoff starting from 1 minute
            backOffPeriod = Math.min(backOffPeriod, 86400000L); // Cap at 24 hours
        } else {
            // Default backoff for any additional retries (if any)
            backOffPeriod = 86400000L; // 24 hours
        }
        try {
            Thread.sleep(backOffPeriod);
        } catch (InterruptedException e) {
            throw new BackOffInterruptedException("Thread interrupted while backing off", e);
        }
    }

    private static class CustomBackOffContext implements BackOffContext {
        private int retryCount = 0;

        public int getRetryCount() {
            return retryCount;
        }

        public void incrementRetryCount() {
            this.retryCount++;
        }
    }
}
@Bean
public RetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder
            .stateless()
            .retryPolicy(new SimpleRetryPolicy(10)) // Total 10 retries
            .backOffPolicy(new CustomBackOffPolicy())
            .build();
}
Can someone please help me make the retries back off exponentially as per my requirement?
Any help will be greatly appreciated!