I have a Bull queue backed by Redis which accepts data in this format:
{
  sessionID: '43i43ko4',
  events: [{...}]
}
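For context, this is roughly how such a payload gets enqueued on the producer side; the queue name and Redis URL below are placeholders, not my actual values:

const Queue = require('bull');

// Hypothetical queue name and Redis connection string
const sessionReplayQueue = new Queue('session-replay', 'redis://127.0.0.1:6379');

// One job per batch of events for a session
await sessionReplayQueue.add({
  sessionID: '43i43ko4',
  events: [/* ... */],
});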
These payloads are pushed to the queue and then processed in consumer.js. The processing part is simple:
- Get the sessionID from the payload, check whether a file {sessionID}.csv exists, and create it if not
- Get the events from that job and append them to the file
consumer.js is running as a single pm2 replica, which works perfectly fine and appends the events of each sessionID in order.
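For reference, the consumer registers its handler roughly like this (builder is just a placeholder name for the class that holds the build() method shown below):

// consumer.js - one pm2 replica runs this process
sessionReplayQueue.process(async (job) => {
  await builder.build(job);
});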
But the number of jobs keeps growing, I'm having a hard time processing them all, and memory usage is gradually increasing a lot because events is sometimes very big.
So I increased the pm2 replicas and am now scaling consumer.js to 4 replicas.
Now I'm concerned about corrupting the files, because 4 jobs with the same sessionID might be executed at the same time, meaning they read/write the same file concurrently, which corrupts the files.
My code is:

async build(job) {
  let { events, sessionId, target, videoID } = job.data; // other fields omitted
  let filename = constructSessionReplayFilePath(this.SESSION_REPLAYS_FOLDER_PATH, target, videoID);
  for (const event of events) {
    await compressDecompress(filename, event);
  }
}
The compressDecompress and constructSessionReplayFilePath methods:
import zlib from 'zlib';
import fs from 'fs';

export function constructSessionReplayFilePath(path, target, videoID) {
  return path + target + '/' + videoID + '.csv';
}

export async function compressDecompress(filename, event) {
  const serializedObject = JSON.stringify(event);
  const compressedData = zlib.deflateSync(serializedObject);
  // Write a 4-byte big-endian length prefix, then the compressed payload
  const dataLength = Buffer.alloc(4);
  dataLength.writeInt32BE(compressedData.length, 0);
  await fs.promises.appendFile(filename, dataLength);
  await fs.promises.appendFile(filename, compressedData);
}
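So each record in the file is a 4-byte big-endian length prefix followed by the deflated JSON. For reference, the file can be read back with something like this (sketch only, not part of the current code):

export async function readEvents(filename) {
  const buffer = await fs.promises.readFile(filename);
  const events = [];
  let offset = 0;
  while (offset + 4 <= buffer.length) {
    const length = buffer.readInt32BE(offset); // length prefix written by compressDecompress
    const compressed = buffer.subarray(offset + 4, offset + 4 + length);
    events.push(JSON.parse(zlib.inflateSync(compressed).toString()));
    offset += 4 + length;
  }
  return events;
}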
What can I do to prevent this?
One option would be to split the work across 4 queues, with one replica consuming each queue, as sketched below.
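A minimal sketch of that idea, assuming the producer hashes the sessionID to pick one of the 4 queues, so that all jobs for a given session always land on the same queue (and therefore the same replica):

const Queue = require('bull');
const crypto = require('crypto');

const QUEUE_COUNT = 4; // one dedicated consumer.js replica per queue
const queues = Array.from({ length: QUEUE_COUNT }, (_, i) =>
  new Queue(`session-replay-${i}`, 'redis://127.0.0.1:6379')
);

// Deterministically map a sessionID to a queue index
function queueFor(sessionID) {
  const hash = crypto.createHash('md5').update(sessionID).digest();
  return queues[hash.readUInt32BE(0) % QUEUE_COUNT];
}

// Producer side: the same session always goes to the same queue
await queueFor('43i43ko4').add({ sessionID: '43i43ko4', events: [] });

Each consumer replica would then only process its own queue, so no two replicas would ever write to the same file.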