I am trying to implement a queue-based solution for our Discord-related tasks, which use the Discord API. I need some input on how this can be implemented.
Here are the expectations from it
- Due to the global rate limit, only one task can be performed at a time.
- Tasks have two priority levels: high and low.
- Once the current task completes, any pending high-priority task is always picked before a low-priority one.
Here’s what I’ve tried
I’ve tried using amqplib and RabbitMQ with a `direct` exchange, but I am unable to pause consumption from one queue based on the state of the other.
const amqp = require('amqplib');
/**
 * Declares the RabbitMQ topology for the task queues and then disconnects.
 *
 * Asserts the durable `priority_exchange` (type `direct`), the durable
 * `high_priority_queue` and `low_priority_queue`, and binds them to the
 * exchange with the routing keys `high` and `low` respectively.
 *
 * Failures while declaring the topology are logged and not rethrown.
 * Failures while closing the channel or connection do propagate.
 *
 * @returns {Promise<void>} Settles once the channel and connection are closed.
 */
async function setup() {
  let connection;
  let channel;
  try {
    connection = await amqp.connect('amqp://localhost');
    channel = await connection.createChannel();

    await channel.assertExchange('priority_exchange', 'direct', {
      durable: true,
    });

    await channel.assertQueue('high_priority_queue', {
      durable: true,
      arguments: {
        // Must match the configuration of the queue if it already exists.
        'x-max-priority': 10,
      },
    });
    await channel.assertQueue('low_priority_queue', {
      durable: true,
    });

    await channel.bindQueue('high_priority_queue', 'priority_exchange', 'high');
    await channel.bindQueue('low_priority_queue', 'priority_exchange', 'low');
    console.log('Setup complete.');
  } catch (error) {
    console.error('Setup failed:', error);
  } finally {
    // Closing the channel can itself throw (e.g. it was already closed after a
    // channel-level error). The nested finally guarantees the connection is
    // still released in that case instead of being leaked.
    try {
      if (channel) {
        await channel.close();
      }
    } finally {
      if (connection) {
        await connection.close();
      }
    }
  }
}
/**
 * Connects to RabbitMQ and starts consuming tasks from `high_priority_queue`
 * and `low_priority_queue` on a single channel, with manual acknowledgements.
 *
 * Each delivered message is logged, processed via `fakePromise()`, and then
 * acked. If processing fails, the error is logged and the message is nacked
 * with requeue so the task is not lost.
 *
 * @returns {Promise<void>} Resolves once both consumers are registered;
 *   rejects if connecting, declaring the queues, or registering fails.
 */
async function consumer() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  await channel.assertQueue('high_priority_queue', {
    durable: true,
    arguments: {
      'x-max-priority': 10,
    },
  });
  await channel.assertQueue('low_priority_queue', {
    durable: true,
  });

  // With RabbitMQ >= 3.3 a plain prefetch(1) applies per consumer, so the two
  // consumers below could each hold one unacked message and run two tasks at
  // once. The global flag makes the limit channel-wide: at most one unacked
  // message across both queues.
  // NOTE(review): global prefetch is not supported on quorum queues - confirm
  // these stay classic queues. It also does not by itself guarantee that a
  // waiting high-priority message is delivered before a low-priority one.
  await channel.prefetch(1, true);

  // Registers a consumer on `queueName`; both queues share the same handling.
  const consumeFrom = (queueName) =>
    channel.consume(
      queueName,
      async (msg) => {
        // amqplib invokes the callback with null when the broker cancels the
        // consumer; there is nothing to process in that case.
        if (!msg) {
          return;
        }
        console.log(`Received ${msg.content.toString()} from ${queueName}`);
        try {
          await fakePromise();
        } catch (error) {
          console.error(`Task from ${queueName} failed:`, error);
          // Requeue so the task is retried rather than left unacked forever.
          // A message that always fails will loop; consider dead-lettering.
          channel.nack(msg, false, true);
          return;
        }
        channel.ack(msg);
      },
      { noAck: false },
    );

  await consumeFrom('high_priority_queue');
  await consumeFrom('low_priority_queue');
}
// Start consuming; report a startup failure on stderr.
consumer().catch((startupError) => {
  console.error(startupError);
});
/**
 * Simulates an asynchronous task by resolving after a delay.
 *
 * @param {number} [delayMs=5000] How long to wait before resolving, in
 *   milliseconds. Defaults to the original fixed delay of 5000 ms.
 * @returns {Promise<void>} Resolves with no value once the delay has elapsed.
 */
function fakePromise(delayMs = 5000) {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve();
    }, delayMs);
  });
}
1