I have the following code, which I use to add a unique ID column to my DataFrame. The DataFrame can contain completely duplicate rows (all columns identical), so I cannot derive the ID from the row contents with SHA or any other hashing algorithm.
I am using Azure Databricks, DBR 14.3, Spark 3.5.0.
<code>from pyspark.sql import DataFrame
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window


def create_one_partition(df: DataFrame = None):
    if df is not None:
        if df.rdd.getNumPartitions() > 1:
            df = df.coalesce(1)
    return df


def add_consecutive_unique_id_column(df: DataFrame, unique_id_col_name: str = "unique_id"):
    """
    Add a consecutive unique identifier column to a DataFrame.

    Args:
    - df: DataFrame to which the unique identifier column will be added
    - unique_id_col_name: Name of the column for the unique identifier (default: "unique_id")

    Returns:
    - DataFrame: DataFrame with a consecutive unique identifier column
    """
    # If the temporary unique_id_tmp column doesn't exist, create it
    if "unique_id_tmp" not in df.columns:
        df = create_one_partition(df)
        df = df.withColumn("unique_id_tmp", monotonically_increasing_id())

    # Use row_number to generate consecutive unique IDs based on the temporary unique_id_tmp column
    window_spec = Window.orderBy("unique_id_tmp")
    df = df.withColumn(unique_id_col_name, row_number().over(window_spec))

    # Drop the temporary unique_id_tmp column as it's no longer needed
    df = df.drop("unique_id_tmp")
    return df
</code>
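For context, this is roughly how I call it. The data and column names here are just placeholders; my real DataFrame can contain fully duplicate rows like these:
<code>from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# placeholder data with completely duplicate rows
src = spark.createDataFrame(
    [("a", 1), ("a", 1), ("b", 2)],
    ["col1", "col2"],
)

df_with_id = add_consecutive_unique_id_column(src, "unique_id")
df_with_id.show()
</code>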
But the issue with this code is that when I use the resulting DataFrame anywhere else in my code, for any purpose, the values in this column keep changing. It behaves as if Spark never materializes the values and instead keeps the code as a view, re-evaluating it on every use, which makes the column useless as an ID.
Any idea how to get around this, so the DataFrame doesn't regenerate the column values again and again?
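To illustrate what I mean by "keeps changing", this is the kind of behaviour I see (a sketch only; the exact mismatch varies from run to run, and `df_with_id` is the DataFrame from the example above):
<code># Each action triggers a fresh evaluation of the plan, and
# monotonically_increasing_id() is not guaranteed to assign the same
# value to the same row across evaluations, so the row-to-ID mapping
# can differ between actions.
df_with_id.filter("unique_id = 1").show()
df_with_id.filter("unique_id = 1").show()  # can show a different row than above

# Joining the DF back to itself on unique_id can even lose rows,
# because each side may re-generate the IDs independently.
rejoined = df_with_id.alias("a").join(df_with_id.alias("b"), "unique_id")
print(rejoined.count())  # not guaranteed to equal df_with_id.count()
</code>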