I have a Lambda on AWS that fetches data from OpenSearch and saves it to an S3 bucket. Most of the time the data is small and it works fine, but some users have large datasets, and then the Lambda fails. Is it a good approach to fetch the data with pagination and upload it to the S3 bucket as a stream? Or do you have another idea of how to resolve this?
Here is an example of the code:
<code>const AWS = require('aws-sdk');
const { Client } = require('@elastic/elasticsearch');
const { PassThrough } = require('stream');

const s3 = new AWS.S3();
const opensearch = new Client({ node: 'https://your-opensearch-endpoint' });

const BUCKET_NAME = 'your-bucket-name';
const FILE_KEY = 'path/to/your/file.json';
const PAGE_SIZE = 1000;

async function fetchAndUploadData(index, query) {
  const passThrough = new PassThrough();
  const uploadParams = {
    Bucket: BUCKET_NAME,
    Key: FILE_KEY,
    Body: passThrough,
    ContentType: 'application/json'
  };
  // Start the upload immediately; it consumes the stream as we write to it
  const uploadPromise = s3.upload(uploadParams).promise();

  passThrough.write('[');
  let isFirst = true;
  let scrollId = null;

  // Writes one page of hits into the stream without the surrounding
  // array brackets, so all pages concatenate into a single JSON array
  function processHits(hits) {
    const data = hits.map(hit => hit._source);
    if (data.length > 0) {
      if (!isFirst) {
        passThrough.write(',');
      } else {
        isFirst = false;
      }
      passThrough.write(JSON.stringify(data).slice(1, -1));
    }
  }

  try {
    const initialResponse = await opensearch.search({
      index: index,
      body: query,
      scroll: '1m',
      size: PAGE_SIZE
    });
    scrollId = initialResponse.body._scroll_id;
    processHits(initialResponse.body.hits.hits);

    while (true) {
      const scrollResponse = await opensearch.scroll({
        scroll_id: scrollId,
        scroll: '1m'
      });
      if (scrollResponse.body.hits.hits.length === 0) {
        break;
      }
      // A scroll response may return a new scroll id; keep it fresh
      scrollId = scrollResponse.body._scroll_id;
      processHits(scrollResponse.body.hits.hits);
    }

    // Close the stream
    passThrough.end(']');
    // Wait for the S3 upload to finish
    await uploadPromise;
    console.log('Data uploaded successfully to S3');
  } catch (error) {
    console.error('Error fetching or uploading data:', error);
    passThrough.end(']');
    // Note: if the upload itself failed, this re-throws to the outer catch
    await uploadPromise;
  }
}

fetchAndUploadData('your-index', {
  query: {
    match_all: {}
  }
}).catch(console.error);
</code>
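One thing I noticed while writing this: I never release the scroll context, so each export keeps it alive on the cluster until the `1m` keep-alive expires. I assume something like the sketch below could be called in a `finally` after the loop (it uses the client's `clearScroll` API; the `releaseScroll` helper name is mine):
<code>// Best-effort cleanup: release the scroll context once the export
// finishes or fails, instead of waiting for the keep-alive to expire.
// Assumes the same `opensearch` client instance as above.
async function releaseScroll(scrollId) {
  if (!scrollId) return;
  try {
    await opensearch.clearScroll({ body: { scroll_id: scrollId } });
  } catch (e) {
    // Non-fatal: the context would expire on its own anyway
    console.warn('Failed to clear scroll context:', e.message);
  }
}
</code>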