I am trying to identify which fields of my final data frame originate from which fields in the input data frames.
For example, given the following transformation:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()

df_a = spark.createDataFrame([('1', 'A', "alpha"), ('2', 'A', "alpha")], schema=["id", "value", "part_1"])
df_b = spark.createDataFrame([('1', 10, "beta"), ('2', 20, "beta")], schema=["id", "value_2", "part_2"])
df_c = df_a.join(df_b, how='left', on='id').filter(f.col("value_2") > 15)
df_c = df_c.withColumn("concat_variable", f.concat(f.col("part_1"), f.lit("_"), f.col("part_2")))
df_c = df_c.withColumnRenamed(existing="value", new="value_renamed")
my desired result would be the following mapping for the fields of df_c:
id: df_a.id
value_renamed: df_a.value
value_2: df_b.value_2
concat_variable: (df_a.part_1, df_b.part_2)
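Put differently, the target could be a plain Python mapping from each output column to the input data frame and column(s) it comes from (the exact structure below is just for illustration):

lineage = {
    "id": [("df_a", "id")],
    "value_renamed": [("df_a", "value")],
    "value_2": [("df_b", "value_2")],
    "concat_variable": [("df_a", "part_1"), ("df_b", "part_2")],
}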
I have already found the query execution plan, which contains all the information needed for this, but I cannot figure out how to condense it into the desired format:
print(df_c._jdf.queryExecution().toString())
== Parsed Logical Plan ==
Project [id#0, value#1 AS value_renamed#24, part_1#2, value_2#7L, part_2#8, concat_variable#17]
+- Project [id#0, value#1, part_1#2, value_2#7L, part_2#8, concat(part_1#2, _, part_2#8) AS concat_variable#17]
+- Filter (value_2#7L > cast(15 as bigint))
+- Project [id#0, value#1, part_1#2, value_2#7L, part_2#8]
+- Join LeftOuter, (id#0 = id#6)
:- LogicalRDD [id#0, value#1, part_1#2], false
+- LogicalRDD [id#6, value_2#7L, part_2#8], false
== Analyzed Logical Plan ==
id: string, value_renamed: string, part_1: string, value_2: bigint, part_2: string, concat_variable: string
Project [id#0, value#1 AS value_renamed#24, part_1#2, value_2#7L, part_2#8, concat_variable#17]
+- Project [id#0, value#1, part_1#2, value_2#7L, part_2#8, concat(part_1#2, _, part_2#8) AS concat_variable#17]
+- Filter (value_2#7L > cast(15 as bigint))
+- Project [id#0, value#1, part_1#2, value_2#7L, part_2#8]
+- Join LeftOuter, (id#0 = id#6)
:- LogicalRDD [id#0, value#1, part_1#2], false
+- LogicalRDD [id#6, value_2#7L, part_2#8], false
== Optimized Logical Plan ==
Project [id#0, value#1 AS value_renamed#24, part_1#2, value_2#7L, part_2#8, concat(part_1#2, _, part_2#8) AS concat_variable#17]
+- Join Inner, (id#0 = id#6)
:- Filter isnotnull(id#0)
: +- LogicalRDD [id#0, value#1, part_1#2], false
+- Filter ((isnotnull(value_2#7L) AND (value_2#7L > 15)) AND isnotnull(id#6))
+- LogicalRDD [id#6, value_2#7L, part_2#8], false
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#0, value#1 AS value_renamed#24, part_1#2, value_2#7L, part_2#8, concat(part_1#2, _, part_2#8) AS concat_variable#17]
+- SortMergeJoin [id#0], [id#6], Inner
:- Sort [id#0 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#0, 200), ENSURE_REQUIREMENTS, [plan_id=33]
: +- Filter isnotnull(id#0)
: +- Scan ExistingRDD[id#0,value#1,part_1#2]
+- Sort [id#6 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#6, 200), ENSURE_REQUIREMENTS, [plan_id=34]
+- Filter ((isnotnull(value_2#7L) AND (value_2#7L > 15)) AND isnotnull(id#6))
+- Scan ExistingRDD[id#6,value_2#7L,part_2#8]
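As a very rough starting point, something like the sketch below scrapes the expression ids (id#0, value#1, value_2#7L, ...) from the LogicalRDD leaves of the analyzed plan by plain string parsing; it only covers the leaf attributes and does not yet trace renames or derived columns such as concat_variable back through the Project nodes:

import re

plan_str = df_c._jdf.queryExecution().toString()
analyzed = plan_str.split("== Analyzed Logical Plan ==")[1].split("== Optimized Logical Plan ==")[0]

# expression id -> column name, taken from the leaf LogicalRDD nodes
leaf_attrs = {}
for leaf in re.findall(r"LogicalRDD \[([^\]]*)\]", analyzed):
    for attr in leaf.split(", "):
        name, expr_id = re.match(r"(\w+)#(\d+)", attr).groups()
        leaf_attrs[expr_id] = name

print(leaf_attrs)  # {'0': 'id', '1': 'value', '2': 'part_1', '6': 'id', '7': 'value_2', '8': 'part_2'}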