I have a series of parquet files which have been landed inside of an S3 bucket; these files drastically range in size from a few KBs to a few GBs. The data contained within these files are however structured according to the same schema, so that I have been able run a Glue Crawler pointed at the S3 bucket to create a single table inside of a Glue Database containing all records included in the delivered files.
I am now looking to repartition the table created by the Glue Crawler so that each partition is of a maximum given file size (at the moment this is 10MB), before outputting the new partitions to a directory in my S3 bucket as a set of CSV files; essentially writing the table to standardised “chunks”.
I have tried this a couple of ways as suggested by the Glue documentation, but both seem to give issues:
- The first, I used the metadata of the S3 bucket containing the original files to workout how many output files would be required so that the table can be evenly repartitioned so that each partition is less that 10MB, and then writing the partitioned DynamicFrame to S3. This was done by the follow script (neglecting the standard AWS Glue Script preamble):
S3_MEMORY_SIZE = 2e10
OUTFILE_SIZE = 1e7
# Define transformation function
def partititionTransform(glueContext, dynamic_frame, num) -> DynamicFrame:
# convert to pyspark dataframe so we can specify the number of output files partitions
data_frame = dynamic_frame.toDF().repartition(num)
# Convert to AWS Glue dynamic frames
dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "dynamic_frame")
return dynamic_frame
# Fetch parquet from S3 bucket & convert to a AWS Glue dynamic frame
glue_table = glueContext.create_dynamic_frame_from_catalog(
database='glue-test-db',
table_name='parquet',
)
# number of required output files
num_files = int(table_size / OUTFILE_SIZE) + 1
# Implement Transforms. Split & Convert input file
out_glue_table = partitionTransform(glueContext, glue_table, num_files)
# Upload CSV files to target s3 bucket with a new folder
glue_table = glueContext.write_dynamic_frame.from_options(
frame=out_glue_table,
connection_type="s3",
format="csv",
format_options={"writeHeader": True},
connection_options={"path": "s3://exmaple-bucket/cleaned-data/"})
job.commit()
On running however, Glue throws the following error:
Error Category: UNCLASSIFIED_ERROR; Py4JError: An error occurred while calling o131.repartition. Trace:
This disappears and works as expected if instead of partitioning the table into num
partitions, I hard code it to partition into 2. I have thus interpreted this error as suggesting that I am requesting too many partitions.
- The second way I have tried ditches the partitioning function and instead attempts to make use of job partitioning, setting the additional
boundedSize
parameter to be'10000000'
inside of thecreate_dynamic_frame_from_catalog
method and then just writing (what I assume to be the partitioned) the files to the S3 location:
glue_table = glueContext.create_dynamic_frame_from_catalog(
database='glue-test-db',
table_name='parquet',
additional_options = {"boundedSize": "10000000"},
)
glue_table = glueContext.write_dynamic_frame.from_options(
frame=glue_table,
connection_type="s3",
format="csv",
format_options={"writeHeader": True},
connection_options={"path": "s3://example-bucket/cleaned-data/"})
While this does not throw an error upon running and the job actually succeeds, looking at the destination S3 directory shows that multiple files have been written but they are either 131MB or 180MB. There is also about 20 files of a few hundreds bytes, which upon inspection just contain the tables header in CSV format.
The AWS documentation does not seem to be overly clear and I have found no indiciation of why either of these solutions appear to be failing; although multiple sources have suggested both of these methods as a solution breaking a larger table into a number of files of a maximum size.
Any suggestion as to why or where these issues may be arising from would be appreciated.
This is currently just an experiment to trial of the use of Glue capabilities, which if adopted would be used to process data totaling a few hundred GBs. I therefore realise that in production, reading the full table into memory and attempting to convert it to a spark dataframe to perform the .repartition
method (as in option 1) may not be viable due to insufficient executor memory. So any suggestion which also takes account of this potential future hurdle would be even more appreciated.
zingerrr is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.