I am trying to repartition the data before applying any transformation logic, and this step takes a lot of time. The code and a snapshot of the Spark UI are below. Can any optimization be applied here?
Cluster: AWS EMR, 200 task nodes, r6g.16xlarge (64 vCores, 488 GiB memory each)
df = spark.sql('''
    with basedata as (
        select kgs, marg_id, *******price
        from searcced_v2
        where is_ot = 0
          and pardate between "20240602" and "20240608"
          and rg in ('gg', 'MXg', 'gCA')
    )
    select * from basedata
''').repartition(24000, 'kgs', 'marg_id')
df.createOrReplaceTempView('vw_sewords')
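For context, the same keyed shuffle can be written as a SQL hint instead of the DataFrame call; this is only a sketch assuming Spark 3.x, and the projected columns are collapsed to * here because the real select list is redacted above. Note also that spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled and spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor in the submit command below apply to REBALANCE, not to repartition()/REPARTITION.

# Sketch, assuming Spark 3.x: the REPARTITION hint is equivalent to
# DataFrame.repartition(numPartitions, *cols).
df = spark.sql('''
    select /*+ REPARTITION(24000, kgs, marg_id) */ *
    from searcced_v2
    where is_ot = 0
      and pardate between "20240602" and "20240608"
      and rg in ('gg', 'MXg', 'gCA')
''')
df.createOrReplaceTempView('vw_sewords')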
spark-submit \
  --master yarn --deploy-mode cluster \
  --conf spark.yarn.maxAppAttempts=1 \
  --conf spark.sql.files.maxPartitionBytes=268435456 \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.sql.parquet.filterPushdown=true \
  --conf spark.sql.adaptive.coalescePartitions.enabled=true \
  --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \
  --conf spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled=true \
  --conf spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor=0.5 \
  --conf spark.sql.adaptive.coalescePartitions.parallelismFirst=false \
  --conf spark.sql.adaptive.coalescePartitions.initialPartitionNum=24000 \
  --conf spark.sql.adaptive.localShuffleReader.enabled=true \
  --conf spark.shuffle.io.connectionTimeout=8000 \
  --conf spark.network.timeout=50000s \
  --conf spark.files.fetchTimeout=600s \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryoserializer.buffer.max=1g \
  --conf spark.memory.storageFraction=0.05 \
  --conf spark.memory.fraction=0.8 \
  --conf spark.shuffle.compress=true \
  --conf spark.shuffle.spill.compress=true \
  --conf spark.hadoop.fs.s3.multipart.th.fraction.parts.completed=0.99 \
  --conf spark.sql.objectHashAggregate.sortBased.fallbackThreshold=4000000 \
  --conf spark.reducer.maxReqsInFlight=100 \
  --conf spark.shuffle.io.retryWait=60s \
  --conf spark.shuffle.io.maxRetries=10 \
  --conf spark.reducer.maxSizeInFlight=1024m \
  --conf spark.shuffle.file.buffer=1024k \
  --conf spark.reducer.maxBlocksInFlightPerAddress=100 \
  --conf spark.io.compression.codec=zstd \
  --conf spark.io.compression.zstd.level=3 \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.executor.cores=5 \
  --conf spark.executor.instances=2400 \
  --conf spark.executor.memory=34g \
  --conf spark.executor.memoryOverhead=5g \
  --conf spark.driver.memory=60g \
  --conf spark.driver.memoryOverhead=4g \
  --conf spark.hadoop.fs.s3a.fast.output.enabled=true \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -Djavax.net.ssl.trustStore=/home/hadoop/.config/certs/InternalAndExternalTrustStore.jks" \
  --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
  test.py
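To separate the scan time from the exchange itself, here is a minimal timing sketch, assuming Spark 3.0+ where the built-in noop sink is available; df is the DataFrame defined above:

import time

# Sketch, assuming Spark 3.0+: the "noop" sink executes the full plan
# (S3 scan plus the 24000-partition exchange) but writes nothing,
# so the wall time reflects scan + shuffle cost only.
start = time.time()
df.write.format("noop").mode("overwrite").save()
print(f"scan + repartition: {time.time() - start:.1f}s")

# Sanity check: the exchange should yield the requested partition count.
print(df.rdd.getNumPartitions())  # expected: 24000

For reference, the sizing above packs 12 executors per node (2400 / 200), using 12 x 5 = 60 of the 64 vCores and 12 x (34 + 5) = 468 GiB of the 488 GiB per node.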
Here are the UI details.