I have a topic populated with Debezium change-log events, and I want to aggregate them and produce the result to another topic. To do that I am using Quarkus with Kafka Streams.
1 – I wrote my POJO:
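import io.quarkus.runtime.annotations.RegisterForReflection;

// Mirrors the fields of the Debezium change event that I need.
@RegisterForReflection
public class DoClass {
    public After after;
    public Before before;
    public Source source;
    public String op;

    // No-arg constructor, needed for JSON deserialization.
    public DoClass() {
    }

    public DoClass(After after, Before before, Source source, String op) {
        this.after = after;
        this.before = before;
        this.source = source;
        this.op = op;
    }
}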
The After, Before and Source classes are plain POJOs.
My application properties:
quarkus.kafka-streams.bootstrap-servers=kafkabrokershere:9092
kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=1000
kafka-streams.metadata.max.age.ms=500
kafka-streams.auto.offset.reset=earliest
kafka-streams.metrics.recording.level=DEBUG
quarkus.kafka-streams.topics=collector_topic
And finally, my method that returns the KStream:
@Produces
public Topology buildTopology() {} // body omitted, the problem is not here

private KStream<String, DoClass> buildStream(StreamsBuilder streamsBuilder) {
    // Value serde built from my custom serializer and deserializer.
    Serde<DoClass> serde = Serdes.serdeFrom(new MySerializer(), new MyDeserializer());
    return streamsBuilder
            .stream(COLLECTOR, Consumed.with(Serdes.String(), serde))
            .filter((k, v) -> {
                // Debug output: print every value before the null check.
                System.out.println("null here" + v);
                boolean b = v != null;
                return b;
            });
}
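For context on the null check: by default Debezium follows each delete event with a tombstone record (same key, null value), so null values reaching the filter are not necessarily a deserialization failure. Below is a minimal sketch of what that predicate does, using plain Java collections instead of Kafka classes. The class name NullValueFilterSketch, the method dropNullValues and the sample JSON strings are hypothetical and only serve the illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the filter step; no Kafka classes are used here.
class NullValueFilterSketch {

    // Keeps only entries with a non-null value, mirroring filter((k, v) -> v != null).
    static List<Map.Entry<String, String>> dropNullValues(List<Map.Entry<String, String>> records) {
        return records.stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = new ArrayList<>();
        records.add(new SimpleEntry<>("1", "{\"op\":\"c\"}")); // create event (made-up sample)
        records.add(new SimpleEntry<>("1", "{\"op\":\"d\"}")); // delete event (made-up sample)
        records.add(new SimpleEntry<>("1", null));              // tombstone: same key, null value
        System.out.println(dropNullValues(records).size());     // prints 2
    }
}
```

If every value printed by the filter is null, including records that are not tombstones, that would point at the deserializer rather than at the topic contents.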
The serializer and deserializer, which is where I think the problem is:
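import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
import io.quarkus.kafka.client.serialization.ObjectMapperSerializer;

public class MyDeserializer extends ObjectMapperDeserializer<DoClass> {
    // The constructor name must match the class name.
    public MyDeserializer() {
        super(DoClass.class);
    }
}

public class MySerializer extends ObjectMapperSerializer<DoClass> {
}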