I am trying to better understand the behaviour of Spark’s window functions.
I find it strange that there is no additional shuffle phase (Exchange) for the window function if you repartition the DataFrame on just one of the columns in the partitionBy clause.
I would expect an Exchange on the hash of all the partitionBy columns before the Window; instead I only see a Sort in the physical plan.
Here is an example. The meaning of the data doesn’t matter, just the fact that if I repartition on “subject”, the window doesn’t need a shuffle. How does Spark know that all rows for each name-subject combination are within a single partition?
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as f

# Reuses the active session if one already exists (e.g. in a notebook).
spark = SparkSession.builder.getOrCreate()

data = [
    ("Alice", "2021", "history", "2021-01-01", 100),
    ("Alice", "2021", "history", "2021-01-01", 150),
    ("Alice", "2022", "science", "2022-01-02", 200),
    ("Alice", "2022", "math", "2022-01-03", 150),
    ("Bob", "2021", "history", "2021-01-01", 80),
    ("Bob", "2021", "math", "2021-01-01", 200),
    ("Bob", "2022", "science", "2022-01-02", 150),
    ("Bob", "2022", "math", "2022-01-03", 200),
    ("Charlie", "2021", "science", "2021-01-01", 200),
    ("Charlie", "2021", "math", "2021-01-01", 300),
    ("Charlie", "2022", "math", "2022-01-02", 200),
    ("Charlie", "2021", "science", "2021-01-03", 300),
]
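columns = ["name", "year", "subject", "date", "score"]
# The window is partitioned by two columns: name and subject.
window_spec = Window.partitionBy("name", "subject").orderBy("score", "date")
df = spark.createDataFrame(data, columns)
# Repartition on only one of the two window partition columns.
df = df.repartition(3, "subject")
df = df.withColumn("dense_rank", f.dense_rank().over(window_spec))
# Keep only the top-ranked rows of each (name, subject) group.
df = df.where("dense_rank == 1")
df.show()
df.explain(True)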
Physical plan:
AdaptiveSparkPlan isFinalPlan=false
+- Filter (dense_rank#24086 = 1)
   +- Window [dense_rank(score#24078L, date#24077) windowspecdefinition(name#24074, subject#24076, score#24078L ASC NULLS FIRST, date#24077 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS dense_rank#24086], [name#24074, subject#24076], [score#24078L ASC NULLS FIRST, date#24077 ASC NULLS FIRST]
      +- Sort [name#24074 ASC NULLS FIRST, subject#24076 ASC NULLS FIRST, score#24078L ASC NULLS FIRST, date#24077 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(subject#24076, 3), REPARTITION_BY_NUM, [plan_id=27145]
            +- Scan ExistingRDD[name#24074,year#24075,subject#24076,date#24077,score#24078L]