I have a use case to remove duplicate records from a dataset based on a key column: whenever duplicates are observed for a key, the record with the max date should be kept, the remaining records should be pushed to a different dataframe, and both dataframes should then be written back to GCS.
The data sits in GCS as Parquet with 1 billion rows, split across 1000 part files of roughly 84 MB each. From data profiling I know that every record is currently unique on the key column, but I cannot bypass the deduplication logic, for future safety.
Below is the code I used.
from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

window_specs = Window.partitionBy("key_col").orderBy(col("date_col").desc())  # placeholder column names
df_rank = df.withColumn("rank", rank().over(window_specs))
df_rank.cache()
valid_df = df_rank.filter("rank = 1")    # record with the max date per key
invalid_df = df_rank.filter("rank > 1")  # remaining duplicates go to a separate dataframe
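
For completeness, the two dataframes are then written back to GCS roughly like this (the bucket paths below are just placeholders):

# write both outputs back to GCS as Parquet; paths are placeholders
valid_df.drop("rank").write.mode("overwrite").parquet("gs://my-bucket/output/valid/")
invalid_df.drop("rank").write.mode("overwrite").parquet("gs://my-bucket/output/invalid/")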
The configuration I am using is below.
driver cores: 4, driver memory: 8g
executor cores: 4, executor memory: 16g, minExecutors: 16, maxExecutors: 40, spark.sql.shuffle.partitions: 200
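
For reference, I submit the job roughly like this (the script name is a placeholder; the min/max executor settings go through dynamic allocation):

spark-submit \
  --driver-cores 4 --driver-memory 8g \
  --executor-cores 4 --executor-memory 16g \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=16 \
  --conf spark.dynamicAllocation.maxExecutors=40 \
  --conf spark.sql.shuffle.partitions=200 \
  dedup_job.py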
The job is running forever. Could someone please suggest any optimisation techniques I should follow?