A newbie here using parallel computing in Python
I have ~80 huge CSV files (32 GB each) that I need to process in Python to retrieve some rows from them.
The file structure is
'Barra', 'D1', 'D2', 'D3', 'D4', 'D5', 'D6', 'D7', 'D8', 'CMG'
One possibility is to load all the CSVs into a database, but first I would like to explore using Dask to process them (if the total processing time for all files exceeds 20 min, I will probably switch to a database if that is faster).
I have the following code, but it doesn’t work 🙁
import dask.dataframe as dd
import pandas as pd
from datetime import datetime
from tkinter import filedialog as fd
from tkinter import ttk

filename1 = fd.askopenfilename(title="ARCHIVO CSV ENTRADA")
filename2 = fd.asksaveasfile(initialfile='Untitled.csv', defaultextension=".csv", title="ARCHIVO DE SALIDA")
dt1 = datetime.now()
barra = ['barra1', 'barra3', 'barra5', 'barra7', 'barra9', 'barra11', 'barra13', 'barra15', 'barra17', 'barra19']
ichunk = 1

def Proceso(chunk_df, barra):
    # Selection step: keep only the rows whose 'barra' value is in the list
    chunk_df.columns = ['ID', 'barra', 'D1', 'D2', 'D3', 'D4', 'D5', 'D6', 'D7', 'D8', 'CMG']
    chunk1 = chunk_df[chunk_df['barra'].isin(barra)]
    return chunk1

for chunk in pd.read_csv(filename1, chunksize=100000):
    chunk_df = dd.from_pandas(chunk, npartitions=5)
    chunk1_df = chunk_df.map_partitions(chunk_df, barra).compute()  # this is the line that raises the error below
    chunk1.to_csv(filename2.name, index=False, mode='a', header=False)
    print("pasada:", ichunk)
    ichunk = ichunk + 1

dt2 = datetime.now()
print("FIN: ", dt2 - dt1)
Dask raises the following error:
AssertionError Traceback (most recent call last)
Cell In[12], line 21
19 for chunk in pd.read_csv(filename1, chunksize=100000):
20 chunk_df = dd.from_pandas(chunk, npartitions=5)
---> 21 chunk1_df = chunk_df.map_partitions(chunk_df,barra).compute()
22 chunk1.to_csv(filename2.name, index=False, mode='a', header=False)
23 print("pasada:", ichunk)
File C:\ProgramData\anaconda3\Lib\site-packages\dask\dataframe\core.py:1006, in _Frame.map_partitions(self, func, *args, **kwargs)
878 @insert_meta_param_description(pad=12)
879 def map_partitions(self, func, *args, **kwargs):
880 """Apply Python function on each DataFrame partition.
881
882 Note that the index and divisions are assumed to remain unchanged.
(...)
1004 None as the division.
1005 """
-> 1006 return map_partitions(func, self, *args, **kwargs)
File C:\ProgramData\anaconda3\Lib\site-packages\dask\dataframe\core.py:6924, in map_partitions(func, meta, enforce_metadata, transform_divisions, align_dataframes, *args, **kwargs)
6921 name = kwargs.pop("token", None)
6922 parent_meta = kwargs.pop("parent_meta", None)
-> 6924 assert callable(func)
6925 if name is not None:
6926 token = tokenize(meta, *args, **kwargs)
AssertionError:
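Reading the traceback, the `assert callable(func)` line seems to say that `map_partitions` wants a function as its first argument, while my code passes the dataframe `chunk_df` itself instead of `Proceso`. Is the fix as simple as this untested sketch (the write also has to use the computed result, since `chunk1` only exists inside `Proceso`)?

    # Pass the function, not the dataframe; extra positional
    # arguments such as `barra` are forwarded on to Proceso.
    chunk1_df = chunk_df.map_partitions(Proceso, barra).compute()
    # Write the computed result (a pandas DataFrame), not `chunk1`,
    # which is local to Proceso.
    chunk1_df.to_csv(filename2.name, index=False, mode='a', header=False)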
At the moment, without Dask, the processing time for one file is ~8 min, and I need to reduce this as much as possible.
I’m using chunks to avoid memory problems (old structure, without Dask). Can I drop this part when I use Dask?
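For example, I imagine the Dask-native version (no pandas loop, letting `dd.read_csv` split the file into blocks on its own) would look something like this untested sketch, where `blocksize="256MB"` is just a guess on my part:

    ddf = dd.read_csv(filename1, blocksize="256MB")  # Dask chunks the file itself
    ddf.columns = ['ID', 'barra', 'D1', 'D2', 'D3', 'D4', 'D5', 'D6', 'D7', 'D8', 'CMG']
    filtered = ddf[ddf['barra'].isin(barra)]  # same selection as Proceso
    filtered.to_csv(filename2.name, index=False, single_file=True)  # one output CSV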
Any ideas to solve the error?
Thanks!