I’m working with hierarchical data in PySpark where each employee has a manager, and I need to find all the inline managers for each employee. An employee’s inline managers are their direct manager, that manager’s manager, and so on, up to the top-level manager (the CEO), who has no manager.
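For concreteness, here is a tiny stand-in for the employee table (hypothetical names and ids, plain Python instead of a DataFrame) and the chain the derived column should contain:

```python
# Toy employee table: user_id -> (username, manager_id); manager_id is None at the top.
employees = {
    1: ("ceo", None),
    2: ("vp", 1),
    3: ("lead", 2),
    4: ("dev", 3),
}

def chain_of_managers(user_id):
    """Walk up the hierarchy, labelling each manager with its distance."""
    chain = []
    manager_id = employees[user_id][1]
    while manager_id is not None:
        username, next_manager_id = employees[manager_id]
        chain.append(f"{username}_level_{len(chain) + 1}")
        manager_id = next_manager_id
    return chain

print(chain_of_managers(4))  # ['lead_level_1', 'vp_level_2', 'ceo_level_3']
```

So for "dev" the expected value of the new column is ['lead_level_1', 'vp_level_2', 'ceo_level_3'], and for the CEO it is an empty list.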
from pyspark.sql.functions import udf, broadcast, col
from pyspark.sql.types import ArrayType, StringType

# Broadcast the DataFrame df
broadcast_df = broadcast(df)

# Define a function to find inline managers
def find_inline_managers(user_id, manager_id):
    inline_managers = []
    while manager_id is not None:
        manager = broadcast_df.filter(col("user_id") == manager_id).select("username").first()[0]
        inline_managers.append(f"{manager}_level_{len(inline_managers) + 1}")
        manager_id = broadcast_df.filter(col("user_id") == manager_id).select("manager_id").first()[0]
    return inline_managers

# Register the UDF
find_inline_managers_udf = udf(find_inline_managers, ArrayType(StringType()))

# Apply the UDF to create the new column
df = df.withColumn("inline_managers", find_inline_managers_udf("user_id", "manager_id"))
Here, I have written the find_inline_managers UDF to create a derived column “inline_managers”, but I am getting the following error message:
PicklingError: Could not serialize object: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
How can I fix this?
If you know of an alternative way to solve this problem, please let me know. Thanks!
Note: Recursive CTE is not supported in Databricks
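For context, one direction I have been considering (a sketch only, not tested against the real schema): the error seems to come from the UDF capturing broadcast_df, since DataFrame operations like filter/first can only run on the driver. If the employee table is small enough to collect, one could pull it to the driver as a plain dict, broadcast that dict with spark.sparkContext.broadcast, and have the UDF walk the dict instead. The fence below shows only the executor-side walk, with the Spark wiring as comments and a hypothetical hardcoded dict standing in for the broadcast value:

```python
# Driver-side setup (Spark calls shown as comments, not run here):
#   lookup = {r["user_id"]: (r["username"], r["manager_id"]) for r in df.collect()}
#   bc = spark.sparkContext.broadcast(lookup)
# Inside the UDF we would then read bc.value instead of filtering broadcast_df.

# Plain-dict stand-in for bc.value, with hypothetical sample rows:
lookup = {
    10: ("alice", None),   # top-level manager, no manager of her own
    20: ("bob", 10),
    30: ("carol", 20),
}

def find_inline_managers(manager_id, lookup):
    """Executor-safe version: walks a plain dict, no DataFrame calls."""
    inline_managers = []
    while manager_id is not None:
        username, next_manager_id = lookup[manager_id]
        inline_managers.append(f"{username}_level_{len(inline_managers) + 1}")
        manager_id = next_manager_id
    return inline_managers

print(find_inline_managers(20, lookup))  # ['bob_level_1', 'alice_level_2']
```

With the real broadcast, the UDF would presumably be registered as udf(lambda m: find_inline_managers(m, bc.value), ArrayType(StringType())) and applied to the manager_id column alone, since the walk never needs the employee’s own user_id. Does this approach (or something better) avoid the SPARK-5063 error?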