I want to create a microservice like architechture in expressjs
and for communication between services,I’m using kafkajs
.
Now From producer service,Inside the api /name
I get data from front end and after that I send the message to the consumer service.Now inside the consumer service ,I want to send back the data to the producer and from the producer /name
I want to sent back the response,coming from consumer to the front end.
But when I hit the api ,the data is coming to consumer service via producer service but at the time of send the message to the producer service from consumer service,I’m getting error:
Error sending response to producer: KafkaJSError: The producer is disconnected
at validateConnectionStatus (/home/subham/Documents/Projects/NodeJs/KafkaJs/Server2/node_modules/kafkajs/src/producer/messageProducer.js:31:15)
at sendBatch (/home/subham/Documents/Projects/NodeJs/KafkaJs/Server2/node_modules/kafkajs/src/producer/messageProducer.js:82:5)
at Object.send (/home/subham/Documents/Projects/NodeJs/KafkaJs/Server2/node_modules/kafkajs/src/producer/messageProducer.js:120:12)
at processData (/home/subham/Documents/Projects/NodeJs/KafkaJs/Server2/app.js:20:20)
at Runner.eachMessage (/home/subham/Documents/Projects/NodeJs/KafkaJs/Server2/app.js:35:13)
at Runner.processEachMessage (/home/subham/Documents/Projects/NodeJs/KafkaJs/Server2/node_modules/kafkajs/src/consumer/runner.js:231:20)
at onBatch (/home/subham/Documents/Projects/NodeJs/KafkaJs/Server2/node_modules/kafkajs/src/consumer/runner.js:447:20)
at Runner.handleBatch (/home/subham/Documents/Projects/NodeJs/KafkaJs/Server2/node_modules/kafkajs/src/consumer/runner.js:461:11)
at handler (/home/subham/Documents/Projects/NodeJs/KafkaJs/Server2/node_modules/kafkajs/src/consumer/runner.js:58:30)
at /home/subham/Documents/Projects/NodeJs/KafkaJs/Server2/node_modules/kafkajs/src/consumer/worker.js:29:15 {
retriable: true,
helpUrl: undefined,
[cause]: undefined
}
Now I’m attaching the codebase.
producer_service.js
:
const express = require('express');
const { Kafka } = require('kafkajs');
const app = express();
const kafka = new Kafka({
clientId: 'producer',
brokers: ['localhost:9092']
});
const producer = kafka.producer();
app.use(express.json());
app.get('/name', async (req, res) => {
const message = req.query;
console.log(message)
try {
// Send message to Kafka
await producer.send({
topic: 'topic-name',
messages: [{ value: JSON.stringify(message) }]
});
// Listen for response from the consumer
const response = await consumerResponse();
// Send the response back to the frontend
res.status(200).json({ message, response });
} catch (error) {
console.error(error);
res.status(500).send('Error sending message to Kafka');
}
});
const consumerResponse = async (message) => {
const consumer = kafka.consumer({ groupId: 'response-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'response-topic', fromBeginning: true });
const response = new Promise(async (resolve, reject) => {
await consumer.run({
eachMessage: async ({ topic, partition, message: responseMessage }) => {
const response = responseMessage.value.toString();
console.log(`Received response from consumer: ${response}`);
resolve(response);
await consumer.disconnect();
}
});
});
return response;
};
(async () => {
await producer.connect();
app.listen(3000, () => {
console.log('Producer microservice running on port 3000');
});
})();
consumer_service.js
:
const express = require('express');
const { Kafka } = require('kafkajs');
const app = express();
const kafka = new Kafka({
clientId: 'consumer',
brokers: ['localhost:9092']
});
const consumer = kafka.consumer({ groupId: 'group-id' });
const producer = kafka.producer();
app.use(express.json());
const processData = async (message) => {
console.log({
value: message.value.toString()
});
// Process the message
try {
await producer.send({
topic: 'response-topic',
messages: [{ value: `Response to message: ${message.value.toString()}` }]
});
} catch (error) {
console.error('Error sending response to producer:', error);
}
};
const startConsumer = async () => {
await consumer.connect();
await consumer.subscribe({ topic: 'topic-name', fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
await processData(message);
},
});
};
startConsumer().catch((error) => {
console.error('Error starting consumer:', error);
process.exit(1);
});
app.listen(4000, () => {
console.log('Consumer microservice running on port 4000');
});
How to achieve the architecture where we can communicate with both way?