I am trying to make some joins, groupings, etc. more efficient with pyspark by avoiding unnecessary exchanges. I have a situation where I first need to join a dataframe on columns (a, b, c), and later join it again on columns (a, b, d).
I want to bucket my dataframe by the first 2 columns (a, b), so that an exchange is not necessary, but Spark always forces an exchange on all 3 join columns before each join. How can I avoid these extra exchanges?
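For concreteness, here is roughly the setup; the table names, bucket count, and toy data are just illustrative:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Disable broadcast joins so the plan shows the sort-merge join and its exchanges
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Toy data standing in for my real dataframes (columns a, b, c, d)
df1 = spark.createDataFrame([(1, 1, 1, 1), (2, 2, 2, 2)], ["a", "b", "c", "d"])
df2 = spark.createDataFrame([(1, 1, 1, 1), (2, 2, 2, 2)], ["a", "b", "c", "d"])

# Bucket both sides by the common prefix (a, b) of the two join keys
df1.write.bucketBy(16, "a", "b").sortBy("a", "b").mode("overwrite").saveAsTable("t1")
df2.write.bucketBy(16, "a", "b").sortBy("a", "b").mode("overwrite").saveAsTable("t2")

t1 = spark.table("t1")
t2 = spark.table("t2")

# First join on (a, b, c), later join on (a, b, d)
joined1 = t1.join(t2, on=["a", "b", "c"])
joined2 = t1.join(t2, on=["a", "b", "d"])

# Both plans still contain Exchange hashpartitioning over all three join columns,
# even though the data is already bucketed by (a, b)
joined1.explain()
joined2.explain()
```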
There is a JIRA issue and a PR for this feature, but the PR was not merged:
https://issues.apache.org/jira/browse/SPARK-18067
https://github.com/apache/spark/pull/29655
Related question: how can I convince Spark not to introduce an exchange when the join key is a superset of the bucketBy key?
At the moment, the only workaround I have seen that more or less works is to perform the join with the >= and <= hack (sketched below). However, this does not help when groupBy operations and joins are combined. It would be ideal to have some kind of hint to tell pyspark that it is safe to join/group with the current bucketing, and that I guarantee that the data to be merged/grouped lives on the same node.
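For reference, this is roughly what I mean by the >= / <= hack, sketched against the hypothetical bucketed tables t1 and t2 from the snippet above (I have only verified this loosely):

```python
t1 = spark.table("t1")
t2 = spark.table("t2")

# The idea: rewrite the equality on c as (c >= c AND c <= c). The planner then
# only sees (a, b) as equi-join keys, which match the bucketing, and evaluates
# the range predicates on c as additional join conditions, so the extra
# exchange on (a, b, c) should disappear from the plan.
cond = (
    (t1["a"] == t2["a"])
    & (t1["b"] == t2["b"])
    & (t1["c"] >= t2["c"])
    & (t1["c"] <= t2["c"])
)
joined = t1.join(t2, cond)
joined.explain()  # ideally no Exchange hashpartitioning(a, b, c) any more
```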