I have a multi-service project that uses Spring Boot, Apache Kafka, and JPA. The project requires that every write to the database also sends a message to a Kafka topic. To keep this reusable, I have created 3 types:
enum WriteEvent{ CREATE, UPDATE, DELETE }
record WriteMessage<P>(WriteEvent event, P payload)
record WriteProducer<P>(String topic, KafkaTemplate<Void, WriteMessage<P>> engine)
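For reference, here is a minimal sketch of these three types that compiles without Spring on the classpath, including the produce method described next. It is only an approximation of my real code: a plain BiConsumer stands in for KafkaTemplate<Void, WriteMessage<P>> (an assumption made purely so the snippet is self-contained), and the fields of Account are hypothetical.

```java
import java.util.function.BiConsumer;

// The kind of database write that triggered the message.
enum WriteEvent { CREATE, UPDATE, DELETE }

// Envelope carrying the event type and the entity that was written.
record WriteMessage<P>(WriteEvent event, P payload) {}

// Hypothetical payload; the real Account class has its own fields.
record Account(long id, String name) {}

// In the real project `engine` is a KafkaTemplate<Void, WriteMessage<P>>.
// A BiConsumer<topic, message> stands in for it here (assumption) so the
// sketch runs without Spring Kafka.
record WriteProducer<P>(String topic, BiConsumer<String, WriteMessage<P>> engine) {

    // Mirrors engine().send(topic(), message) from the real producer.
    void produce(WriteMessage<P> message) {
        engine().accept(topic(), message);
    }
}
```

With the real KafkaTemplate, the body of produce is the single send call mentioned in the text.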
Inside WriteProducer, I have a void produce(WriteMessage<P> message) method which simply calls engine().send(topic(), message). Now, taking a class Account as my payload, I am unable to consume WriteMessage<Account> using @KafkaListener (producing messages works just fine). My consumer looks like this:
@KafkaListener(id = "session-consumer", topics = "${topics.account:account-events}")
public void consumeAccount(@Payload WriteMessage<Account> message) {
    LOGGER.info("consuming account {}", message);
}
The method does not throw an exception, but instead of a WriteMessage<Account> it receives a WriteMessage<LinkedHashMap>.
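As far as I can tell, the absence of an exception is explained by Java's type erasure: the generic parameter is not checked at runtime, so a message whose payload is really a map can be handed to a method declared to take WriteMessage<Account>, and logging it never forces a cast. The following plain-Java sketch (no Kafka or Jackson involved; ErasureDemo and the Account fields are hypothetical names used only for illustration) reproduces that behavior:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal copies of the types, repeated so the snippet compiles on its own.
enum WriteEvent { CREATE, UPDATE, DELETE }
record WriteMessage<P>(WriteEvent event, P payload) {}
record Account(long id, String name) {}

final class ErasureDemo {

    // Unchecked cast: succeeds at runtime because P is erased.
    @SuppressWarnings("unchecked")
    static WriteMessage<Account> asAccountMessage(Object raw) {
        return (WriteMessage<Account>) raw;
    }

    // Reads the payload as Object, so no cast to Account is inserted.
    static String payloadClassName(WriteMessage<?> message) {
        return message.payload().getClass().getSimpleName();
    }

    // Builds a message whose payload is a map, like the one I receive.
    static Object mapBackedMessage() {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("id", 1);
        payload.put("name", "alice");
        return new WriteMessage<Object>(WriteEvent.CREATE, payload);
    }
}
```

Passing the result of mapBackedMessage() through asAccountMessage(...) works and the message can be logged, but an assignment such as Account a = message.payload() fails with a ClassCastException, because that is the first point where the payload is actually cast to Account.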
My Spring configuration:
spring:
  kafka:
    bootstrap-servers: "${KAFKA_SERVERS:`localhost:9092`}"
    consumer:
      key-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        spring.json.trusted.packages: "com.example.*"