In the code below, I expected the consumer to receive a single message containing a list of messages, but the messages are actually delivered one at a time.
How can I turn on consumer batching so that the function receives a single message whose payload is a list of messages?
I'm using Spring Boot 3.2.9 and Spring Cloud 2023.0.3.
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public Function<Message<List<String>>, String> route() {
        return input -> {
            System.out.println("processing: " + input);
            return input.getPayload() + " response";
        };
    }
}
application.yaml:
spring:
  application:
    name: demo
  cloud:
    function:
      definition: route
    stream:
      bindings:
        route-out-0:
          destination: out-topic
        route-in-0:
          destination: in-topic
          group: group1
          consumer:
            batch-mode: true
In the test directory:
application-local.yaml:
spring:
  cloud:
    stream:
      bindings:
        test-out-0:
          destination: in-topic
@SpringBootTest
@EmbeddedKafka
@ActiveProfiles("local")
@Import(TestChannelBinderConfiguration.class)
class DemoApplicationTests {

    @Autowired
    StreamBridge streamBridge;

    @Autowired
    OutputDestination outputDestination;

    @Test
    void test() throws InterruptedException {
        System.out.println("sending");
        streamBridge.send("test-out-0", "some message1".getBytes());
        streamBridge.send("test-out-0", "some message2".getBytes());
        streamBridge.send("test-out-0", "some message3".getBytes());
        System.out.println("received: " + new String(outputDestination.receive(1000, "out-topic").getPayload()));
        System.out.println("received: " + new String(outputDestination.receive(1000, "out-topic").getPayload()));
        System.out.println("received: " + new String(outputDestination.receive(1000, "out-topic").getPayload()));
    }
}
I expected the consumer to be called only once, with a list of the 3 messages I sent, but this is what I actually see in the logs:
processing: GenericMessage [payload=byte[13], headers={source-type=kafka, id=8d2ff990-91d1-dde3-1874-525036375f80, contentType=application/json, timestamp=1725719498271, target-protocol=kafka}]
processing: GenericMessage [payload=byte[13], headers={source-type=kafka, id=305a409e-5e43-7c58-d1dd-215d3595f56d, contentType=application/json, timestamp=1725719498271, target-protocol=kafka}]
processing: GenericMessage [payload=byte[13], headers={source-type=kafka, id=cc6a41a3-9ffe-19cc-71ef-a211314f2e5f, contentType=application/json, timestamp=1725719498287, target-protocol=kafka}]
So not only is the consumer called 3 times, but the payload isn't even deserialized to a String: it arrives as a raw byte[].
If I change Function<Message<List<String>>, String>
to Function<Message<String>, String>
then the consumer is still called 3 times, but at least I receive Strings.
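To make the expectation concrete, here is a minimal plain-Java sketch of the single invocation I was hoping for. It involves no Spring or Kafka at all: the class name ExpectedBatchBehavior and the helper expectedResponse() are made up for illustration, and the function mirrors route() without the Message wrapper, so it only shows the intended shape of the input and output, not how the binder would produce it.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration only: no Spring, no Kafka, no binder involved.
public class ExpectedBatchBehavior {

    // Same logic as route(), minus the Message wrapper, so it runs standalone.
    static final Function<List<String>, String> ROUTE =
            payload -> payload + " response";

    // Applies the function once to the whole batch of three payloads.
    static String expectedResponse() {
        List<String> batch = List.of("some message1", "some message2", "some message3");
        return ROUTE.apply(batch);
    }

    public static void main(String[] args) {
        // prints: [some message1, some message2, some message3] response
        System.out.println(expectedResponse());
    }
}
```

In other words, I expected one "processing:" log line and one message on out-topic, rather than three of each.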