I have a Kafka consumer configured like this:
@KafkaListener(
    topics = "test",
    id = "test_id",
    groupId = "one",
    containerFactory = "defaultKafkaListenerContainerFactory",
    autoStartup = "false"
)
public void listen(String message, Acknowledgment ack) {
    // Process the message
}
I am deploying this as a Spring app in a distributed environment with 5 instances running.
I have a use case where I need to start or stop this consumer manually.
For this purpose I have implemented an API like:
@GetMapping("/start")
public void start(@RequestParam("id") String listenerId) {
    kafkaListenerAutomation.startListener(listenerId);
}
which calls the following method:
// listenerId is "test_id", the id configured in @KafkaListener
public boolean startListener(String listenerId) {
    MessageListenerContainer listenerContainer =
            kafkaListenerEndpointRegistry.getListenerContainer(listenerId);
    // Explicit null check: a Java assert is skipped unless the JVM runs with -ea
    if (listenerContainer == null) {
        return false;
    }
    listenerContainer.start();
    return true;
}
My question is:
An API request will go to only one of the instances running this app, and this code will run only in the instance that receives the request.
If I issue a stop() call in the same manner as above, will it stop the consumers in all 5 instances where the app is running?
Please note that the group ID, listener ID, and other @KafkaListener settings are the same for all deployed instances.
Please clarify.
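For reference, the scenario can be modelled in plain Java. This is only a sketch: `Container`, `Instance`, and `stopListener` are hypothetical stand-ins, not Spring Kafka classes, and the model assumes each deployed instance holds its own registry in its own JVM (in the real app, the `KafkaListenerEndpointRegistry` is a bean in each instance's application context). Under that assumption, a stop issued through one instance only touches that instance's container:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: these types are hypothetical stand-ins, not Spring Kafka classes.
public class ListenerLocalityDemo {

    // Stand-in for a listener container: it only tracks a running flag.
    static final class Container {
        private boolean running;
        void start() { running = true; }
        void stop() { running = false; }
        boolean isRunning() { return running; }
    }

    // Stand-in for one deployed app instance with its own in-memory registry.
    static final class Instance {
        private final Map<String, Container> registry = new HashMap<>();

        Instance(String listenerId) {
            Container container = new Container();
            container.start();
            registry.put(listenerId, container);
        }

        // Stop counterpart of startListener: acts on this instance's registry only.
        boolean stopListener(String listenerId) {
            Container container = registry.get(listenerId);
            if (container == null) {
                return false;
            }
            container.stop();
            return true;
        }

        boolean isRunning(String listenerId) {
            Container container = registry.get(listenerId);
            return container != null && container.isRunning();
        }
    }

    // Builds `count` instances, stops the listener on instance 0 only,
    // and returns how many instances still have it running.
    static int runningAfterStoppingOne(int count, String listenerId) {
        Instance[] instances = new Instance[count];
        for (int i = 0; i < count; i++) {
            instances[i] = new Instance(listenerId);
        }
        instances[0].stopListener(listenerId);

        int running = 0;
        for (Instance instance : instances) {
            if (instance.isRunning(listenerId)) {
                running++;
            }
        }
        return running;
    }

    public static void main(String[] args) {
        // With 5 instances, stopping one leaves 4 running in this model.
        System.out.println(runningAfterStoppingOne(5, "test_id"));
    }
}
```

In this model, stopping the listener everywhere would mean calling the endpoint on each instance, or broadcasting the command to all of them by some other means.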