I have this code snippet to find, for each user, the minimum distance to all centroids:
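```python
import numpy as np
import pandas as pd
from pyspark.sql.functions import col, min as spark_min, pandas_udf
from pyspark.sql.types import DoubleType


@pandas_udf(DoubleType())
def calculate_euclidean_distances(s1: pd.Series, s2: pd.Series) -> pd.Series:
    # Each element is a list of ratings; expand into one column per dimension
    df1 = pd.DataFrame(s1.tolist())
    df2 = pd.DataFrame(s2.tolist())
    # Row-wise Euclidean distance between the two rating vectors
    return np.sqrt(((df1 - df2) ** 2).sum(axis=1))


# distances_df holds one row per (user, centroid) pair
distances_df = distances_df.withColumn(
    "distance",
    calculate_euclidean_distances(col("ratings"), col("centroid-ratings")),
).drop("centroid-ratings")

# Smallest distance per user
min_distances_df = distances_df.groupBy("user").agg(
    spark_min("distance").alias("min_distance")
)

# Keep the row(s) whose distance equals that user's minimum
user_ratings_df = distances_df.join(
    min_distances_df,
    on=[
        distances_df.user == min_distances_df.user,
        distances_df.distance == min_distances_df.min_distance,
    ],
)
user_ratings_df.show()
```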
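The UDF body itself is plain pandas, so its per-batch logic can be checked outside Spark. A minimal sketch, assuming each `ratings` / `centroid-ratings` value is an equal-length list of numbers; the function name `euclidean_distances` and the sample values are made up for illustration:

```python
import numpy as np
import pandas as pd


def euclidean_distances(s1: pd.Series, s2: pd.Series) -> pd.Series:
    # Same body as the pandas UDF, without the Spark decorator:
    # each element of s1/s2 is a list of ratings, expanded into
    # one DataFrame column per rating dimension.
    df1 = pd.DataFrame(s1.tolist())
    df2 = pd.DataFrame(s2.tolist())
    # Row-wise square root of the sum of squared differences.
    return np.sqrt(((df1 - df2) ** 2).sum(axis=1))


# Hypothetical batch: two (user, centroid) rows with 2-dimensional ratings.
ratings = pd.Series([[1.0, 2.0], [0.0, 0.0]])
centroid_ratings = pd.Series([[4.0, 6.0], [6.0, 8.0]])
print(euclidean_distances(ratings, centroid_ratings).tolist())  # [5.0, 10.0]
```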
The code runs successfully, but I get this warning when it executes the join:
```
24/06/12 11:13:07 WARN ExtractPythonUDFFromJoinCondition: The join condition:isnotnull(calculate_euclidean_distances(ratings#177, centroid-ratings#5024)#5664) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.
```
First, I don't understand why the join evaluates the `calculate_euclidean_distances` UDF again, since I already ran it to create the `distance` column. Second, I created the `distance` column with a pandas UDF, so I don't understand why the warning says the join condition contains a PythonUDF only.
When I remove the `distances_df.distance == min_distances_df.min_distance` condition from the `on` clause, the code runs without any warnings. But does removing that condition make my code meaningless?
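To make the two join variants concrete, here is a small sketch in plain pandas (not Spark) on made-up data. `distances` and `min_distances` are hypothetical stand-ins for `distances_df` and `min_distances_df`, and the `centroid` column is assumed for illustration. It shows what each `on` clause keeps:

```python
import pandas as pd

# Hypothetical stand-in for distances_df: one row per (user, centroid) pair.
distances = pd.DataFrame(
    {
        "user": [1, 1, 2, 2],
        "centroid": ["a", "b", "a", "b"],
        "distance": [3.0, 1.5, 0.5, 2.0],
    }
)

# Equivalent of min_distances_df: the smallest distance per user.
min_distances = (
    distances.groupby("user", as_index=False)["distance"]
    .min()
    .rename(columns={"distance": "min_distance"})
)

# Join on user AND distance: keeps only each user's closest centroid(s);
# ties on the minimum would produce several rows for that user.
closest = distances.merge(
    min_distances,
    left_on=["user", "distance"],
    right_on=["user", "min_distance"],
)

# Join on user only: every (user, centroid) row survives, with
# min_distance merely attached as an extra column.
user_only = distances.merge(min_distances, on="user")

print(closest["centroid"].tolist())  # ['b', 'a']
print(len(user_only))  # 4
```

In this toy example, the user-only join returns all four pairs instead of one closest centroid per user.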