Python version: 3.11.8
Goal: Read 1,000 parquet files from an Amazon S3 bucket faster.
Approach: I am using the boto3 and pandas libraries to read the parquet files. I could retrieve the files sequentially, but I want to reduce the total read time by fetching them in parallel with the concurrent.futures library. I decided to use ProcessPoolExecutor, but I got stuck because I do not know how to structure my project around it. This is my current attempt:
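For context, the general submit pattern I am trying to follow looks roughly like this (a generic sketch, not tied to S3; `square` is just a hypothetical stand-in for the per-file work):

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def square(n: int) -> int:
    # Stand-in for per-file work such as downloading and parsing one parquet file
    return n * n

def run_in_parallel(items):
    results = []
    with ProcessPoolExecutor(max_workers=2) as exe:
        # submit() takes the callable and its arguments separately;
        # it returns a Future immediately, not the function's result
        futures = [exe.submit(square, item) for item in items]
        # as_completed() yields each Future as soon as it finishes
        for fut in as_completed(futures):
            results.append(fut.result())
    return sorted(results)

if __name__ == "__main__":
    print(run_in_parallel([1, 2, 3]))
```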
I have two scripts: the first reads the parquet files and the second transforms or manipulates all the received data.
script1.py
from io import BytesIO

from boto3 import Session
from pandas import DataFrame, concat, read_parquet
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait

def retrieve_s3_bucket(bucket_name: str,
                       access_key_id: str,
                       secret_access_key: str,
                       prefix: str):
    ### some code to retrieve objects from S3
    return objects

def read_parquet_file(object) -> DataFrame:
    buffer = BytesIO()
    object.Object().download_fileobj(buffer)
    df_object = read_parquet(buffer)
    return df_object
def get_data_from_bucket(objects,
                         columns_to_delete: list,
                         columns_to_format: list,
                         restaurant_ids: list) -> DataFrame:
    list_dfs = []
    with ProcessPoolExecutor(2) as exe:
        # submit tasks and collect futures
        for object in objects:
            df_object = exe.submit(read_parquet_file(object), object)
            if df_object.empty:
                print("Empty")
            else:
                list_dfs.append(df_object)
        # wait for all tasks to complete
        print('Waiting for tasks to complete...')
        wait(list_dfs)
    data = concat(list_dfs, ignore_index=True)
    return data

# protect the entry point
if __name__ == '__main__':
    # start the process pool
    get_data_from_bucket()
script2.py:
import logging
from json import dumps
from traceback import print_exc

from script1 import get_data_from_bucket, retrieve_s3_bucket

def main(bucket_name: str,
         access_key_id: str,
         secret_access_key: str,
         prefix: str) -> str:
    try:
        data = {}
        objects = retrieve_s3_bucket(bucket_name, access_key_id, secret_access_key, prefix)
        df = get_data_from_bucket(objects,
                                  columns_to_delete,
                                  columns_to_format,
                                  list_restaurants)
    except Exception as e:
        logging.error(str(e))
        data['error_message'] = str(e)
        data['stack_trace'] = print_exc()
    finally:
        return dumps(data)
I am getting the following error during execution:
An attempt has been made to start a new process before the current
process has finished its bootstrapping phase
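From what I have read, this error appears when a process pool is created while the module is still being imported, so the fix seems to be keeping all pool creation behind the entry-point guard. A minimal sketch of that layout (with `work` as a hypothetical stand-in for the real task):

```python
from concurrent.futures import ProcessPoolExecutor

def work(x: int) -> int:
    # Placeholder for the real per-file task
    return x + 1

def main() -> list:
    # The pool is created only when main() is called, never at import time
    with ProcessPoolExecutor(max_workers=2) as exe:
        # map() returns results in input order once all tasks finish
        return list(exe.map(work, range(3)))

# Without this guard, a child process re-importing the module would try
# to start its own pool, which raises the bootstrapping error above
if __name__ == "__main__":
    print(main())
```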
How should I integrate concurrent.futures into my project, and how do I properly handle errors during the reading? Forgive me if I am doing something wrong, but I am pretty new to the concept of parallelism.