I am producing messages to a topic "ITEM_PRICES" using the .NET client library (Confluent.Kafka). Each message is meant to be a key/value pair with a string key and a float value. I set up the producer serializers as follows:
_producer = new ProducerBuilder<string, float>(producerConfig)
.SetKeySerializer(Serializers.Utf8)
.SetValueSerializer(Serializers.Single)
.Build();
And then produce to the topic like this:
_producer.Produce(ITEM_PRICES, new Confluent.Kafka.Message<string, float> { Key = "ITEMNAME", Value = Convert.ToSingle(itemPrice) });
I need to consume these messages in my Kafka Streams application (Java), and I attempt to do so like this:
KStream<String, Float> itemPriceStream = streamsBuilder
.stream("ITEM_PRICES", Consumed.with(Serdes.String(), Serdes.Float()));
itemPriceStream.peek((key, value) -> System.out.println("Key: " + key + ", Value: " + value));
But I get this error: Size of data received by Deserializer is not 4.
My question is: do I need to create a custom deserializer in order to read these messages into my stream? If so, how would I do that?
I tried deserializing with the built-in String and Float deserializers (Serdes.String() and Serdes.Float(), as shown above) and it did not work.
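For illustration, here is a minimal sketch of the decoding step a custom deserializer might perform. It assumes the value is a 4-byte IEEE 754 float. The class name `FloatDecoding` and the `order` parameter are hypothetical, not part of any library. The byte order is left configurable because the layout of the bytes actually on the topic has not been verified. Reporting the received length should also help show what the producer really wrote.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper (not part of Kafka or Confluent.Kafka): decodes a raw
// record value into a Float, assuming a 4-byte IEEE 754 encoding.
final class FloatDecoding {
    private FloatDecoding() {}

    static Float decode(byte[] data, ByteOrder order) {
        // Kafka passes null for tombstone records, so propagate it.
        if (data == null) {
            return null;
        }
        // Include the actual length in the error to make mismatches easier to diagnose.
        if (data.length != Float.BYTES) {
            throw new IllegalArgumentException(
                "Expected " + Float.BYTES + " bytes but received " + data.length);
        }
        return ByteBuffer.wrap(data).order(order).getFloat();
    }
}
```

A helper like this could presumably be called from the `deserialize(String topic, byte[] data)` method of a class implementing `org.apache.kafka.common.serialization.Deserializer<Float>`. Whether a custom deserializer is needed at all depends on what bytes are actually stored in the topic.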