I set up message sending with exponential retries in Kafka using spring-kafka (3.0.13). My main topics are created via KafkaAdmin using TopicBuilder:
<code>@PostConstruct
fun initTopics() {
    topics.map {
        TopicBuilder
            .name(it)
            .replicas(replicationFactor)
            .partitions(partition)
            .configs(mapOf(
                TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG to minInsyncReplicas.toString(),
                TopicConfig.RETENTION_MS_CONFIG to retentionMs.toString()
            ))
            .build()
    }.apply {
        kafkaAdmin.createOrModifyTopics(*toTypedArray())
    }
}
</code>
The retry topics are created by the retry configuration:
<code>@Bean
fun retryTopicConfiguration() = RetryTopicConfigurationBuilder
    .newInstance()
    .autoCreateTopics(true, partition, replicationFactor.toShort())
    .maxAttempts(attempts)
    .exponentialBackoff(initialInterval, multiplier, maxInterval)
    .sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
    .includeTopics(topics)
    .retryOn(KafkaRetryableException::class.java)
    .dltProcessingFailureStrategy(DltStrategy.FAIL_ON_ERROR)
    .dltHandlerMethod(EndpointHandlerMethod(KafkaDltHandler::class.java, "dlt"))
    .create(kafkaTemplate)
</code>
The problem is that I cannot set the MIN_IN_SYNC_REPLICAS_CONFIG and RETENTION_MS_CONFIG properties here the way I do when creating the main topics. Consequently, the retry and DLT topics are not configured like the main topics. I don’t understand why it isn’t possible to add such a config with RetryTopicConfigurationBuilder, so I’m wondering whether I’m missing something.