spring:
  cloud:
    stream:
      kafka:
        binder:
          producerProperties:
            retries: 5
          autoCreateTopics: false
          brokers: ${KAFKA_BOOTSTRAP_SERVERS}
          defaultBrokerPort: 9092
          minPartitionCount: 1
          autoAddPartitions: false
          configuration:
            security.protocol: PLAINTEXT
            sasl.mechanism:
            sasl.jaas.config:
      bindings:
        output:
          destination: test-topic
          contentType: application/x-protobuf
          producer:
            useNativeEncoding: true
            partitionCount: 1
            requiredGroups: test-group
            configuration:
              key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
              value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer

logging:
  level:
    org.springframework.cloud.stream: DEBUG
    org.apache.kafka: DEBUG
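For context, since the binding declares application/x-protobuf with useNativeEncoding and a ByteArraySerializer, the application hands StreamBridge pre-serialized bytes, roughly like this (simplified; `order` is a placeholder for one of our generated protobuf types):

// Simplified producer call from the application. With useNativeEncoding=true
// the binder skips its own message conversion, so the payload must already be
// serialized protobuf bytes; the ByteArraySerializer writes them unchanged.
// `order` stands in for a real generated protobuf message instance.
byte[] payload = order.toByteArray();
streamBridge.send("output", MessageBuilder
        .withPayload(payload)
        .setHeader(MessageHeaders.CONTENT_TYPE, "application/x-protobuf")
        .build());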
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;

@ActiveProfiles(profiles = {"test", "h2", "kafka"})
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {Application.class})
@ExtendWith(SpringExtension.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class KafkaPublisherTest {

    @Autowired
    StreamBridge streamBridge;

    // Raw Kafka consumer used to verify what actually lands on the topic
    KafkaConsumer<byte[], byte[]> consumer;

    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    // Start the broker before the Spring context is created so the
    // @DynamicPropertySource callback below can read its address
    static {
        kafka.start();
    }
    // Point both Spring Kafka and the Stream binder at the Testcontainers broker
    @DynamicPropertySource
    public static void initKafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
        registry.add("spring.cloud.stream.kafka.binder.configuration.security.protocol", () -> "PLAINTEXT");
        registry.add("spring.cloud.stream.kafka.binder.brokers", kafka::getBootstrapServers);
    }
    @BeforeEach
    void setUpConsumer() {
        // Plain Kafka consumer, configured independently of Spring, to check delivery
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", kafka.getBootstrapServers());
        consumerProps.put("group.id", "test-group");
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("auto.offset.reset", "earliest");
        consumer = new KafkaConsumer<>(consumerProps);
    }
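
    // Teardown added for hygiene (not in my original snippet): close the raw
    // consumer after each test so connections and group state don't leak between runs.
    @AfterEach
    void tearDownConsumer() {
        if (consumer != null) {
            consumer.close();
        }
    }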
    @Test
    public void testSendAndReceive() throws InterruptedException, ExecutionException {
        byte[] messagePayload = "Hello Kafka!".getBytes();
        Message<byte[]> message = MessageBuilder.withPayload(messagePayload).build();

        // Make the broker address available for the ${KAFKA_BOOTSTRAP_SERVERS}
        // placeholder used in the YAML above
        System.setProperty("KAFKA_BOOTSTRAP_SERVERS", kafka.getBootstrapServers());

        // Create the topic up front (autoCreateTopics is false) and set its retention
        try (AdminClient adminClient = AdminClient.create(
                Map.<String, Object>of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()))) {
            NewTopic topic = new NewTopic("test-topic", 1, (short) 1);
            adminClient.createTopics(Collections.singletonList(topic)).all().get();

            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic");
            ConfigEntry retentionEntry = new ConfigEntry("retention.ms", String.valueOf(604800000L)); // 7 days
            AlterConfigOp alterConfigOp = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);
            adminClient.incrementalAlterConfigs(
                    Collections.singletonMap(topicResource, Collections.singletonList(alterConfigOp))).all().get();
        }

        // Publish through the "output" binding; send() returns true here
        try {
            boolean result = streamBridge.send("output", message);
            if (!result) {
                System.err.println("Failed to send message");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Give the async producer time to flush, then check what arrived
        Thread.sleep(5000);
        consumer.subscribe(Collections.singletonList("test-topic"));
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
        System.out.println(records.count()); // prints 0: nothing was delivered
    }
}
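
As described below, producing to the topic outside the binder does work. Expressed programmatically rather than via the CLI, that binder-free check would look roughly like this (a sketch, run from within the test method, which already declares the checked exceptions):

// Binder-free cross-check: if this record reaches the consumer but the
// StreamBridge one does not, the problem is in the binder or binding
// configuration rather than the broker or topic. Needs
// org.apache.kafka.clients.producer.KafkaProducer, ProducerRecord, and
// org.apache.kafka.common.serialization.ByteArraySerializer.
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", kafka.getBootstrapServers());
producerProps.put("key.serializer", ByteArraySerializer.class.getName());
producerProps.put("value.serializer", ByteArraySerializer.class.getName());
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
    producer.send(new ProducerRecord<>("test-topic", "Hello Kafka!".getBytes())).get(); // blocks until acked
}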
I’m using Spring Cloud Stream to produce Kafka messages in my application, but the messages never reach the intended topic. StreamBridge.send returns true, which I took to mean the message was handed off successfully, yet nothing appears on the topic.
Interestingly, when I produce a message to the topic manually using the Kafka CLI, the consumer set up in the test class consumes it without issues, which suggests the topic and consumer configuration are correct. Any insight into why messages sent through StreamBridge are not reaching the topic would be appreciated.
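One thing I still plan to try, if I understand the Kafka binder docs correctly, is the per-binding `sync` producer property, which should make a failed publish throw inside streamBridge.send(...) instead of being lost on the async path. A sketch, added alongside initKafkaProperties(...):

// Force synchronous sends for the "output" binding in the test so that
// broker-side publish failures surface as exceptions rather than silently.
@DynamicPropertySource
static void forceSyncSends(DynamicPropertyRegistry registry) {
    registry.add("spring.cloud.stream.kafka.bindings.output.producer.sync", () -> "true");
}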