I have a Lambda function that reads in a JSON file and flattens it so that I can convert it to Parquet, which makes the data easier to read. The issue I'm having is that some of the files are too large and the Lambda function times out before it can flatten the file. As a workaround to the timeout I tried a Glue job with the exact same code, but it takes hours to process just one file. After doing some research, I found that reading the JSON in chunks might help. I was able to read the file in chunks and convert it to Parquet successfully, but the remaining issue is how to flatten the JSON while reading it in chunks.

Here is my code. I need to flatten the JSON in the first if statement, where 'largefile' is in fullstring. The elif section flattens normal, smaller JSON files. The else statement just reads in the JSON file without flattening. Then I convert all data types to string and convert to Parquet. Any help is appreciated.
if fullstring is not None and 'largefile' in fullstring:
    print('Large files, processing these files in chunks...')
    islargeFile = True
    destination_par_folder_with_bucket = 's3://' + str(processed_bucket)
    destination_par_folder_with_partition = destination_par_folder_with_bucket + 'flowstartdate=' + str(date_file_created) + '/'
    parquet_path = destination_par_folder_with_partition + filename + '.parquet'
    for dfChunk in wr.s3.read_json(path=[s3_json_location], lines=True, orient='records', chunksize=25000):
        # TODO: this is where I need to flatten each chunk before writing it out
        dfChunk = dfChunk.astype(str)
        try:
            print('Writing the files to parquet...')
            wr.s3.to_parquet(df=dfChunk, path=parquet_path, dataset=True, mode='append', compression=None, use_threads=True)
            print('Successfully converted to parquet!')
        except Exception as e:
            print('Error trying to convert to parquet...')
            print('error -> ', e)
            raise
elif fullstring is not None and 'need_to_flatten' in fullstring:
    print('Flattening nested JSON...')
    islargeFile = False
    response = s3.get_object(Bucket=source_bucket_name, Key=sourcekey)
    ndf = pd.DataFrame()
    data = (line.decode('utf-8') for line in response['Body'].iter_lines())
    running_row = 1
    for row in data:
        jsonObject = json.loads(row)
        flat = flatten_json(jsonObject)
        temp_df = pd.json_normalize(flat)
        temp_df = temp_df.dropna(axis='columns')
        if running_row == 1:
            # first record starts the accumulator DataFrame
            ndf = temp_df
            running_row += 1
        else:
            ndf = pd.concat([ndf, temp_df])
    dfs = ndf
else:
    islargeFile = False
    dfs = wr.s3.read_json(path=[s3_json_location], lines=True, orient='records')

if islargeFile is False:
    # converting all datatypes to string
    dfs = dfs.astype(str)
    print('Writing the files to parquet...')
    try:
        wr.s3.to_parquet(df=dfs, path=destination_par_folder_with_partition + filename + '.parquet')
        print('Successfully converted to parquet!')
    except Exception as e:
        print('Error trying to convert to parquet...')
        print('error -> ', e)
        raise
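
For what it's worth, below is a rough sketch of what I imagine the chunk loop should look like: flattening every record of the chunk before writing it out. The bucket names, path, and date value are just placeholders, and I'm using pd.json_normalize on the chunk's records instead of my flatten_json helper, so I'm not sure it handles nested lists the same way. I'm not sure this is the right approach.

import awswrangler as wr
import pandas as pd

# Placeholder paths; in my real code these come from the event and the partition logic above.
s3_json_location = 's3://my-source-bucket/raw/largefile.json'
parquet_path = 's3://my-processed-bucket/flowstartdate=2023-01-01/largefile.parquet'

for dfChunk in wr.s3.read_json(path=[s3_json_location], lines=True, orient='records', chunksize=25000):
    # Each chunk is already a DataFrame, so turn it back into a list of dicts and
    # normalize them, which expands nested dicts into dotted column names.
    records = dfChunk.to_dict(orient='records')
    flatChunk = pd.json_normalize(records)
    flatChunk = flatChunk.astype(str)
    wr.s3.to_parquet(df=flatChunk, path=parquet_path, dataset=True, mode='append', compression=None, use_threads=True)

One thing I'm worried about with this is that different chunks could end up with different columns after normalizing, which might cause problems when reading the dataset back.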