My Spring Boot Kafka consumer throws RejectedExecutionException: ExecutorService in shutdown state
when application shutdown is triggered.
I tried setting spring.kafka.listener.immediate-stop: true
so that the consumer stops immediately, but I still get the same error.
Is there a way to let the @Async method keep accepting tasks until the Kafka consumer has processed all the messages it polled before shutdown?
Here is some sample code:
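import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class AsyncService {

    // Runs on Spring's async task executor (requires @EnableAsync on a configuration class).
    @Async
    void testAsync() {
        System.out.println("Async task is working");
    }
}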
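import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.function.Consumer;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@AllArgsConstructor
public class KafkaConsumer {

    protected final AsyncService asyncService;

    @Bean
    public Consumer<TestEvent> flightEventConsumer() {
        return msg -> {
            log.info(msg.toString());
        };
    }

    @Bean
    public Consumer<Message<TestMsgEvent>> dailyConsumer() {
        return msg -> {
            log.info(">>><<<: " + msg.toString());
            TestMsgEvent testMsgEvent = msg.getPayload();
            // Simulate some per-message processing time.
            try {
                Thread.sleep(100);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            // Hand the work off to the @Async method and print the full stack trace if that fails.
            try {
                asyncService.testAsync();
            } catch (Exception e) {
                StringWriter sw = new StringWriter();
                PrintWriter pw = new PrintWriter(sw);
                e.printStackTrace(pw);
                String printExceptionAsString = sw.toString();
                System.out.println(":::::" + printExceptionAsString);
            }
        };
    }
}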
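
For reference, I believe the same exception can be reproduced without Spring or Kafka. The following is only a minimal sketch, under my assumption that the executor behind @Async is shut down while the consumer is still handing it tasks. The class name RejectedAfterShutdownDemo and the method isRejectedAfterShutdown are made up for this illustration and are not part of my application:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class RejectedAfterShutdownDemo {

    // Returns true if a task submitted after shutdown() is rejected by the executor.
    public static boolean isRejectedAfterShutdown() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);

        // Assumption: stands in for the async executor being stopped during application shutdown.
        executor.shutdown();

        try {
            // Assumption: stands in for the consumer calling asyncService.testAsync()
            // for a message that was polled before shutdown began.
            executor.execute(() -> System.out.println("Async task is working"));
            return false;
        } catch (RejectedExecutionException e) {
            // The default rejection policy throws once the executor has been shut down.
            return true;
        } finally {
            // Wait briefly for the pool to terminate so the JVM can exit cleanly.
            executor.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected after shutdown: " + isRejectedAfterShutdown());
    }
}
```

If that assumption holds, the problem is one of shutdown ordering: the executor stops accepting work before the consumer has finished with the messages it already polled.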