When executing this integration test in a Spring Boot (version 3.1.3) and Kafka project, I receive this error message:

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:166)
Caused by: java.net.MalformedURLException: no protocol: not-used/subjects/test_compacted_user-key/versions?normalize=false
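From the stack trace, the Confluent serializer appears to parse schema.registry.url as a java.net.URL before contacting the registry, so a bare value like "not-used" fails with "no protocol". For illustration only, here is a minimal sketch of the same consumer properties using the mock:// pseudo-scheme, which recent Confluent serializer versions route to an in-memory MockSchemaRegistry; the scope name "embedded-tests" and the plain KafkaAvroDeserializer are assumptions for the sketch, not part of my actual test:

// Sketch only: same consumer setup, but with a mock:// registry URL instead of "not-used".
// AbstractKafkaSchemaSerDeConfig and KafkaAvroDeserializer come from io.confluent:kafka-avro-serializer;
// "embedded-tests" is an illustrative scope name.
Map<String, Object> sketchProps = KafkaTestUtils.consumerProps("testGroup1", "true", embeddedKafkaBroker);
sketchProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
sketchProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
sketchProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://embedded-tests");
sketchProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");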
Integration test class:
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@Autowired
private AmazonS3 panamaS3Client;
private Consumer<Void, XFile> consumer;
@BeforeEach
void setup() throws InterruptedException {
// Create consumer
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup1", "true", this.embeddedKafkaBroker);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, MockKafkaAvroDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MockKafkaAvroDeserializer.class);
consumerProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "not-used");
consumerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
ConsumerFactory<Void, XFile> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
this.consumer = cf.createConsumer();
this.embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "test_xfiles");
// Init compacted topics
CompactedModelKey modelKey = CompactedModelKey.newBuilder()
.setCode("MODEL")
.setVersion(1)
.build();
CompactedModel modelValue = CompactedModel.newBuilder()
.setModelBuilder(Model.newBuilder()
.setCode("MODEL")
.setName("NAME")
.setPartner("PARTNER")
.setAuthorizeDataByFile(true)
.setVersionMajor(1)
.setVersionMinor(21)
.setTemplate(ByteBuffer.wrap("TPL : ${key}".getBytes()))
.setContentType(MediaType.TEXT_HTML_VALUE)
.setFilename("filename.test")
.setStorageType(StorageType.PANAMA)
.setPanamaConfigBuilder(PanamaConfig.newBuilder()
.setBucket("BUCKET")
.setGenerateUniquePath(true)
.setFilename("panama_config_filename.test")
.setPath("PATH/TO/FILE")
.setMetadataTemplate(ByteBuffer.wrap("{\"key\" : \"M:${key}\"}".getBytes()))
)
)
.build();
CompactedUserKey userKey = CompactedUserKey.newBuilder()
.setUsername("USER")
.build();
CompactedUser userValue = CompactedUser.newBuilder()
.setUserBuilder(fr.laposte.intra.courrier.xfiles.model.avro.v1.priv.domain.User.newBuilder()
.setUsername("USER")
.setUserType(UserType.KAFKA_APP.toString())
.setPartners(List.of("PARTNER"))
)
.build();
this.kafkaTemplate.executeInTransaction((arg) -> this.kafkaTemplate.send("test_compacted_user", userKey, userValue));
this.kafkaTemplate.executeInTransaction((arg) -> this.kafkaTemplate.send("test_compacted_model", modelKey, modelValue));
// wait until the partitions are assigned
for (MessageListenerContainer messageListenerContainer : this.kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer,
this.embeddedKafkaBroker.getPartitionsPerTopic());
}
// Wait for compacted topic init
Thread.sleep(500);
}
@Test
void testNewMessage() throws IOException {
// Data
UUID xFileUUID = UUID.randomUUID();
String partner = "PARTNER";
// Prepare
PrivateXFileKey key = PrivateXFileKey.newBuilder()
.setUuid(xFileUUID.toString())
.build();
PrivateXFile value = PrivateXFile.newBuilder()
.setUuid(xFileUUID.toString())
.setPartner(partner)
.setModelBuilder(ModelDescriptor.newBuilder()
.setReferenceBuilder(Reference.newBuilder()
.setCode("MODEL")
.setVersion(1)
)
)
.setCreatedDate(Instant.ofEpochMilli(1))
.setCreatedBy("USER")
.setData("{\"key\":\"value\"}")
.setOptionsBuilder(fr.laposte.intra.courrier.xfiles.model.avro.v1.priv.domain.Options.newBuilder()
        .setExpirationDate(Instant.now().toEpochMilli() + (30 * 8460000))
        .setExpirationDelay(30L))
.build();
final PutObjectResult putObjectResult = mock(PutObjectResult.class);
when(panamaS3Client.putObject(any())).thenReturn(putObjectResult);
when(putObjectResult.getETag()).thenReturn("");
// Call
ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("test_xfilesPriv", null, null, null, value);
this.kafkaTemplate.executeInTransaction((arg) -> this.kafkaTemplate.send(producerRecord));
// Verify
ConsumerRecord<Void, XFile> record = KafkaTestUtils.getSingleRecord(consumer, "test_xfiles", Duration.ofMillis(5000));
assertThat(record.value().getUuid()).isEqualTo(xFileUUID.toString());
assertThat(record.value().getPartner()).isEqualTo(partner);
assertThat(record.value().getStorage().getPanama()).isEqualTo("urn:panama:BUCKET/PATH/TO/FILE/" + xFileUUID + "panama_config_filename.test");
ArgumentCaptor<PutObjectRequest> putObjectRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
verify(this.panamaS3Client).putObject(putObjectRequest.capture());
assertThat(putObjectRequest.getValue().getMetadata().getUserMetadata().get("key")).isEqualTo("M:value");
assertThat(IOUtils.readLines(putObjectRequest.getValue().getInputStream(), StandardCharsets.UTF_8).get(0)).isEqualTo("TPL : value");
}
@TestConfiguration
public static class Config {
@Bean
public AmazonS3 amazonS3Client() {
return mock(AmazonS3.class);
}
@Bean
public CachetClient cachetClient() {
return mock(CachetClient.class);
}
    }
}
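The producer configuration behind the autowired KafkaTemplate is not shown here; since the failure happens in the serializer, it presumably also carries schema.registry.url=not-used. For completeness, a hedged producer-side sketch that shares the same in-memory registry scope (all names here are illustrative assumptions, not my actual configuration):

// Sketch: producer properties pointing at the same mock:// scope, so the
// consumer can resolve the schema ids this producer registers in memory.
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
producerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://embedded-tests");
KafkaTemplate<Object, Object> sketchTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));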