Good Day.
I am trying to implement a synchronous request/reply message flow using ReplyingKafkaTemplate. It worked fine as a standalone POC, but my application already has a Kafka implementation that consumes messages manually. I am using ConcurrentMessageListenerContainer as shown below.
@Bean
public ConcurrentMessageListenerContainer<String, String> container(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
    ConcurrentMessageListenerContainer<String, String> repliesContainer =
            containerFactory.createContainer("kReplies");
    repliesContainer.getContainerProperties().setGroupId("repliesGroup");
    repliesContainer.setAutoStartup(false);
    return repliesContainer;
}
Producer, using the template's sendAndReceive:
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
The app has a custom message listener, which implements AcknowledgingMessageListener, to consume the messages.
public class KafkaListener implements AcknowledgingMessageListener<String, String> {
    ...
    @Override
    public void onMessage(ConsumerRecord<String, String> consumeRecord, Acknowledgment acknowledgment) {
    }
}
The above implementation works fine for publishing and consuming messages. Now I am trying to extend it to support a synchronous Kafka message flow using ReplyingKafkaTemplate. I added ReplyingKafkaTemplate to this framework. It published the message to the request topic, and the listener consumed it.
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
        ProducerFactory<String, String> pf,
        ConcurrentMessageListenerContainer<String, String> repliesContainer) {
    return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory()
        throws IOException {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(2);
    factory.setStatefulRetry(true);
    // to support REPLY_TO topic for req/reply pattern
    factory.setReplyTemplate(kafkaTemplate);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    return factory;
}
But the response is not published to the reply topic. I followed the Spring Kafka documentation, but I am trying to implement this without @KafkaListener: the reply message should be published to the reply topic from my own listener. Could you please help me with this?
I tried different solutions available on the web, but the response message is still not published to the reply topic, so the request ends in a reply timeout every time.
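One direction that might apply here, sketched under assumptions rather than taken from my application: as far as I understand, the reply template set on the factory is only used for @KafkaListener methods that return a value, so a plain AcknowledgingMessageListener would have to build the reply itself. ReplyingKafkaTemplate puts the reply topic and a correlation id into the request headers (KafkaHeaders.REPLY_TOPIC and KafkaHeaders.CORRELATION_ID), and the reply has to carry the same correlation id back. The snippet below models only that header handling with plain Java types; ReplySketch, Reply and buildReply are hypothetical names, and Reply stands in for the ProducerRecord that a real listener would send with a KafkaTemplate.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ReplySketch {

    // Header names that spring-kafka exposes as KafkaHeaders.REPLY_TOPIC
    // and KafkaHeaders.CORRELATION_ID.
    static final String REPLY_TOPIC = "kafka_replyTopic";
    static final String CORRELATION_ID = "kafka_correlationId";

    // Hypothetical stand-in for a ProducerRecord: topic, value and headers.
    static final class Reply {
        final String topic;
        final String value;
        final Map<String, byte[]> headers;

        Reply(String topic, String value, Map<String, byte[]> headers) {
            this.topic = topic;
            this.value = value;
            this.headers = headers;
        }
    }

    // Builds the reply for one request: the target topic is read from the
    // request's reply-topic header, and the correlation id is copied unchanged.
    static Reply buildReply(Map<String, byte[]> requestHeaders, String replyValue) {
        byte[] replyTopic = requestHeaders.get(REPLY_TOPIC);
        byte[] correlationId = requestHeaders.get(CORRELATION_ID);
        if (replyTopic == null || correlationId == null) {
            throw new IllegalArgumentException("request carries no reply headers");
        }
        Map<String, byte[]> replyHeaders = new HashMap<>();
        replyHeaders.put(CORRELATION_ID, correlationId);
        return new Reply(new String(replyTopic, StandardCharsets.UTF_8), replyValue, replyHeaders);
    }

    public static void main(String[] args) {
        // Simulated request headers, as the replying template would set them.
        Map<String, byte[]> request = new HashMap<>();
        request.put(REPLY_TOPIC, "kReplies".getBytes(StandardCharsets.UTF_8));
        request.put(CORRELATION_ID, new byte[] {1, 2, 3, 4});

        Reply reply = buildReply(request, "FOO");
        System.out.println("Reply goes to: " + reply.topic);
    }
}
```

In a real listener, the same two headers would be read from consumeRecord.headers() inside onMessage, and the reply would be sent as a ProducerRecord carrying the copied correlation header. Whether this fits alongside the manual acknowledgment setup above is something I have not verified.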