So the scenario is I have multiple gzipped JSON Lines files (one JSON object per line) in GCS, roughly 200 GB compressed and 1.5 TB decompressed. I have a basic pipeline to try and see how it behaves. The pipeline:
- Reads the data from GCS
- Loads each row as a Python dict
- Applies a transformation
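For context, each input line parses to a dict shaped roughly like the following (a sketch only: the field names are the ones the transform below relies on, while the values and extra keys are made-up placeholders):

# Hypothetical example of one input record; extra_info.info is a list of
# dicts, and each entry produces one output row in the transform.
sample_record = {
    "title": "some title",
    "bio": "some bio text",
    "id": 12345,
    "extra_info": {
        "info": [
            {"some_key": "value_1", "another_key": "value_2"},
        ]
    },
}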
I have the following code:
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions


class TransformData(beam.DoFn):
    def process(self, element):
        mandatory_columns = {'title', 'bio', 'id'}
        # One output record per entry in extra_info.info; everything except
        # the mandatory columns goes into extra_details as a JSON string.
        for row in element.get('extra_info', {}).get('info', {}):
            yield {
                'title': element['title'],
                'bio': str(element['bio']),
                'id': element['id'],
                'extra_details': json.dumps(
                    {key: value for key, value in row.items() if key not in mandatory_columns}),
            }

options = PipelineOptions()
options.view_as(WorkerOptions).num_workers = 10
options.view_as(WorkerOptions).max_num_workers = 32
options.view_as(WorkerOptions).machine_type = 'n1-standard-16'

p = beam.Pipeline(options=options)
# input_files is a GCS glob matching the gzipped files
lines = p | "ReadFromGCS" >> beam.io.ReadFromText(input_files, compression_type="gzip")
parsed_data = lines | "ParseJSON" >> beam.Map(lambda x: json.loads(x))
transformed_data = parsed_data | "TransformData" >> beam.ParDo(TransformData())
p.run()
So the thing is, this takes around 4 hours to complete. It runs with a single worker only, and after about 2 hours of execution the transform step reaches 100% but keeps working, while throughput and CPU utilization drop dramatically.
I have tried shuffling the records to be able to parallelise the transform, but the shuffle itself takes a long time with a single worker, and even though the target worker count is increased to 23, the current worker count stays at 1.
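The shuffle attempt looked roughly like this (simplified sketch: a plain beam.Reshuffle() inserted between the parse and the transform; the exact placement in my real pipeline may differ):

# Redistribute the parsed records across workers before the ParDo, so the
# transform is not tied to the single worker reading the gzipped files.
shuffled_data = parsed_data | "Reshuffle" >> beam.Reshuffle()
transformed_data = shuffled_data | "TransformData" >> beam.ParDo(TransformData())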
SDK version: 2.56.0
Thanks in advance.