I’m struggling with a data engineering problem:
Dataset Characteristics
| Public dataset | Duration | Total #VMs | Dataset size |
|---|---|---|---|
| AzurePublicDatasetV2 | 30 consecutive days | 2,695,548 (~2.6M) | 235 GB (156 GB compressed) [198 files] |
I need to read 195 gzip files and filter out the records of a few `vmid`s of interest from the time-series data. Because of the way the data was collected (the earliest records for all `vmid`s are in the early files and the latest records are stored in the last files), I have to read every file and filter it against my `vmid_lists`. The problem is that I’m using Google Colab (medium) for my analysis, and with its limited compute resources the session crashes if I naively open each file, decompress it, and concatenate the dataframes with pandas (roughly the pattern sketched below). That approach needs more resources than I currently have access to.
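For concreteness, the naive pandas pattern I mean is roughly the sketch below (an illustration, not working code for Colab: I am assuming the files have no header row, and the exact file range does not matter):

```python
import pandas as pd

cols = ["timestamp", "vmid", "mincpu", "maxcpu", "avgcpu"]
url = ("https://azurecloudpublicdataset2.z19.web.core.windows.net/azurepublicdatasetv2/"
       "trace_data/vm_cpu_readings/vm_cpu_readings-file-{i}-of-195.csv.gz")

frames = []
for i in range(1, 196):
    # Each file is downloaded, decompressed, and held fully in memory before any filtering
    frames.append(pd.read_csv(url.format(i=i), header=None, names=cols, compression="gzip"))

# Concatenating everything is where the Colab session runs out of memory
combined = pd.concat(frames, ignore_index=True)
```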
To the best of my knowledge, there are newer packages for reading/loading big data (besides Spark):
- dask
- polars (a rough sketch of how I imagine using it follows this list)
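Here is that rough polars sketch, under a few assumptions: the files have no header row, each file is downloaded first and decompressed with the gzip module, and the `vmid` shown is just one of the ids from my list further down:

```python
import gzip
import urllib.request

import polars as pl

cols = ["timestamp", "vmid", "mincpu", "maxcpu", "avgcpu"]
url = ("https://azurecloudpublicdataset2.z19.web.core.windows.net/azurepublicdatasetv2/"
       "trace_data/vm_cpu_readings/vm_cpu_readings-file-1-of-195.csv.gz")
vmid_lists = ["yNf/R3X8fyXkOJm3ihXQcT0F52a8cDWPPRzTT6QFW8N+1QPfeKR5//6xyX0VYn7X"]  # ids of interest

# Download one file, decompress it, and keep only the rows for the vmids of interest
local_path, _ = urllib.request.urlretrieve(url)
with gzip.open(local_path, "rb") as f:
    df = pl.read_csv(f, has_header=False, new_columns=cols)
filtered = df.filter(pl.col("vmid").is_in(vmid_lists))
```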
Let’s say I want to efficiently collect and filter the records of a handful of `vmid`s (e.g. 4 out of ~2.6M) that appear in a given list. Ideally, the best outcome would be either:
- to select those that have 30 days of continuous data (a rough check for this is sketched after the list), or
- to collect each `vmid`’s data individually and store it accordingly (e.g. as parquet files).
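To make the “30 days of continuous data” criterion concrete, here is the kind of check I have in mind, assuming timestamps are seconds from the start of the 30-day trace and readings arrive every 5 minutes (so a fully continuous `vmid` would have about 30 * 24 * 12 = 8640 distinct timestamps):

```python
import pandas as pd

# A vmid with fully continuous data should report one reading every 5 minutes for 30 days
EXPECTED_READINGS = 30 * 24 * 12  # 8640

def continuous_vmids(filtered: pd.DataFrame, tolerance: float = 0.99) -> list:
    """Return the vmids whose distinct-timestamp count covers (almost) the whole trace."""
    counts = filtered.groupby("vmid")["timestamp"].nunique()
    return counts[counts >= tolerance * EXPECTED_READINGS].index.tolist()
```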
What I have tried so far is below; it leads to an Out-of-Memory (OOM) error or crashes the notebook:
import dask.dataframe as dd

dfs = []
for i in range(95, 115):
    print(i)  # progress
    df = dd.read_csv(
        f"https://azurecloudpublicdataset2.z19.web.core.windows.net/azurepublicdatasetv2/trace_data/vm_cpu_readings/vm_cpu_readings-file-{i}-of-195.csv.gz",
        blocksize=None,
        header=None,
        names=["timestamp", "vmid", "mincpu", "maxcpu", "avgcpu"],
    )
    dfs.append(df)

# Combine dataframes
combined_df = dd.concat(dfs)

# Select top 20 vmid
# top_vmid_counts = combined_df["vmid"].value_counts().head(20)  # when this is not feasible

# How to select continuous time data for the selected vmids ---> vmid_lists = ...
vmid_lists = [
    "yNf/R3X8fyXkOJm3ihXQcT0F52a8cDWPPRzTT6QFW8N+1QPfeKR5//6xyX0VYn7X",  # 30 days of continuous data
    "4WstS6Ub3GzHun4Mzb6BxLldKvkEkws2SZ9tbBV3kfLzOd+QRVETcgqLjtc3mCbD",
    "5f2jDjOhz6v00WonXOAuZW0uPO4OXjf5t64xYvOefcKwb4v7mOQtOZEVebAbiQq7",
    "E3fjqJ4h2SLfvLl9EV6/w9uc8osF0dw9dENCHteoNRLZTp500ezV9RPfyeMdOKfu",
]
top_vmid = combined_df[combined_df["vmid"].isin(vmid_lists)]

# Compute the result
result = top_vmid.compute()
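I suppose one variant would be to keep dask but compute the filter one file at a time, so that only one decompressed file sits in memory at once and only the small filtered pieces are kept. A minimal sketch of that idea (again assuming the files have no header row; `vmid_lists` is the same list as above):

```python
import dask.dataframe as dd
import pandas as pd

cols = ["timestamp", "vmid", "mincpu", "maxcpu", "avgcpu"]
url = ("https://azurecloudpublicdataset2.z19.web.core.windows.net/azurepublicdatasetv2/"
       "trace_data/vm_cpu_readings/vm_cpu_readings-file-{i}-of-195.csv.gz")

filtered_parts = []
for i in range(1, 196):
    df = dd.read_csv(url.format(i=i), blocksize=None, header=None, names=cols)
    # Materialize only the filtered rows of this one file, then move on
    filtered_parts.append(df[df["vmid"].isin(vmid_lists)].compute())

# The filtered pieces (4 vmids) are tiny, so a plain pandas concat is cheap
result = pd.concat(filtered_parts, ignore_index=True)
```

But I am not sure whether this is the right direction or there is a better way to stay within Colab’s limits.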
I think, considering the situation I explained, the proposed solution should (roughly as in the sketch after this list):
- read all the `csv.gz` files one by one, up to the last file;
- filter (if you use PySpark, it is better to cache the table with `.cache()`);
- concatenate the dataframe(s);
- store the result under the name of each `vmid` (e.g. as `.parquet` or `.csv`);
- delete the `csv.gz` to avoid crashing.
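Concretely, something along these lines, with pandas + pyarrow just to illustrate the steps (the chunk size and output directory are arbitrary choices, I am again assuming the files have no header row, and the `vmid` strings contain `/`, so they need sanitizing before being used as filenames):

```python
import os
import urllib.request

import pandas as pd

cols = ["timestamp", "vmid", "mincpu", "maxcpu", "avgcpu"]
url = ("https://azurecloudpublicdataset2.z19.web.core.windows.net/azurepublicdatasetv2/"
       "trace_data/vm_cpu_readings/vm_cpu_readings-file-{i}-of-195.csv.gz")
out_dir = "vmid_parquet"
os.makedirs(out_dir, exist_ok=True)

collected = {vmid: [] for vmid in vmid_lists}  # vmid_lists as defined above

for i in range(1, 196):
    # 1. read the csv.gz files one by one
    local_path, _ = urllib.request.urlretrieve(url.format(i=i))
    # 2. filter in chunks, so only a slice of the decompressed file is in memory at a time
    for chunk in pd.read_csv(local_path, header=None, names=cols,
                             compression="gzip", chunksize=1_000_000):
        hits = chunk[chunk["vmid"].isin(vmid_lists)]
        for vmid, group in hits.groupby("vmid"):
            collected[vmid].append(group)
    # 4. delete the downloaded csv.gz to avoid filling the Colab disk
    os.remove(local_path)

# 3. concatenate and store one parquet file per vmid
for vmid, parts in collected.items():
    if parts:
        safe_name = vmid.replace("/", "_").replace("+", "-")  # vmid strings contain '/'
        pd.concat(parts, ignore_index=True).to_parquet(
            os.path.join(out_dir, f"{safe_name}.parquet")
        )
```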
So what is the best practice to do this task efficiently during the read/load phase, so that the records of interest can be retrieved easily afterwards?
Any help will be highly appreciated.
A Colab notebook, in case anyone is interested in experimenting:
Potentially related posts I have found:
- More efficient way to select partly records from a big file in Python
- How can I efficiently filter a PySpark data frame with conditions listed in the dictionary?
- Efficient storage and retrieval for hierarchical data in python
- how filter data where there is just one word
- Make piece of code efficient for big data