I’m working on a project that interacts with the blockchain using Node.js and RabbitMQ. I need to implement a system where tasks are batched into an array before being sent to the blockchain. The requirements are:
- Each worker should collect up to 5 tasks, combine them into an array, and send that array to the blockchain.
- If a worker has fewer than 5 tasks but the first task has been waiting for at least 5 minutes, the worker should send the available tasks to the blockchain anyway (as an array of course).
I know I can write and handle the timeout or the tasks manually, but I was just wondering if there is a way to handle it purely using RabbitMQ workers.
I asked ChatGPT and got the answer below, in which the code handles the batching and the timeout itself. Since I’m new to RabbitMQ, I wanted to know if there’s a better way to handle it.
<code>const amqp = require('amqplib/callback_api');
const { sendToBlockchain } = require('./blockchain'); // Function to send tasks to the blockchain
const QUEUE = 'task_queue';
const MAX_TASKS = 5; // flush as soon as this many tasks are buffered
const MAX_WAIT_TIME = 5 * 60 * 1000; // 5 minutes in milliseconds

// Module-level batching state shared by the consumer callback below,
// startTimer() and processTasks().
let taskBuffer = []; // tasks collected for the next batch
let timer = null; // handle of the pending max-wait flush timer, or null
let firstTaskTime = null; // Date.now() taken when a task arrives into an empty buffer

// Connect to the local broker, open a channel and consume QUEUE, collecting
// message bodies in taskBuffer until MAX_TASKS is reached or the timer fires.
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    channel.assertQueue(QUEUE, {
      durable: true
    });
    console.log("Waiting for messages in %s. To exit press CTRL+C", QUEUE);
    channel.consume(QUEUE, function(msg) {
      // amqplib invokes the callback with null when the broker cancels the
      // consumer (e.g. the queue was deleted); there is no content to read.
      if (msg === null) {
        console.warn('Consumer for %s was cancelled by the server', QUEUE);
        return;
      }
      const task = msg.content.toString();
      console.log("Received %s", task);
      // The first task of a new batch starts the max-wait countdown.
      if (taskBuffer.length === 0) {
        firstTaskTime = Date.now();
        startTimer();
      }
      taskBuffer.push(task);
      if (taskBuffer.length >= MAX_TASKS) {
        processTasks();
      }
    }, {
      // NOTE(review): with noAck: true the broker does not wait for an
      // acknowledgement, so buffered tasks presumably exist only in this
      // process's memory until they are sent - confirm that losing them on a
      // crash or failed send is acceptable, or switch to manual acks plus
      // channel.prefetch(MAX_TASKS).
      noAck: true
    });
  });
});
/**
 * Arms the max-wait timer for the current batch.
 *
 * Schedules a single callback MAX_WAIT_TIME milliseconds from now and stores
 * its handle in the module-level `timer`. When the callback runs it flushes
 * the batch through processTasks(), but only if taskBuffer still holds tasks.
 */
function startTimer() {
  const flushIfPending = () => {
    if (taskBuffer.length === 0) {
      return;
    }
    processTasks();
  };
  timer = setTimeout(flushIfPending, MAX_WAIT_TIME);
}
/**
 * Flushes the buffered tasks to the blockchain as a single array.
 *
 * Cancels the pending max-wait timer, then detaches the current batch from
 * `taskBuffer` before the asynchronous send starts, so tasks that arrive
 * while the send is in flight form a new batch instead of being discarded or
 * sent twice. If the send fails, the batch is put back at the front of the
 * buffer and the max-wait timer is re-armed so the tasks are retried.
 *
 * When the buffer is empty this only cancels the timer.
 */
function processTasks() {
  if (timer) {
    clearTimeout(timer);
    timer = null;
  }
  if (taskBuffer.length === 0) {
    return;
  }

  // Take ownership of the batch synchronously; from here on `taskBuffer`
  // only holds tasks received after this flush started.
  const batch = taskBuffer;
  taskBuffer = [];
  firstTaskTime = null;

  // Calling inside the executor turns a synchronous throw from
  // sendToBlockchain into a rejection, so the detached batch is re-queued
  // by the catch handler instead of being lost.
  new Promise((resolve) => resolve(sendToBlockchain(batch)))
    .then(() => {
      console.log('Tasks sent to blockchain:', batch);
    })
    .catch((error) => {
      console.error('Error sending tasks to blockchain:', error);
      // Re-queue ahead of anything received meanwhile to keep arrival order.
      taskBuffer = batch.concat(taskBuffer);
      // A timer is already running if a task arrived during the send;
      // otherwise re-arm it so the failed batch is not stranded. The retry
      // happens on the next flush trigger, not immediately.
      if (!timer) {
        firstTaskTime = Date.now();
        startTimer();
      }
    });
}
</code>
<code>const amqp = require('amqplib/callback_api');
const { sendToBlockchain } = require('./blockchain'); // Function to send tasks to the blockchain
const QUEUE = 'task_queue';
const MAX_TASKS = 5; // flush as soon as this many tasks are buffered
const MAX_WAIT_TIME = 5 * 60 * 1000; // 5 minutes in milliseconds

// Module-level batching state shared by the consumer callback below,
// startTimer() and processTasks().
let taskBuffer = []; // tasks collected for the next batch
let timer = null; // handle of the pending max-wait flush timer, or null
let firstTaskTime = null; // Date.now() taken when a task arrives into an empty buffer

// Connect to the local broker, open a channel and consume QUEUE, collecting
// message bodies in taskBuffer until MAX_TASKS is reached or the timer fires.
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    channel.assertQueue(QUEUE, {
      durable: true
    });
    console.log("Waiting for messages in %s. To exit press CTRL+C", QUEUE);
    channel.consume(QUEUE, function(msg) {
      // amqplib invokes the callback with null when the broker cancels the
      // consumer (e.g. the queue was deleted); there is no content to read.
      if (msg === null) {
        console.warn('Consumer for %s was cancelled by the server', QUEUE);
        return;
      }
      const task = msg.content.toString();
      console.log("Received %s", task);
      // The first task of a new batch starts the max-wait countdown.
      if (taskBuffer.length === 0) {
        firstTaskTime = Date.now();
        startTimer();
      }
      taskBuffer.push(task);
      if (taskBuffer.length >= MAX_TASKS) {
        processTasks();
      }
    }, {
      // NOTE(review): with noAck: true the broker does not wait for an
      // acknowledgement, so buffered tasks presumably exist only in this
      // process's memory until they are sent - confirm that losing them on a
      // crash or failed send is acceptable, or switch to manual acks plus
      // channel.prefetch(MAX_TASKS).
      noAck: true
    });
  });
});
/**
 * Arms the max-wait timer for the current batch.
 *
 * Schedules a single callback MAX_WAIT_TIME milliseconds from now and stores
 * its handle in the module-level `timer`. When the callback runs it flushes
 * the batch through processTasks(), but only if taskBuffer still holds tasks.
 */
function startTimer() {
  const flushIfPending = () => {
    if (taskBuffer.length === 0) {
      return;
    }
    processTasks();
  };
  timer = setTimeout(flushIfPending, MAX_WAIT_TIME);
}
/**
 * Flushes the buffered tasks to the blockchain as a single array.
 *
 * Cancels the pending max-wait timer, then detaches the current batch from
 * `taskBuffer` before the asynchronous send starts, so tasks that arrive
 * while the send is in flight form a new batch instead of being discarded or
 * sent twice. If the send fails, the batch is put back at the front of the
 * buffer and the max-wait timer is re-armed so the tasks are retried.
 *
 * When the buffer is empty this only cancels the timer.
 */
function processTasks() {
  if (timer) {
    clearTimeout(timer);
    timer = null;
  }
  if (taskBuffer.length === 0) {
    return;
  }

  // Take ownership of the batch synchronously; from here on `taskBuffer`
  // only holds tasks received after this flush started.
  const batch = taskBuffer;
  taskBuffer = [];
  firstTaskTime = null;

  // Calling inside the executor turns a synchronous throw from
  // sendToBlockchain into a rejection, so the detached batch is re-queued
  // by the catch handler instead of being lost.
  new Promise((resolve) => resolve(sendToBlockchain(batch)))
    .then(() => {
      console.log('Tasks sent to blockchain:', batch);
    })
    .catch((error) => {
      console.error('Error sending tasks to blockchain:', error);
      // Re-queue ahead of anything received meanwhile to keep arrival order.
      taskBuffer = batch.concat(taskBuffer);
      // A timer is already running if a task arrived during the send;
      // otherwise re-arm it so the failed batch is not stranded. The retry
      // happens on the next flush trigger, not immediately.
      if (!timer) {
        firstTaskTime = Date.now();
        startTimer();
      }
    });
}
</code>
const amqp = require('amqplib/callback_api');
const { sendToBlockchain } = require('./blockchain'); // Function to send tasks to the blockchain
const QUEUE = 'task_queue';
const MAX_TASKS = 5; // flush as soon as this many tasks are buffered
const MAX_WAIT_TIME = 5 * 60 * 1000; // 5 minutes in milliseconds

// Module-level batching state shared by the consumer callback below,
// startTimer() and processTasks().
let taskBuffer = []; // tasks collected for the next batch
let timer = null; // handle of the pending max-wait flush timer, or null
let firstTaskTime = null; // Date.now() taken when a task arrives into an empty buffer

// Connect to the local broker, open a channel and consume QUEUE, collecting
// message bodies in taskBuffer until MAX_TASKS is reached or the timer fires.
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    channel.assertQueue(QUEUE, {
      durable: true
    });
    console.log("Waiting for messages in %s. To exit press CTRL+C", QUEUE);
    channel.consume(QUEUE, function(msg) {
      // amqplib invokes the callback with null when the broker cancels the
      // consumer (e.g. the queue was deleted); there is no content to read.
      if (msg === null) {
        console.warn('Consumer for %s was cancelled by the server', QUEUE);
        return;
      }
      const task = msg.content.toString();
      console.log("Received %s", task);
      // The first task of a new batch starts the max-wait countdown.
      if (taskBuffer.length === 0) {
        firstTaskTime = Date.now();
        startTimer();
      }
      taskBuffer.push(task);
      if (taskBuffer.length >= MAX_TASKS) {
        processTasks();
      }
    }, {
      // NOTE(review): with noAck: true the broker does not wait for an
      // acknowledgement, so buffered tasks presumably exist only in this
      // process's memory until they are sent - confirm that losing them on a
      // crash or failed send is acceptable, or switch to manual acks plus
      // channel.prefetch(MAX_TASKS).
      noAck: true
    });
  });
});
/**
 * Arms the max-wait timer for the current batch.
 *
 * Schedules a single callback MAX_WAIT_TIME milliseconds from now and stores
 * its handle in the module-level `timer`. When the callback runs it flushes
 * the batch through processTasks(), but only if taskBuffer still holds tasks.
 */
function startTimer() {
  const flushIfPending = () => {
    if (taskBuffer.length === 0) {
      return;
    }
    processTasks();
  };
  timer = setTimeout(flushIfPending, MAX_WAIT_TIME);
}
/**
 * Flushes the buffered tasks to the blockchain as a single array.
 *
 * Cancels the pending max-wait timer, then detaches the current batch from
 * `taskBuffer` before the asynchronous send starts, so tasks that arrive
 * while the send is in flight form a new batch instead of being discarded or
 * sent twice. If the send fails, the batch is put back at the front of the
 * buffer and the max-wait timer is re-armed so the tasks are retried.
 *
 * When the buffer is empty this only cancels the timer.
 */
function processTasks() {
  if (timer) {
    clearTimeout(timer);
    timer = null;
  }
  if (taskBuffer.length === 0) {
    return;
  }

  // Take ownership of the batch synchronously; from here on `taskBuffer`
  // only holds tasks received after this flush started.
  const batch = taskBuffer;
  taskBuffer = [];
  firstTaskTime = null;

  // Calling inside the executor turns a synchronous throw from
  // sendToBlockchain into a rejection, so the detached batch is re-queued
  // by the catch handler instead of being lost.
  new Promise((resolve) => resolve(sendToBlockchain(batch)))
    .then(() => {
      console.log('Tasks sent to blockchain:', batch);
    })
    .catch((error) => {
      console.error('Error sending tasks to blockchain:', error);
      // Re-queue ahead of anything received meanwhile to keep arrival order.
      taskBuffer = batch.concat(taskBuffer);
      // A timer is already running if a task arrived during the send;
      // otherwise re-arm it so the failed batch is not stranded. The retry
      // happens on the next flush trigger, not immediately.
      if (!timer) {
        firstTaskTime = Date.now();
        startTimer();
      }
    });
}