I am trying to create a Reactive API with simple CRUD operations that also sends messages out to Kafka. But for some reason, when using Mono and R2DBC save operations, it seems to trigger the save operation 2 additional times. Yet this does not affect the saving into DB, nor does it affect the sending out of the message to Kafka.
Here is my send method:
public Mono<Trade> createNewTrade(Trade trade) {
Mono<Trade> newTrade = tradeRepository.save(trade);
newTrade.subscribe(i -> reactiveKafkaProducer.sendMessage(i));
return newTrade;
}
The TradeRepository interface:
public interface TradeRepository extends R2dbcRepository<Trade, Integer> {
}
The method in the Reactive Kafka Producer that’s sending out the message:
public void sendMessage(Trade trade) {
try {
ObjectMapper mapper = new ObjectMapper();
String tradeString = mapper.writeValueAsString(trade);
log.info(tradeString);
sender.<Integer>send(Mono.just(1)
.map(i -> SenderRecord.create(new ProducerRecord<>(TOPIC, i, tradeString), i))
)
.doOnError(e -> log.error("Send failed", e))
.subscribe();
} catch (Exception e) {
log.info(e.getMessage());
}
}
I’m guessing that the re-triggering of the save operation has something to do with the subscribe, but if I were to disable the subscribe in the Kafka producer, the message doesn’t get sent out to Kafka. Help would be greatly appreciated to understand this, thank you in advance!
EDIT: To be more specific, this is the error that I get.
org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException: Unique index or primary key violation: "PRIMARY KEY ON PUBLIC.TRADE(TRADE_ID) ( /* key:3 */ 3, 'Superman', 1129, 'BUY', 'In Progress')"; SQL statement:
INSERT INTO TRADE (TRADE_ID, NAME, TRADE_VALUE, TYPE, STATUS) VALUES ($1, $2, $3, $4, $5) [23505-224]
Josiah Foo is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.