Using Spring Boot 3.3.6 and Spring Cloud 2023.0.3, this minimal code works great for consuming messages from a Kafka topic:
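import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class TestApplication {

    @SuppressWarnings("resource")
    public static void main(final String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    // Bound to the receivePurple-in-0 binding via spring.cloud.function.definition
    @Bean(name = "receivePurple")
    public Consumer<String> receivePurple() {
        return msg -> System.out.println("received purple message: " + msg);
    }
}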
application.yml:
spring:
  cloud:
    function:
      definition: receivePurple
    stream:
      bindings:
        receivePurple-in-0:
          binder: blueKafka
          destination: purpleTopic
          group: purpleGroup
      binders:
        blueKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9011
                      autoCreateTopics: false
                      autoAddPartitions: false
                      configuration:
                        allow.auto.create.topics: false
                        key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                        value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
However, during the initial deployment, there was a typo:
key.deserializer: org.apache.kafka.common.serializatoin.StringDeserializer
that took many painstaking hours to troubleshoot, mostly because the error message gave no indication of the underlying problem:
org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder checking the topic (purpleTopic):
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:685) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
Caused by: java.lang.RuntimeException: Failed to obtain partition information for the topic purpleTopic
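As a stopgap for this particular kind of typo, a pre-flight check could verify that every class name in the binder configuration actually resolves before the application context starts. The following is only a minimal sketch: `DeserializerConfigCheck` and `findMissingClasses` are hypothetical names, not part of Spring Cloud Stream or Kafka, and the caller is assumed to collect the class-valued properties (such as `key.deserializer`) into a map itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper; not part of Spring Cloud Stream or the Kafka client.
final class DeserializerConfigCheck {

    private DeserializerConfigCheck() {
    }

    /**
     * Returns one "property=className" entry for every property whose value
     * cannot be loaded as a class from the current classpath.
     */
    static List<String> findMissingClasses(Map<String, String> classProperties) {
        List<String> missing = new ArrayList<>();
        for (Map.Entry<String, String> entry : classProperties.entrySet()) {
            try {
                // initialize=false: only resolve the class, do not run static initializers
                Class.forName(entry.getValue(), false,
                        DeserializerConfigCheck.class.getClassLoader());
            } catch (ClassNotFoundException | LinkageError e) {
                missing.add(entry.getKey() + "=" + entry.getValue());
            }
        }
        return missing;
    }
}
```

Calling `findMissingClasses` with the `key.deserializer` and `value.deserializer` values before `SpringApplication.run(...)` and failing fast on a non-empty result would name the misspelled class directly. It does not capture binder errors in general; it only guards against unresolvable class names.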
To avoid this troubleshooting headache in the future, I expected it to be easy to configure something that captures errors raised during binder initialization, but I can't find any setting that exposes the underlying error. The documentation describes how to capture binding errors:
spring.cloud.stream.bindings.receivePurple-in-0.error-handler-definition=myErrorHandler
spring.cloud.stream.default.error-handler-definition=myErrorHandler
but it says nothing about binder errors.
What can be configured (logging, a channel, an interceptor, or something else) to capture errors that occur during binder creation or initialization?