I’m trying to check if my Kafka listener fails to connect to Kafka. I need to make a REST call or any other action, but the problem is the Kafka listener keeps trying to reconnect and it will never end. Do you have any suggestions? Below is what I have achieved so far but not working.
fun fallback(
@Payload msg: obj,
throwable: Throwable
) {
if (throwable is DisconnectException) {
logger.error { "Kafka DisconnectException: ${throwable.message}" }
} else {
logger.error { "Error in KafkaListenerService: $msg, Exception: ${throwable.message}" }
}
}
@EventListener
fun listen(event: ListenerContainerIdleEvent) {
println(event)
try {
logger.info("ListenerContainerIdleEvent: ${event.source}")
logger.info("ListenerContainerIdleEvent: ${event.consumer.assignment()}")
println("ERR::::::::::::::::"+event.consumer.assignment())
println(event.consumer.endOffsets(event.consumer.assignment(), Duration.ofSeconds(5)))
} catch (e: Exception) {
e.printStackTrace()
}
}