This topic has multiple producers. Some messages cannot be deserialized, while most messages are consumed normally.
Relevant pom.xml dependencies:
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-json-serializer</artifactId>
    <version>4.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>5.3.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
The exception is as follows:
2024-05-09 10:03:31.702 ERROR [main] - [SchemaConsumerTest.java:100] - Error deserializing key/value for partition xxxx.topic-3 at offset 480960. If needed, please seek past the record to continue consumption.
org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition xxxx.topic-3 at offset 480960. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:331)
at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:283)
at org.apache.kafka.clients.consumer.internals.FetchCollector.fetchRecords(FetchCollector.java:168)
at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:134)
at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:145)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:693)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:617)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:585)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:827)
at com.citi.gsp.kafka.consumer.SchemaConsumerTest.receiveMsg(SchemaConsumerTest.java:92)
at com.citi.gsp.kafka.consumer.SchemaConsumerTest.testConsumer(SchemaConsumerTest.java:83)
at com.citi.gsp.kafka.consumer.SchemaConsumerTest.main(SchemaConsumerTest.java:55)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 13212
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:156)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:79)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:62)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:321)
... 11 common frames omitted
Caused by: java.io.EOFException: null
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:514)
at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:155)
at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:465)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:125)
... 16 common frames omitted
I checked schema id 13212 and it looks fine; I was able to fetch the schema from the registry while debugging locally.
I also consumed the same message with StringDeserializer to confirm that it is not an empty message.
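To inspect the failing record more directly, one option is to consume the raw bytes (e.g. with ByteArrayDeserializer) and parse the Confluent wire-format header yourself: serialized Avro records start with magic byte 0x0 followed by a 4-byte big-endian schema id, then the Avro payload. A minimal sketch (the class name and sample payload are hypothetical, for illustration only):

```java
import java.nio.ByteBuffer;

public class WireFormatCheck {

    // Confluent wire format: [magic byte 0x0][4-byte big-endian schema id][Avro payload]
    static int extractSchemaId(byte[] value) {
        if (value == null || value.length < 5) {
            throw new IllegalArgumentException("record too short for Confluent wire format");
        }
        ByteBuffer buf = ByteBuffer.wrap(value);
        byte magic = buf.get();
        if (magic != 0x0) {
            throw new IllegalArgumentException("unexpected magic byte: " + magic);
        }
        return buf.getInt(); // schema id registered in Schema Registry, e.g. 13212
    }

    public static void main(String[] args) {
        // hypothetical payload: magic byte 0, schema id 13212, then one Avro byte
        byte[] sample = ByteBuffer.allocate(6)
                .put((byte) 0)
                .putInt(13212)
                .put((byte) 0x2)
                .array();
        System.out.println(extractSchemaId(sample)); // prints 13212
    }
}
```

If the header parses but the Avro payload after byte 5 is truncated, that would match the `EOFException` from `BinaryDecoder.ensureBounds`, suggesting the producer wrote an incomplete or differently-encoded payload for those records.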