I’ve got the following Duplex stream. The input is a CSV string. Whenever the configured limit of CSV lines is reached, a new CSV file is started and added to a ZIP archive. So the pipeline flow is this:
<code>CSV STRING ---> MY DUPLEX ---> ZIP ARCHIVE STREAM CONTAINING N CSV FILES ---> DO SOMETHING (e.g. upload to S3)
</code>
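For example, with a basename of <code>export</code> and a limit of 90000 lines per file, the archive should end up looking something like this:
<code>export.zip
  export-1.csv
  export-2.csv
  ...
  export-N.csv
</code>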
When running this with large files and piping the result (the ZIP archive stream) into an S3 upload, I get the following warning: <code>MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 drain listeners added to [PassThrough]</code>.
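For context, the consuming side looks roughly like this (simplified sketch; the real input source, the S3 client configuration, the import path and the bucket/key here are placeholders):
<code>import { Readable } from "stream";
import { pipeline } from "stream/promises";
import { S3Client } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
import { ArchiveSplitter } from "./archive-splitter";

async function run() {
  // stand-in for the real source, which yields one CSV line per chunk
  const csvLines = ["id,name", "1,foo", "2,bar"];

  // writable side receives the CSV lines, readable side emits the ZIP archive
  const splitter = new ArchiveSplitter("export");

  const upload = new Upload({
    client: new S3Client({}),
    params: { Bucket: "my-bucket", Key: "export.zip", Body: splitter },
  });

  await Promise.all([
    pipeline(Readable.from(csvLines), splitter),
    upload.done(),
  ]);
}

run().catch(console.error);
</code>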
Here is my Duplex stream. I’m pretty sure I got something confused in the backpressure handling. So my question is: how can I solve the backpressure issue and get rid of the warning / potential memory leak? And do I actually need to handle backpressure myself at all?
<code>import { Duplex, PassThrough } from "stream";
import archiver from "archiver";

export class ArchiveSplitter extends Duplex {
  private _currentFile: PassThrough | undefined;
  private _readable: archiver.Archiver;
  private _fileCounter: number = 1;
  private _lineCounter: number = 1;

  constructor(
    private readonly basename: string,
    private readonly limit = 90000,
  ) {
    super();
    const _readable = archiver("zip");
    this._readable = _readable;
    _readable.pause();

    this.once("finish", async () => {
      // close the last CSV entry and finalize the archive
      this._currentFile && this._currentFile.end();
      await this._readable.finalize();
      this._readable.end();
    });
    _readable.on("end", async () => {
      void this.push(null);
    });
    // forward errors
    _readable.on("error", (err) => void this.emit("error", err));
  }

  _construct(callback: (error?: Error | null) => void): void {
    callback(null);
  }

  _destroy(err: Error) {
    this._readable.destroy(err);
  }

  _write(chunk: any, _: BufferEncoding, callback: (error?: Error | null) => void) {
    if (!this._currentFile) {
      this.startFile();
    }
    const ok = this._currentFile!.write(chunk);
    if (!ok) {
      // wait for the current CSV entry to drain before acknowledging the write
      this._currentFile?.once("drain", callback);
    }
    if (this._lineCounter++ === this.limit && !!this._currentFile) {
      // line limit reached: close the current CSV entry
      this._currentFile.end();
      this._currentFile = undefined;
    }
    callback(null);
  }

  _read(size?: number) {
    const chunk = this._readable.read(size);
    if (chunk !== null) this.push(chunk);
    else this._readable.once("readable", () => void this._read());
  }

  private startFile() {
    // open a new CSV entry in the archive
    this._currentFile = new PassThrough();
    const name = `${this.basename}-${this._fileCounter++}.csv`;
    this._readable = this._readable.append(this._currentFile, { name });
    this._lineCounter = 1;
  }
}
</code>