In my project I updated the kafka-avro-serializer dependency from 7.6.0 to 7.6.1, and my tests began to fail. The bigger issue is, that not all of them, and not always. Approximately 50%-50%, whether the same test passes or not. I isolated 1 test from the rest, but the issue persist.
I have the topic configuration in yml, and I also have an avro converter, but I won’t include them, since they should still be correct, given that the test sometimes passing. A few typo can be present, as I replaced my domain naming with generic ones.
<code>@FieldDefaults(level = AccessLevel.PRIVATE)
@Import(TestChannelBinderConfiguration.class)
@SpringBootTest(classes = TestConfig.class)
public static final String TOPIC= "my-topic";
OutputDestination output;
void shouldReceiveObject_onProducing() throws Exception {
MyObject object = MyObject .builder()
.id(UUID.randomUUID().toString())
underTest.produce(object);
Message<byte[]> message = awaitMessageFromTopic(TOPIC);
byte[] payload = message.getPayload();
MyObject parsedMessage = AvroParser.parsePayload(payload, MyObject.getClassSchema());
assertThat(parsedMessage).isEqualTo(object);
private Message<byte[]> awaitMessageFromTopic(String topic) {
.ignoreException(NullPointerException.class)
.atMost(2, TimeUnit.SECONDS)
.until(() -> output.receive(
<code>@FieldDefaults(level = AccessLevel.PRIVATE)
@Import(TestChannelBinderConfiguration.class)
@SpringBootTest(classes = TestConfig.class)
class LogProducerTest {
public static final String TOPIC= "my-topic";
@Autowired
MyProducer underTest;
@Autowired
OutputDestination output;
@BeforeEach
void clearOutput() {
output.clear(TOPIC);
}
@Test
void shouldReceiveObject_onProducing() throws Exception {
// given
MyObject object = MyObject .builder()
.id(UUID.randomUUID().toString())
.build();
// when
underTest.produce(object);
// then
Message<byte[]> message = awaitMessageFromTopic(TOPIC);
byte[] payload = message.getPayload();
MyObject parsedMessage = AvroParser.parsePayload(payload, MyObject.getClassSchema());
assertThat(parsedMessage).isEqualTo(object);
}
private Message<byte[]> awaitMessageFromTopic(String topic) {
return await()
.ignoreException(NullPointerException.class)
.atMost(2, TimeUnit.SECONDS)
.until(() -> output.receive(
0,
topic
), Objects::nonNull);
}
}
</code>
@FieldDefaults(level = AccessLevel.PRIVATE)
@Import(TestChannelBinderConfiguration.class)
@SpringBootTest(classes = TestConfig.class)
class LogProducerTest {
public static final String TOPIC= "my-topic";
@Autowired
MyProducer underTest;
@Autowired
OutputDestination output;
@BeforeEach
void clearOutput() {
output.clear(TOPIC);
}
@Test
void shouldReceiveObject_onProducing() throws Exception {
// given
MyObject object = MyObject .builder()
.id(UUID.randomUUID().toString())
.build();
// when
underTest.produce(object);
// then
Message<byte[]> message = awaitMessageFromTopic(TOPIC);
byte[] payload = message.getPayload();
MyObject parsedMessage = AvroParser.parsePayload(payload, MyObject.getClassSchema());
assertThat(parsedMessage).isEqualTo(object);
}
private Message<byte[]> awaitMessageFromTopic(String topic) {
return await()
.ignoreException(NullPointerException.class)
.atMost(2, TimeUnit.SECONDS)
.until(() -> output.receive(
0,
topic
), Objects::nonNull);
}
}
The actual reason why failing is because the message is null, so the object is sent to the topic.
My logs are the following:
For both passing and failing:
<code>2024-07-24T13:35:39.360+02:00 DEBUG 31592 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Function 'myTopic' is not found in cache
2024-07-24T13:35:39.365+02:00 DEBUG 31592 --- [ main] o.s.c.f.c.catalog.FunctionTypeUtils : Supplier does not have input type, returning null as input type.
2024-07-24T13:35:39.366+02:00 DEBUG 31592 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Registering function [myTopic]
2024-07-24T13:35:39.370+02:00 DEBUG 31592 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Composed function myTopic<null, reactor.core.publisher.Flux<sample.domain.avro.MyObject>>
2024-07-24T13:37:19.451+02:00 DEBUG 27084 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Invoking function myTopic<null, reactor.core.publisher.Flux<sample.domain.avro.MyObject>>
2024-07-24T13:37:20.044+02:00 DEBUG 27084 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Registering function [streamBridge]
2024-07-24T13:37:20.129+02:00 INFO 27084 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'my-topic.destination' has 1 subscriber(s).
2024-07-24T13:37:20.134+02:00 INFO 27084 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : 'myTopic_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2024-07-24T13:38:54.841+02:00 INFO 15968 --- [ main] h.a.t.s.l.producer.LogProducerTest : Started MyTest in 4.311 seconds (process running for 5.563)
<code>2024-07-24T13:35:39.360+02:00 DEBUG 31592 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Function 'myTopic' is not found in cache
2024-07-24T13:35:39.365+02:00 DEBUG 31592 --- [ main] o.s.c.f.c.catalog.FunctionTypeUtils : Supplier does not have input type, returning null as input type.
2024-07-24T13:35:39.366+02:00 DEBUG 31592 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Registering function [myTopic]
2024-07-24T13:35:39.370+02:00 DEBUG 31592 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Composed function myTopic<null, reactor.core.publisher.Flux<sample.domain.avro.MyObject>>
2024-07-24T13:37:19.451+02:00 DEBUG 27084 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Invoking function myTopic<null, reactor.core.publisher.Flux<sample.domain.avro.MyObject>>
2024-07-24T13:37:20.044+02:00 DEBUG 27084 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Registering function [streamBridge]
2024-07-24T13:37:20.129+02:00 INFO 27084 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'my-topic.destination' has 1 subscriber(s).
2024-07-24T13:37:20.134+02:00 INFO 27084 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : 'myTopic_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2024-07-24T13:38:54.841+02:00 INFO 15968 --- [ main] h.a.t.s.l.producer.LogProducerTest : Started MyTest in 4.311 seconds (process running for 5.563)
</code>
2024-07-24T13:35:39.360+02:00 DEBUG 31592 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Function 'myTopic' is not found in cache
2024-07-24T13:35:39.365+02:00 DEBUG 31592 --- [ main] o.s.c.f.c.catalog.FunctionTypeUtils : Supplier does not have input type, returning null as input type.
2024-07-24T13:35:39.366+02:00 DEBUG 31592 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Registering function [myTopic]
2024-07-24T13:35:39.370+02:00 DEBUG 31592 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Composed function myTopic<null, reactor.core.publisher.Flux<sample.domain.avro.MyObject>>
2024-07-24T13:37:19.451+02:00 DEBUG 27084 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Invoking function myTopic<null, reactor.core.publisher.Flux<sample.domain.avro.MyObject>>
2024-07-24T13:37:20.044+02:00 DEBUG 27084 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Registering function [streamBridge]
2024-07-24T13:37:20.129+02:00 INFO 27084 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'my-topic.destination' has 1 subscriber(s).
2024-07-24T13:37:20.134+02:00 INFO 27084 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : 'myTopic_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2024-07-24T13:38:54.841+02:00 INFO 15968 --- [ main] h.a.t.s.l.producer.LogProducerTest : Started MyTest in 4.311 seconds (process running for 5.563)
My additional logs, when the test passes:
<code>2024-07-24T13:40:54.943+02:00 DEBUG 27756 --- [ main] s.c.s.s.r.a.AvroSchemaServiceManagerImpl : Finding correct DatumWriter for type sample.domain.avro.MyObject
2024-07-24T13:40:54.956+02:00 DEBUG 27756 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : preSend on channel 'bean 'myTopic-out-0'', message: GenericMessage [payload=byte[61], headers={contentType=application/avro-object, id=5ebc9dd2-244a-c761-8fb5-f71b1865b469, timestamp=1721821254951}]
2024-07-24T13:40:54.957+02:00 DEBUG 27756 --- [ main] tractMessageChannelBinder$SendingHandler : org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@6fcc0a1 received message: GenericMessage [payload=byte[61], headers={contentType=application/avro-object, id=5ebc9dd2-244a-c761-8fb5-f71b1865b469, timestamp=1721821254951}]
2024-07-24T13:40:54.965+02:00 DEBUG 27756 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : postSend (sent=true) on channel 'bean 'myTopic-out-0'', message: GenericMessage [payload=byte[61], headers={contentType=application/avro-object, id=5ebc9dd2-244a-c761-8fb5-f71b1865b469, timestamp=1721821254951}]
<code>2024-07-24T13:40:54.943+02:00 DEBUG 27756 --- [ main] s.c.s.s.r.a.AvroSchemaServiceManagerImpl : Finding correct DatumWriter for type sample.domain.avro.MyObject
2024-07-24T13:40:54.956+02:00 DEBUG 27756 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : preSend on channel 'bean 'myTopic-out-0'', message: GenericMessage [payload=byte[61], headers={contentType=application/avro-object, id=5ebc9dd2-244a-c761-8fb5-f71b1865b469, timestamp=1721821254951}]
2024-07-24T13:40:54.957+02:00 DEBUG 27756 --- [ main] tractMessageChannelBinder$SendingHandler : org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@6fcc0a1 received message: GenericMessage [payload=byte[61], headers={contentType=application/avro-object, id=5ebc9dd2-244a-c761-8fb5-f71b1865b469, timestamp=1721821254951}]
2024-07-24T13:40:54.965+02:00 DEBUG 27756 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : postSend (sent=true) on channel 'bean 'myTopic-out-0'', message: GenericMessage [payload=byte[61], headers={contentType=application/avro-object, id=5ebc9dd2-244a-c761-8fb5-f71b1865b469, timestamp=1721821254951}]
</code>
2024-07-24T13:40:54.943+02:00 DEBUG 27756 --- [ main] s.c.s.s.r.a.AvroSchemaServiceManagerImpl : Finding correct DatumWriter for type sample.domain.avro.MyObject
2024-07-24T13:40:54.956+02:00 DEBUG 27756 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : preSend on channel 'bean 'myTopic-out-0'', message: GenericMessage [payload=byte[61], headers={contentType=application/avro-object, id=5ebc9dd2-244a-c761-8fb5-f71b1865b469, timestamp=1721821254951}]
2024-07-24T13:40:54.957+02:00 DEBUG 27756 --- [ main] tractMessageChannelBinder$SendingHandler : org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@6fcc0a1 received message: GenericMessage [payload=byte[61], headers={contentType=application/avro-object, id=5ebc9dd2-244a-c761-8fb5-f71b1865b469, timestamp=1721821254951}]
2024-07-24T13:40:54.965+02:00 DEBUG 27756 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : postSend (sent=true) on channel 'bean 'myTopic-out-0'', message: GenericMessage [payload=byte[61], headers={contentType=application/avro-object, id=5ebc9dd2-244a-c761-8fb5-f71b1865b469, timestamp=1721821254951}]
my pom.xml snippet:
<kafka-avro-serializer.version>7.6.1</kafka-avro-serializer.version>
<spring-cloud-stream-schema-registry-client.version>4.1.3</spring-cloud-stream-schema-registry-client.version>
<spring-cloud.version>2023.0.3</spring-cloud.version>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema-registry-client</artifactId>
<version>${spring-cloud-stream-schema-registry-client.version}</version>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${kafka-avro-serializer.version}</version>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<url>https://packages.confluent.io/maven/</url>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<code><properties>
<kafka-avro-serializer.version>7.6.1</kafka-avro-serializer.version>
<spring-cloud-stream-schema-registry-client.version>4.1.3</spring-cloud-stream-schema-registry-client.version>
<spring-cloud.version>2023.0.3</spring-cloud.version>
</properties>
<dependencies>
<!-- Kafka-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<exclusions>
<exclusion>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema-registry-client</artifactId>
<version>${spring-cloud-stream-schema-registry-client.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${kafka-avro-serializer.version}</version>
</dependency>
<!-- Test-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<repositories>
<repository>
<id>io.confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
</plugins>
</build>
</code>
<properties>
<kafka-avro-serializer.version>7.6.1</kafka-avro-serializer.version>
<spring-cloud-stream-schema-registry-client.version>4.1.3</spring-cloud-stream-schema-registry-client.version>
<spring-cloud.version>2023.0.3</spring-cloud.version>
</properties>
<dependencies>
<!-- Kafka-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<exclusions>
<exclusion>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema-registry-client</artifactId>
<version>${spring-cloud-stream-schema-registry-client.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${kafka-avro-serializer.version}</version>
</dependency>
<!-- Test-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<repositories>
<repository>
<id>io.confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
</plugins>
</build>
I also inherit some basic dependencies from a parent, but the only notable is, that it uses spring boot 3.3.2, and defines configuration for the plugins.
Does anyone have any idea, what change could have happened between these two versions, resulting this unpredictable behaviour?
I tried different versions for my other dependencies (that resulted in the spring cloud stream update), isolating the test (there were multiple in the class), using DirtiesContext, clearing the OutputDestination, expanding the waiting time, checking in debug mod, and sleeping the thread before execution.