I want to apply a schema to specific non-technical columns of a Spark DataFrame. Beforehand, I add an artificial ID using Window and row_number so that I can later join some other technical columns to the new DataFrame. However, after applying the schema, the generated ID is messed up: when I join the new DataFrame back to the original on that ID, rows are no longer matched with themselves. Below is a code sample. Can someone explain why this happens and how to resolve the issue?
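from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, lit, col, monotonically_increasing_id, sum
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Reuse the active session (e.g. in a notebook) or create one
spark = SparkSession.builder.getOrCreate()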
# Sample DataFrame
data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
df = spark.createDataFrame(data, ["id", "name"])
# Schema to apply
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
])
# Create ID column
w = Window().orderBy(lit('A'))
df = df.withColumn('_special_surrogate_id', row_number().over(w))
# Improved method
surrogate_key_field = StructField("_special_surrogate_id", StringType(), False)
schema_with_surrogate = StructType(schema.fields + [surrogate_key_field])
for i in range(11):
    df_filtered = df.select("id", "name", "_special_surrogate_id")
    # Re-apply the schema by round-tripping through the RDD
    df_filtered = spark.createDataFrame(df_filtered.rdd, schema_with_surrogate)
    # Join back on the surrogate key; id1 and id2 should be equal if the key is stable
    combined_df = df.withColumnRenamed("id", "id1").join(df_filtered.withColumnRenamed("id", "id2"), on="_special_surrogate_id")
    print("Diffs in Iteration " + str(i) + ":")
    # Count the joined rows whose original ids disagree
    print(combined_df.withColumn("diff", (col("id1") != col("id2")).cast("integer")).agg(sum("diff")).collect()[0][0])