In our Application we are trying to use Spring-Kafka for REquest Reply Semantics
-
MS 1 Sends Request to a Topic T1- which is listened by MS2
-
MS 1 Waits for Response in Another Topic T2
-
MS 2 Puts Resposne in the Topic T2 with Msg id , so that MS 1 can pick up
-
Things are working fine in normal 10 TPS load, However when load increases, we get
lot of threads hanging in sendAndReceive.getSendFuture().get()
@Component
public class KafkaInvoker {
@Autowired
ReplyingKafkaTemplate<String, String, String> kafkaTemplate;
@Autowired
private KafkaMessageListenerContainer<String, String> replyContainer;
public String sendToQueueKafkaTemplate(String key, String srvMapping, String message, JSONObject headers, String edgereq) {
String response = null;
try {
String topic = aN(srvMapping);
String requestReplyTopic = aN(srvMapping + "_REPLY_TOPIC");
String corrId = headers.getString("messageId");
logger.debug(" CorrilationId " + corrId);
this.kafkaTemplate.setCorrelationIdStrategy(kvProducerRecord -> new CorrelationKey(corrId.getBytes()));
this.kafkaTemplate.setCorrelationIdStrategy(kvProducerRecord -> new CorrelationKey(corrId.getBytes()));
this.kafkaTemplate.setDefaultReplyTimeout(Duration.ofMillis(this.edgetimeoutsec));
ProducerRecord<String, String> record = new ProducerRecord(topic, corrId, message);
record.headers().add((Header)new RecordHeader("kafka_replyTopic", requestReplyTopic.getBytes()));
record.headers().add((Header)new RecordHeader("kafka_replyPartition", partitionId.toString().getBytes()));
if (headers != null) {
Iterator<String> keys = headers.keys();
while (keys.hasNext()) {
String ki = keys.next();
record.headers().add((Header)new RecordHeader(ki, headers.getString(ki).getBytes()));
}
}
Instant st = Instant.now();
RequestReplyFuture<String, String, String> sendAndReceive = this.kafkaTemplate.sendAndReceive(record);
Instant ft = Instant.now();
long timeElapsed = Duration.between(st, ft).toMillis();
SendResult<String, String> sendResult = (SendResult<String, String>)sendAndReceive.getSendFuture().get();
sendResult.getProducerRecord().headers()
.forEach(header -> logger.trace(header.key() + ":" + new String(header.value())));
logger.info("Sent_Msg_To_Topic_ACK::tpc=[" + topic + "],msgId=[" + corrId + "],ttkn=[" + timeElapsed + "ms],record_timestamp=[" + sendResult
.getRecordMetadata().timestamp() + "],partitionID=[" + sendResult
.getRecordMetadata().partition() + "]");
logger.info("waiting for response at topic [" + requestReplyTopic + "] ,partitionID=[" + partitionId + "] with corrId : " + corrId);
ConsumerRecord<String, String> consumerRecord = (ConsumerRecord<String, String>)sendAndReceive.get();
response = (String)consumerRecord.value();
logger.info("Got response for coorId : " + corrId);
}
Is it because, we are setting CorrelationIdStrategy at the Singleton Baean REplyKafkaTemplate?
all threads are hanging in sendAndReceive.get();
How to Set the CorrelationID Strategy for each request.