We have a Spring Boot 3.3.0 application using Spring Cloud Stream with the RabbitMQ binder, and we are struggling to configure both a partitioned producer and concurrent consumers correctly.
The sample app we use:
@SpringBootApplication
public class DemoApplication {

    private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(1);

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public Supplier<Message<Integer>> randomInt() {
        return () -> {
            Integer currentValue = ATOMIC_INTEGER.getAndIncrement();
            System.out.println("Generated: " + currentValue);
            return MessageBuilder.withPayload(currentValue)
                    .setHeader("partitionKey", currentValue)
                    .build();
        };
    }

    @Bean
    public Consumer<Message<Integer>> intConsumer() {
        return (Message<Integer> input) -> System.out.println("Received: " + input.getPayload());
    }

    @Bean
    public EnvironmentBuilderCustomizer environmentBuilderCustomizer(RabbitProperties rabbitProperties) {
        RabbitProperties.Stream streamProperties = rabbitProperties.getStream();
        return builder -> builder
                .addressResolver(add -> new Address(streamProperties.getHost(), streamProperties.getPort()));
    }

    @Bean(name = "customPartitionKeyExtractorStrategy")
    public PartitionKeyExtractorStrategy customPartitionKeyExtractorStrategy() {
        return (Message<?> message) -> message.getHeaders().getId();
    }
}
and application.yml:
spring:
  cloud:
    function:
      definition: randomInt;intConsumer
    stream:
      function:
        bindings:
          intConsumer-in-0: testconsumer
          randomInt-out-0: testproducer
      bindings:
        testconsumer:
          destination: super-smc
          group: smc-group
          consumer:
            instance-count: 2
            instance-index: 0
            concurrency: 2
            partitioned: true
        testproducer:
          destination: super-smc
          producer:
            partition-count: 4
            partition-key-extractor-name: customPartitionKeyExtractorStrategy
      rabbit:
        bindings:
          testconsumer:
            consumer:
              container-type: stream
              super-stream: true
          testproducer:
            producer:
              producer-type: stream_async
              super-stream: true
              declare-exchange: false
  rabbitmq:
    host: <..>
    port: 5672
    username: <..>
    password: <..>
    stream:
      host: <..>
      port: 5552
      username: <..>
      password: <..>
We have two problems:
- consumers are not consuming from the stream
- producer does not use one of the partitions of the super stream (writes only to the first three partitions and skips the last one)
What are we missing or doing wrong?
We went through the current Spring Cloud Stream documentation and the Spring Cloud Stream RabbitMQ Binder reference guide but did not find a solution.
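A minimal sketch for reasoning about the second problem offline, without a broker. It assumes the partition index is derived as the key's hash code modulo the partition count; that rule is an assumption for illustration and may not match what the binder or the RabbitMQ stream client actually does for super streams. `PartitionSketch`, `partitionFor` and `histogram` are hypothetical names. The sketch compares two key sources from the sample app: the integer payload (the value put into the `partitionKey` header) and a random `UUID` (the type returned by `message.getHeaders().getId()`, which is what the extractor above uses).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

final class PartitionSketch {

    // Assumed rule (not taken from the binder source): hash of the key modulo the partition count.
    // Math.floorMod keeps the result in [0, partitionCount) even for negative hash codes.
    static int partitionFor(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Counts how many of the given keys fall into each partition index.
    static int[] histogram(Iterable<?> keys, int partitionCount) {
        int[] counts = new int[partitionCount];
        for (Object key : keys) {
            counts[partitionFor(key, partitionCount)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int partitionCount = 4; // same as partition-count in application.yml

        List<Integer> payloadKeys = new ArrayList<>();
        List<UUID> idKeys = new ArrayList<>();
        for (int i = 1; i <= 1000; i++) {
            payloadKeys.add(i);               // what the "partitionKey" header carries
            idKeys.add(UUID.randomUUID());    // stand-in for the message ID used by the extractor
        }

        System.out.println("payload keys:    " + Arrays.toString(histogram(payloadKeys, partitionCount)));
        System.out.println("message-id keys: " + Arrays.toString(histogram(idKeys, partitionCount)));
    }
}
```

If both histograms fill all four slots under this assumed rule, the skipped partition is more likely related to how keys are routed to the super stream's partitions than to the key values themselves.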