I would like to write a function (i.e., the function write
shown below) that handles backpressure by queueing each new chunk of data while the stream is draining. Once the stream has drained, then I would like for it to continue writing the data to the stream in the order it was queued. I would like to do this using a async/await pattern as opposed to using a callback.
My implementation is below. I’m wondering if I am doing this right or if I have neglected to handle an important part of the procedure.
import * as stream from 'node:stream';
const transform = new stream.Transform({
async transform(chunk: Buffer, encoding: BufferEncoding, callback: stream.TransformCallback) {
await new Promise((r) => setTimeout(r, 100));
callback(null, chunk.toString('utf-8'));
}
});
transform.pipe(process.stdout);
const queue: Array<Buffer> = [];
async function write(data: Buffer, encoding?: BufferEncoding): Promise<void> {
if (!transform.closed && transform instanceof stream.Writable) {
if (!transform.writableNeedDrain) {
if (queue.length === 0) {
if (!transform.write(data)) {
await new Promise((r)=>transform.once('drain', r));
}
}
else {
queue.push(data);
}
while (queue.length) {
const data = queue.shift();
if (!transform.write(data)) {
await new Promise((r)=>transform.once('drain', r));
}
}
}
else {
queue.push(data);
}
}
}
for (let i = 0; i < 1e1; i++) {
write(Buffer.from('Something'.repeat(1e1), 'utf-8'));
}