// Imports assumed by the snippets in this post (not shown in the original):
import { GetObjectCommand } from "@aws-sdk/client-s3";
import { BatchWriteItemCommand } from "@aws-sdk/client-dynamodb";
import { PassThrough } from "node:stream";
import csvParser from "csv-parser";
import pLimit from "p-limit";

const command = new GetObjectCommand(getObjectParams);
const data = await s3Client.send(command);
const stream = data.Body;
const [streamForRecordCount, streamForEntireRecords] = duplicateStream(stream);
const recordedCount = await processRecordCount(streamForRecordCount);
const records = await processEntireRecords(streamForEntireRecords);
console.log("records", records);
Above is Node.js code that fetches a CSV file from AWS S3 as a stream. The stream is duplicated because a stream can only be consumed once, so the two methods cannot share it. On its own, this code works fine and returns recordedCount and records as expected. The details of the methods are as follows:
async function processRecordCount(stream) {
  return new Promise((resolve, reject) => {
    let firstLineProcessed = false;
    const results = [];
    const parser = csvParser();
    stream.on('error', (err) => {
      console.error('Error with the input stream:', err);
      reject(err);
    });
    parser.on('error', (err) => {
      console.error('Error parsing CSV:', err);
      reject(err);
    });
    parser.on('end', () => {
      if (results.length === 0) {
        reject(new Error('CSV is empty'));
      } else {
        resolve(results[0]);
      }
    });
    stream.pipe(parser);
    // Only the first row is needed, so stop feeding the parser after it.
    parser.on('data', (row) => {
      if (!firstLineProcessed) {
        results.push(row);
        firstLineProcessed = true;
        stream.unpipe(parser);
        parser.end();
      }
    });
  });
}
async function processEntireRecords(stream) {
  return new Promise((resolve, reject) => {
    const records = [];
    const parser = csvParser({
      trim: true,
      skip_empty_lines: true,
      columns: true,
      skipLines: 1 // first line is not a header
    });
    stream.on('error', (err) => {
      console.error('Error with the input stream:', err);
      reject(err);
    });
    parser.on('error', (err) => {
      console.error('Error parsing CSV:', err);
      reject(err);
    });
    parser.on('data', (record) => {
      // Drop empty fields and wrap the rest as DynamoDB string attributes.
      const filteredRecord = Object.fromEntries(
        Object.entries(record)
          .filter(([, value]) => value !== '')
          .map(([key, value]) => [key, { S: value }])
      );
      records.push(filteredRecord);
    });
    parser.on('end', () => {
      console.log('Stream processing completed.');
      resolve(records);
    });
    stream.pipe(parser);
  });
}
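For illustration, the filter/map inside the data handler drops empty fields and wraps the remaining values as DynamoDB string attributes. A standalone sketch with a made-up row:
// Hypothetical row as csv-parser would emit it
const row = { id: '42', name: 'Alice', note: '' };
const shaped = Object.fromEntries(
  Object.entries(row)
    .filter(([, value]) => value !== '')        // drop empty fields
    .map(([key, value]) => [key, { S: value }]) // wrap as string attributes
);
console.log(shaped); // { id: { S: '42' }, name: { S: 'Alice' } }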
function duplicateStream(originalStream) {
  const stream1 = new PassThrough();
  const stream2 = new PassThrough();
  originalStream.pipe(stream1);
  originalStream.pipe(stream2);
  return [stream1, stream2];
}
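For reference, here is the same tee pattern in isolation, with an in-memory readable standing in for the S3 body (the names are made up):
import { PassThrough, Readable } from "node:stream";
const source = Readable.from(["a,b\n", "1,2\n"]); // stand-in for data.Body
const branch1 = new PassThrough();
const branch2 = new PassThrough();
source.pipe(branch1);
source.pipe(branch2);
// Each branch receives its own copy of every chunk, but the source only
// flows as fast as the slowest branch: if one branch is never read,
// backpressure eventually pauses the other branch as well.
branch1.on('data', (chunk) => console.log('branch1:', chunk.toString()));
branch2.on('data', (chunk) => console.log('branch2:', chunk.toString()));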
The problem is that when an additional method is added that takes the returned value from the method above as a parameter, console.log("records", records) prints an empty array. The items parameter below is records.
async function batchWriteItems(items, tableName) {
  const batchSize = 25; // BatchWriteItem accepts at most 25 items per request
  const limit = pLimit(1); // send batches one at a time
  const batches = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  const promises = batches.map(batch => limit(async () => {
    const params = {
      RequestItems: {
        [tableName]: batch.map(item => ({
          PutRequest: {
            Item: item
          }
        }))
      }
    };
    try {
      const command = new BatchWriteItemCommand(params);
      await dynamoDocClient.send(command);
    } catch (error) {
      console.error("Error sending batch write command:", error);
      throw error;
    }
  }));
  await Promise.all(promises);
}
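One caveat about the code above, unrelated to the empty-records symptom: even a successful BatchWriteItem response can carry UnprocessedItems, which the caller is expected to resend. A minimal sketch of such a retry loop (batchWriteWithRetry is a hypothetical helper, not part of the original code):
async function batchWriteWithRetry(client, requestItems) {
  // Keep resending whatever DynamoDB reports back as unprocessed.
  while (requestItems && Object.keys(requestItems).length > 0) {
    const { UnprocessedItems } = await client.send(
      new BatchWriteItemCommand({ RequestItems: requestItems })
    );
    requestItems = UnprocessedItems;
  }
}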
The problem goes away when processRecordCount is removed and only the other two methods are used:
const stream = data.Body;
const records = await processEntireRecords(stream);
await batchWriteItems(records, tableName);