I’m using AWS Glue Spark/Python jobs to ingest data into Hudi tables in an S3 bucket. I’m hitting S3 slowdown (throttling) errors far more often than seems reasonable, and I’m unable to pin down the root cause.
My destination S3 bucket has a folder for every Hudi table. When I ingest data into the tables sequentially (one table at a time, a very long run), everything works well. However, when I run multiple jobs in parallel, each processing a single table, many of the jobs fail with S3 slowdown errors.
This is not the behaviour I expected, since S3 rate limits are applied per prefix. Because each Hudi table has its own folder, I assumed each of those folders is a separate prefix and should not be affected by what’s happening in other folders/prefixes. Yet the jobs do hit rate limits when running in parallel, unlike when running sequentially.
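To make that layout concrete, here is roughly how each job’s target path is built; the bucket name and table name below are placeholders, not the real values:

# Each parallel job handles one table and writes only under that table's
# folder, so I expected each job to land in its own S3 prefix.
bucket = 'my-ingestion-bucket'                    # placeholder bucket name
tableName = 'orders'                              # placeholder table name
targetPath = f's3://{bucket}/hudi/{tableName}/'   # one folder per Hudi table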
This is my Hudi configuration:
config = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'tx_commit_time',
    'path': targetPath,  # s3 uri of the hudi table folder
    'hoodie.table.name': tableName,
    'hoodie.datasource.hive_sync.database': dbName,
    'hoodie.datasource.hive_sync.table': tableName,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.recordkey.field': hudiPrimaryKey,
    'hoodie.datasource.write.partitionpath.field': partitionKeyField,
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.partition_fields': partitionKeyField,
    'hoodie.upsert.shuffle.parallelism': 20,
    'hoodie.insert.shuffle.parallelism': 20,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_FILE_VERSIONS',
    'hoodie.cleaner.fileversions.retained': 1,
    'hoodie.datasource.write.reconcile.schema': 'true',
    'hoodie.index.type': 'GLOBAL_SIMPLE'
}
My write operation is fairly straightforward:
outputDf.write.format('hudi').options(**config).mode('Append').save()
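For completeness, the parallel runs are kicked off roughly like the sketch below, one Glue job run per table; the job name, table names, and argument key are placeholders for illustration, not the real values:

import boto3

# Hypothetical sketch of starting the parallel runs: one Glue job run per
# table, all started at roughly the same time.
glue = boto3.client('glue')
tables = ['table_a', 'table_b', 'table_c']   # placeholder table list

for table in tables:
    glue.start_job_run(
        JobName='hudi-ingest-job',            # placeholder Glue job name
        Arguments={'--table_name': table},    # each run ingests a single table
    )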
Can anyone please help clarify why this happens and how to avoid it?