I have the following code, which attempts to process a Kafka message and, if processing fails, logs it or publishes it to another topic for later processing.
/**
 * Kafka listener that parses each incoming record into a [ProtoMessage] and hands it to
 * [KafkaService.process].
 *
 * Every record is acknowledged once handling finishes, whether it succeeded or failed.
 */
@Service
class KafkaConsumerService(
    private val kafkaService: KafkaService,
) {
    /**
     * Handles a single record from `my-topic`.
     *
     * The listener thread is blocked with `runBlocking` until the suspending processing completes.
     * Any [Exception] raised while parsing or processing is logged and not rethrown, and the
     * record is acknowledged in all cases.
     *
     * @param msg the consumed record whose value is re-parsed as a [ProtoMessage]
     * @param ack handle used to acknowledge the record after handling
     */
    @KafkaListener(topics = ["my-topic"], groupId = "my-group")
    fun listenToTransactionTopic(msg: ConsumerRecord<String, DynamicMessage>, ack: Acknowledgment) = runBlocking {
        try {
            // Parse inside the try so a malformed payload takes the same failure path
            // (log + acknowledge) as a processing error instead of escaping the listener.
            val message: ProtoMessage = ProtoMessage.parseFrom(msg.value().toByteArray())
            kafkaService.process(message)
        } catch (e: Exception) {
            // publish to another topic
            logger.error(
                "Failed to handle record from {}-{} at offset {}",
                msg.topic(),
                msg.partition(),
                msg.offset(),
                e,
            )
        } finally {
            // Acknowledge unconditionally so a failing record is not redelivered.
            ack.acknowledge()
        }
    }

    companion object {
        private val logger = LoggerFactory.getLogger(KafkaConsumerService::class.java)
    }
}
/**
 * Processes a parsed [ProtoMessage]: maps it to an entity via [KafkaMapper], persists it through
 * [MyEntityService], and then produces a follow-up message.
 */
@Service
class KafkaService(
    private val myentityService: MyEntityService,
    private val kafkaMapper: KafkaMapper,
) {
    /**
     * Maps [message] to an entity, saves it, and then runs the follow-up work on [Dispatchers.IO].
     *
     * Exceptions thrown by [KafkaMapper.map] or by the save call propagate to the caller.
     *
     * NOTE(review): this is a `suspend` function annotated with `@Transactional`. Spring's
     * declarative transaction support for coroutines presumably requires the referenced
     * "transactionManager" bean to be a reactive transaction manager; with a thread-bound manager
     * the transaction may not span suspension points or the dispatcher switch below — confirm.
     *
     * @param message the decoded message to persist and forward
     */
    @Transactional("transactionManager")
    suspend fun process(message: ProtoMessage) {
        val myEntity = myentityService.saveEntity(
            kafkaMapper.map(message)
        )
        // save other var to db by calling other suspend functions
        // produce another message
        withContext(Dispatchers.IO) {
            // write a message to a topic
        }
    }

    companion object {
        private val logger = LoggerFactory.getLogger(KafkaService::class.java)
    }
}
/**
 * Maps an incoming [ProtoMessage] to a [MyEntity] using the result of a remote call.
 */
class KafkaMapper(
    // NOTE(review): the type name is lowercase in the original; presumably `ClientService` — confirm.
    private val clientService: clientService,
) {
    /**
     * Awaits the remote call and returns its only element.
     *
     * No child coroutines are launched here, so the body runs directly in the caller's coroutine.
     *
     * @param message the message being mapped
     * @return the single element returned by the remote call
     * @throws IllegalStateException if the remote call does not return exactly one element
     */
    suspend fun map(message: ProtoMessage): MyEntity {
        // expecting a list back
        val remoteCall = clientService.someRemoteCall().awaitSingle()
        // this uses webApi ==> webClientBuilder.baseUrl(url).filter(oauth2Filter).observationRegistry(observationRegistry).build()
        if (remoteCall.size != 1) {
            logger.error("Failed :(")
            // this causes infinite loop if thrown enough times
            throw IllegalStateException(
                "Expected exactly one element from the remote call but got ${remoteCall.size}"
            )
        }
        return remoteCall.first()
    }

    companion object {
        private val logger = LoggerFactory.getLogger(KafkaMapper::class.java)
    }
}
The problem started to arise when I began using a reactive client to call my other web services: whenever I throw an exception from within the mapper function, I get the following error:
java.lang.StackOverflowError: null
at reactor.core.publisher.InnerOperator.currentContext(InnerOperator.java:33)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.currentContext(MonoFlatMap.java:270)
at reactor.core.publisher.InnerOperator.currentContext(InnerOperator.java:33)
at reactor.core.publisher.InnerOperator.currentContext(InnerOperator.java:33)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.currentContext(MonoFlatMap.java:270)
at reactor.core.publisher.InnerOperator.currentContext(InnerOperator.java:33)
at reactor.core.publisher.InnerOperator.currentContext(InnerOperator.java:33)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.currentContext(MonoFlatMap.java:270)
at reactor.core.publisher.InnerOperator.currentContext(InnerOperator.java:33)
at reactor.core.publisher.InnerOperator.currentContext(InnerOperator.java:33)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.currentContext(MonoFlatMap.java:270)
at reactor.core.publisher.InnerOperator.currentContext(InnerOperator.java:33)
at reactor.core.publisher.InnerOperator.currentContext(InnerOperator.java:33)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.currentContext(MonoFlatMap.java:270)
at reactor.core.publisher.InnerOperator.currentContext(InnerOperator.java:33)
at reactor.core.publisher.InnerOperator.currentContext(InnerOperator.java:33)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.currentContext(MonoFlatMap.java:270)
at reactor.core.publisher.InnerOperator.currentContext(InnerOperator.java:33)
at reactor.core.publisher.InnerOperator.currentContext ... 1000+ lines
Also, my Kafka transactions started to fail when I used reactive clients. What is causing the reactive infinite loop?