I have a service which acts as a Kafka producer. It receives a complex object via REST endpoint and then it splits and produces it on two Kafka Topics. For second topic, we are extracting messages from a list and sending each msg to the topic. We want to make sure that if any of the msg from the list fails it should not stop the processing of rest of the messages. And error messages are logged and maybe sent to DLT.
Any recommendations here. We are able to send the messages successfully to the two topics at present. Trying to implement error handling.
For Example
RestRecord {
someAttributes,
List<SomeObject>
}
KafkaProducer {
private KafkaTemplate<K, V> kafkaTemplate;
private String topicName;
public void produceMessage(K key, V kafkaRecord) {
log.info("Producing message[{}] on topic {}", key, topicName);
kafkaTemplate.send(topicName,key , kafkaRecord)
.whenComplete((result, throwable) -> handleResult(result, throwable));
}
private void handleResult(SendResult<K, V> sendResult, Throwable throwable) {
Optional.ofNullable(throwable)
.ifPresentOrElse((t) -> handleError(sendResult.getProducerRecord().key(), t),
() -> this.handleSuccess(sendResult));
}
private void handleSuccess(SendResult<K, V> sendResult) {
log.info("Message[{}] produced successfully at offset:{}", sendResult.getProducerRecord().key(), sendResult.getRecordMetadata().offset());
}
private void handleError(K key, Throwable throwable) {
log.error("Error producing Message[{}] {}", key, throwable.getMessage(), throwable);
}
}
ProducerService {
KafkaProducer<String, Topic1Msg> kafkaProducer1;
KafkaProducer<String, Topic2Msg> kafkaProducer2;
public void sendDataToTopic(RestRecord restRecord)
kafkaProducer1.produceMessage(Mapper1.toTopic1Msg(restRecord));
Mapper2.toTopic2MsgList(restRecord)
.forEach(topic2Msg-> kafkaProducer2.produceMessage(topic2Msg);
}
Trying to find out best practices around error handling. Specially when we want to send multiple msgs in loop and continue sending messages in case of errors and handle the errors gracefully.