We have a fleet of about 2,000 IoT sensors (and the number is growing rapidly) whose data we need to ingest into an AWS S3 bucket for further analysis and display in the UI. The sensors are used by multiple clients, so the data in the S3 bucket is segregated by client ID. For each measurement, a sensor sends about 60 packets; each packet contains some configuration details of the sensor plus an array of 1,000 acceleration values. These 60 packets need to be stitched together into a single JSON file that represents one 5-second measurement.
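For reference, the stitched measurement file we want to end up with looks roughly like this (field names are illustrative, not our exact schema):

```python
# Illustrative layout of one stitched 5-second measurement (field names are
# examples, not our exact schema): 60 packets x 1,000 samples = 60,000 values.
measurement = {
    "client_id": "client-042",            # also used as the S3 key prefix
    "sensor_id": "sensor-1337",
    "measurement_start": "2024-05-01T12:00:00Z",
    "config": {"sample_rate_hz": 12000, "range_g": 16},
    "acceleration": [0.0012, -0.0008, 0.0003],  # ...60,000 values in the real file
}
```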
Our current ingestion strategy is to transmit each packet to AWS IoT Core via MQTT. Each packet is then picked up by an AWS Lambda function (triggered by the MQTT message) and PUT into an S3 bucket. Once all 60 packets have arrived (tracked via a DynamoDB table), a second Lambda function reads the packets from S3, stitches them together, and stores the resulting file back in S3. However, as we scale up, the S3 PUT costs are skyrocketing: we write 61 objects (60 packet objects plus the stitched file) when only a single file actually needs to be stored.
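For context, the per-packet Lambda is essentially the following (simplified sketch; bucket, table, and field names are placeholders, and I'm assuming the IoT rule forwards the packet JSON directly as the Lambda event):

```python
import json

import boto3

s3 = boto3.client("s3")
dynamodb = boto3.client("dynamodb")

BUCKET = "my-ingest-bucket"        # placeholder
TABLE = "packet-counters"          # placeholder
PACKETS_PER_MEASUREMENT = 60


def handler(event, context):
    # Assumption: the IoT rule passes the packet JSON through as the event.
    packet = event
    client_id = packet["client_id"]
    measurement_id = packet["measurement_id"]
    seq = packet["sequence_number"]           # 0..59 within the measurement

    # One PUT per packet -- this is where the S3 request costs come from.
    key = f"{client_id}/{measurement_id}/packet-{seq:02d}.json"
    s3.put_object(Bucket=BUCKET, Key=key, Body=json.dumps(packet))

    # Atomically count packets for this measurement in DynamoDB.
    resp = dynamodb.update_item(
        TableName=TABLE,
        Key={"measurement_id": {"S": measurement_id}},
        UpdateExpression="ADD packet_count :one",
        ExpressionAttributeValues={":one": {"N": "1"}},
        ReturnValues="UPDATED_NEW",
    )
    count = int(resp["Attributes"]["packet_count"]["N"])

    if count == PACKETS_PER_MEASUREMENT:
        # All 60 packets are in S3; the stitching Lambda takes over from here,
        # reads the 60 objects, merges them, and writes the 61st object
        # (the final measurement file).
        pass
```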
To lower these costs, we are considering alternative ingestion strategies, such as using a Kinesis Data Stream as a temporary buffer. However, I'm unsure how to read this buffer in a Lambda and segregate the records by sensor. For example, suppose one sensor has written all 60 records for its measurement into the stream, but another sensor has written only 40, and my Lambda polls a batch of 100 records to perform the aggregation and stitching before writing to S3. How do I make sure that all packets of a measurement have been received before triggering the Lambda that reads from the Kinesis Data Stream? Or are there other ingestion strategies I could adopt?
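To make the problem concrete, here is a rough sketch of the kind of consumer Lambda I have in mind (assuming we partition the stream by sensor ID and using illustrative field names); the incomplete-group branch is exactly the part I don't know how to handle:

```python
import base64
import json
from collections import defaultdict

import boto3

s3 = boto3.client("s3")
BUCKET = "my-ingest-bucket"        # placeholder
PACKETS_PER_MEASUREMENT = 60


def stitch(packets):
    """Merge the packet payloads of one measurement into a single document."""
    return {
        "client_id": packets[0]["client_id"],
        "sensor_id": packets[0]["sensor_id"],
        "measurement_start": packets[0]["timestamp"],
        "config": packets[0]["config"],
        "acceleration": [v for p in packets for v in p["values"]],
    }


def handler(event, context):
    # Group the records in this batch by partition key. If we partition by
    # sensor ID, all packets of one sensor land on the same shard, in order.
    groups = defaultdict(list)
    for record in event["Records"]:
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        groups[record["kinesis"]["partitionKey"]].append(payload)

    for sensor_id, packets in groups.items():
        if len(packets) < PACKETS_PER_MEASUREMENT:
            # Incomplete measurement: the remaining packets may only arrive in
            # the next batch. This is the open question -- these records would
            # have to be buffered somewhere instead of being dropped.
            continue
        measurement = stitch(packets)
        key = (
            f"{measurement['client_id']}/{sensor_id}/"
            f"{measurement['measurement_start']}.json"
        )
        s3.put_object(Bucket=BUCKET, Key=key, Body=json.dumps(measurement))
```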