Notebook Task: The task involves reading data from multiple tables into separate dataframes over JDBC, performing 19 joins with filters and transformations, and then writing the final dataframe to a table. I recognize that this approach has its issues, particularly the many shuffles introduced by the joins before the write, but my focus here is on selecting an appropriate cluster configuration rather than optimizing the code.
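For context, the notebook roughly follows the shape below. This is only a simplified sketch: the JDBC URL, table names, join keys, and filters are placeholders, not the actual ones.

```python
from pyspark.sql import functions as F

# Placeholder connection details -- not the real source system
jdbc_url = "jdbc:postgresql://<host>:5432/<db>"
props = {"user": "<user>", "password": "<password>", "driver": "org.postgresql.Driver"}

def read_table(name):
    # Each source table is pulled over JDBC into its own dataframe
    # (`spark` is the session already available in a Databricks notebook)
    return spark.read.jdbc(url=jdbc_url, table=name, properties=props)

# Base fact table with a filter applied as early as possible (placeholder names)
result = read_table("fact_base").filter(F.col("event_date") >= "2024-01-01")

# Roughly 19 lookup/dimension joins, each one introducing a shuffle
for dim_table, join_key in [("dim_a", "a_id"), ("dim_b", "b_id")]:  # ...19 pairs in total
    result = result.join(read_table(dim_table), on=join_key, how="left")

# A few column transformations, then write the final dataframe to a table
result = result.withColumn("load_ts", F.current_timestamp())
result.write.mode("overwrite").saveAsTable("target_schema.final_table")
```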
Spark UI Details to Analyze the Problem:
- Shuffle Read Data: 75 to 89 GB
- Shuffle Write Data: 75 to 89 GB
- Input Data: 2 GB
To handle this, I provisioned an i3.8xlarge cluster (256 GB of memory and 32 cores per node), using 1 to 6 worker nodes and a driver node identical to the workers.
Issue: The job usually completes within 1.5 to 2 hours, but on some days it takes significantly longer, between 3 and 5 hours. After analyzing those runs, I found that AWS Spot instances were being lost on the days with extended run times, causing heavy disk spill. I switched 4 workers from Spot to On-Demand (previously, only 1 was On-Demand).
Disk Spill Problem:
The spill issue persists even on regular days. Digging deeper, I realized that the high shuffle volume means the workload needs more executor memory rather than more storage, i.e., it is memory-intensive. Consequently, I switched to r5d.8xlarge (a memory-optimized worker type).
Confusion:
Even after switching to a memory-optimized cluster, the execution time remained the same and disk spills continued. I tried the following configurations (how I applied them is sketched after the list):
- spark.executor.memoryOverhead: 16g
- spark.sql.adaptive.enabled: true
- spark.memory.offHeap.enabled: true
- spark.memory.offHeap.size: 40g
- spark.executor.memory: 80g
- spark.sql.shuffle.partitions: 3000
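For reference, this is roughly how I applied them. The executor memory, overhead, and off-heap settings go into the cluster's Spark config, since they cannot be changed once the executors have started, while the SQL settings can also be set from the notebook. The snippet below is only a sketch of that split, mirroring the values in the list above:

```python
# Static settings -- set in the cluster's Spark config, not from the notebook:
#   spark.executor.memory 80g
#   spark.executor.memoryOverhead 16g
#   spark.memory.offHeap.enabled true
#   spark.memory.offHeap.size 40g

# Runtime SQL settings -- these can also be set from the notebook session:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "3000")
```

As I understand it, the off-heap size is requested on top of the heap and overhead, so the per-executor footprint here is roughly 80 + 16 + 40 ≈ 136 GB, which still fits on a 256 GB worker.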
With these settings, the job ran in 53 minutes, a significant improvement, but disk spills still occurred, suggesting further optimization is possible.
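My rough reasoning for why raising spark.sql.shuffle.partitions helped, and why some spill can still show up (back-of-the-envelope numbers, assuming one executor per worker with all 32 cores running tasks concurrently):

```python
shuffle_gb = 89   # worst-case shuffle volume observed in the Spark UI
cores      = 32   # concurrent tasks per executor

# With Spark's default of 200 shuffle partitions, each task handles a large chunk:
print(shuffle_gb * 1024 / 200)    # ~456 MB per shuffle partition

# At 3000 partitions, each task's working set shrinks dramatically:
print(shuffle_gb * 1024 / 3000)   # ~30 MB per shuffle partition

# Execution memory per task is roughly (executor memory * spark.memory.fraction) / concurrent tasks,
# e.g. 80 GB * 0.6 / 32 ≈ 1.5 GB. Sorting/joining ~456 MB partitions across several back-to-back
# joins can exceed that share, which is how I interpret the remaining spill.
```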
With Adaptive Query Execution (AQE) already enabled, I set spark.sql.autoBroadcastJoinThreshold to 200 MB, but there was no noticeable improvement. After further calculation, I used these settings:
- spark.databricks.adaptive.autoBroadcastJoinThreshold: 209715200
- spark.sql.autoBroadcastJoinThreshold: 209715200
These changes reduced shuffle read/write to about 59 GB, but the job took 1 hour and 10 minutes, longer than the previous 53 minutes despite the reduced shuffle.
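As I understand it, raising these thresholds lets the optimizer replace shuffle joins with broadcast joins for the smaller dimension tables, which matches the drop in shuffle volume. The explicit equivalent would be hinting individual joins, roughly like this (placeholder dataframe and column names, not my actual code):

```python
from pyspark.sql import functions as F

# Explicitly broadcast a small dimension table instead of relying on the size threshold.
# dim_small has to fit comfortably in memory (here, well under the ~200 MB threshold).
result = fact_df.join(F.broadcast(dim_small), on="dim_id", how="left")
```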
Further Configurations Tried:
I added the following configurations:
- spark.serializer: org.apache.spark.serializer.KryoSerializer (since DBR 15.3 ships Spark 3.5.0, this is not applicable)
- spark.memory.storageFraction: 0.1
- spark.memory.fraction: 0.7
- spark.shuffle.memoryFraction: 0.6 (intended to allocate more space for shuffle, though I believe this is a legacy setting that is ignored by the unified memory manager in Spark 3.x)
Despite the shuffle data dropping to about 50 GB, the job still took 1 hour and 1 minute, and there were a few failed tasks.
The reduced shuffle data is good, but I need help identifying what configuration I am still missing, as the job continues to take longer than expected. Could anyone assist in understanding and optimizing these settings further?
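One thing I have been wondering about, but have not fully verified, is join-key skew, since a few oversized partitions would explain both the lingering spill and the occasional failed tasks. Is something like the check below a reasonable next step, or is there a configuration-level answer I am missing? (Table and column names are placeholders.)

```python
from pyspark.sql import functions as F

# Quick skew check on a join key: if the top few keys hold a large share of the rows,
# AQE's skew-join handling (spark.sql.adaptive.skewJoin.enabled, on by default in Spark 3.x)
# or key salting would matter more than adding memory.
(fact_df
    .groupBy("dim_id")
    .count()
    .orderBy(F.col("count").desc())
    .show(20))
```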
Thanks for reading!!
In short: I tried to balance the cluster configuration against the given code, and I need help understanding these Spark configurations.