We are trying to use the great "functions with multiple inputs" feature described here.
We use Avro for our Kafka topics, and it seems that the binder deserialization configuration is not applied when a function has multiple inputs (while it works perfectly with a single input channel).
The use case: we need to consume 4 different topics that all use the same Avro schema for the message payload (fr.my.avro.model.v2.ResultEvent).
We tried many configurations, but every attempt fails with the same processing error when consuming:
[class org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$OriginalMessageHolder cannot be cast to class fr.my.avro.model.v2.ResultEvent (org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$OriginalMessageHolder and fr.my.avro.model.v2.ResultEvent are in unnamed module of loader 'app')] with context [org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$OriginalMessageHolder@2e039fc3]
Here is how we are trying to use it.
The Function implementation (two input channels, one output):
@Bean("integration-issues")
public Function<Tuple2<Flux<ResultEvent>, Flux<ResultEvent>>, Flux<IntegrationIssues>> process(){
// public Function<Flux<StepResultEvent>, Flux<StepIntegrationIssues>> process() {
return tuple -> {
Flux<ResultEvent> stepResultEvent1 = tuple.getT1();
Flux<ResultEvent> stepResultEvent2 = tuple.getT2();
Flux<ResultEvent> mergedFlux = Flux.merge(resultEvent1, resultEvent2);
return mergedFlux
.doOnNext(event -> log.info("Processing INTEGRATION ISSUES " + event.getId()))
.map(IntegrationIssuesBuilder::buildResponse)
.onErrorContinue((throwable, o) -> {
log.info("Error while processing INTEGRATION ISSUES due to error [{}] with context [{}]", throwable.getMessage(), o);
log.info("Skipping the request");
});
};
}
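Since the function has two inputs, the integration-issues-in-0/in-1 binding names are derived from the bean name; the function itself is activated with the standard definition property (not shown in the configurations below, so this line is our assumption about the setup):

spring.cloud.function.definition=integration-issues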
The binder configuration (try #1, with a comma-separated list of topics on a single binding):
spring.cloud.stream.bindings.integration-issues-in-0.binder=kafka
spring.cloud.stream.bindings.integration-issues-in-0.group=integration-issue
spring.cloud.stream.bindings.integration-issues-in-0.consumer.use-native-decoding=true
spring.cloud.stream.bindings.integration-issues-in-0.destination=TOPIC-1,TOPIC-2
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.auto-commit-on-error=true
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.start-offset=latest
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.configuration.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.configuration.auto.register.schemas=false
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.configuration.auto.offset.reset=latest
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.configuration.specific.avro.reader=true
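The schema-registry settings are omitted above for brevity; for completeness, they look like this on the binding (the URL below is a placeholder, not our real registry):

# placeholder registry URL, not part of the configuration we actually ran
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.configuration.schema.registry.url=http://schema-registry:8081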
The binder configuration (try #2, with a dedicated binding per input):
spring.cloud.stream.bindings.integration-issues-in-0.binder=kafka
spring.cloud.stream.bindings.integration-issues-in-1.binder=kafka
spring.cloud.stream.bindings.integration-issues-in-0.group=integration-issue-1
spring.cloud.stream.bindings.integration-issues-in-1.group=integration-issue-2
spring.cloud.stream.bindings.integration-issues-in-0.consumer.use-native-decoding=true
spring.cloud.stream.bindings.integration-issues-in-1.consumer.use-native-decoding=true
spring.cloud.stream.bindings.integration-issues-in-0.destination=TOPIC-1
spring.cloud.stream.bindings.integration-issues-in-1.destination=TOPIC-2
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.auto-commit-on-error=true
spring.cloud.stream.kafka.bindings.integration-issues-in-1.consumer.auto-commit-on-error=true
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.start-offset=latest
spring.cloud.stream.kafka.bindings.integration-issues-in-1.consumer.start-offset=latest
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.configuration.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.bindings.integration-issues-in-1.consumer.configuration.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.cloud.stream.kafka.bindings.integration-issues-in-1.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.configuration.auto.register.schemas=false
spring.cloud.stream.kafka.bindings.integration-issues-in-1.consumer.configuration.auto.register.schemas=false
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.configuration.auto.offset.reset=latest
spring.cloud.stream.kafka.bindings.integration-issues-in-1.consumer.configuration.auto.offset.reset=latest
spring.cloud.stream.kafka.bindings.integration-issues-in-0.consumer.configuration.specific.avro.reader=true
spring.cloud.stream.kafka.bindings.integration-issues-in-1.consumer.configuration.specific.avro.reader=true
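Since try #2 repeats the same Kafka properties on both bindings, an equivalent variant would set them once as binder-wide consumer defaults. This is a sketch we have not validated, using the same keys at binder scope:

spring.cloud.stream.kafka.binder.consumer-properties.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.cloud.stream.kafka.binder.consumer-properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.binder.consumer-properties.specific.avro.reader=true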
We also tried adding this content-type configuration:
spring.cloud.stream.bindings.integration-issues-in-0.content-type=application/*+avro
Note that a single input binding works as expected (the deserialization is done correctly).
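For reference, the single-input variant that deserializes correctly is essentially the commented-out signature shown earlier, i.e. (a sketch, using the same names as above):

@Bean("integration-issues")
public Function<Flux<ResultEvent>, Flux<IntegrationIssues>> process() {
    return flux -> flux
            .doOnNext(event -> log.info("Processing INTEGRATION ISSUES " + event.getId()))
            .map(IntegrationIssuesBuilder::buildResponse);
}

combined with the try #1 binding configuration for integration-issues-in-0.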
Is this use case, with Avro on multiple input bindings, currently supported?
Or are we missing something in the configuration?
Thanks for your help and answers!