I am very new to rabbitmq and I need to build some micro services.
So, I have two queues. The first application pushes a message to the queue-one which is an API and this message is consumed in the second application and based on this message, some data is retrieved from the database and again the retrieved data is queued to the queue-two and this data is again consumed by the same API in the first application.
There is also another sql query called in the API. But the problem is that, the response is returned before the data from queue-two is consumed in the API. Main idea is to have all the data ready before returning the response.
Can there by anyway to wait and finish for all the messages are consumed (let’s say multiple messages) and also the data from second sql query is also ready and then send the response back to user ??? below is the code for both the application:
firstApplication.js
var channel, connection;
async function connect() {
const amqpServer = "amqp://localhost:5672";
connection = await amqp.connect(amqpServer);
channel = await connection.createChannel();
await channel.assertQueue("queue-two");
}
connect();
async function getDataOne(req, res) {
try {
connection = await oracledb.getConnection(dbConfig.config.db)
var package = {
dataOne: [],
dataTwo: [],
}
let match = true
if (match) {
channel.sendToQueue("queue-one", Buffer.from(JSON.stringify('someMessage')))
channel.consume('queue-two', data => {
package["dataTwo"] = JSON.parse(data.content)
channel.ack(data)
});
package["dataOne"] = await connection.execute(`SELECT * FROM TABLE_A FETCH FIRST 15 ROWS ONLY`)
}
}
catch (error) {
console.log(error)
} finally {
try {
await connection.close();
} catch (error) {
}
}
if (package["dataOne"].length == 0) {
return res.send('No data found');
} else {
return res.status(200).json({
package
})
}
}
app.get('/', function(req, res) {
getDataOne(req, res);
})
app.listen(config.PORT, config.HOST, function () {
console.log(`App listening on http://${config.HOST}:${config.PORT}`);
});
secondApplication.js
async function getDataTwo(msg) {
try {
connection = await oracledb.getConnection(dbConfig.config.db)
let match = true
if (match) {
data = await connection.execute(`SELECT * FROM TABLE_B WHERE MESSAGE = :MSG`, {MSG: msg})
}
}
catch (error) {
console.log(error)
} finally {
try {
await connection.close();
} catch (error) {
console.log(error)
}
}
if (data.rows == 0) {
console.log("No Data found")
} else {
return data.rows
}
}
async function connectQueue() {
const amqpServer = "amqp://localhost:5672";
connection = await amqp.connect(amqpServer);
channel = await connection.createChannel();
await channel.assertQueue("queue-one");
}
connectQueue().then(() => {
channel.consume("queue-one", (data) => {
const msg = JSON.parse(data.content)
getDataTwo(msg).then((rows) => {
channel.sendToQueue("queue-two", Buffer.from(JSON.stringify(rows)))
})
channel.ack(data)
})
})
app.listen(PORT, () => console.log("Server running at port " + PORT));