I would like to process data 24/7 using Databricks Structured Streaming. Before the data is written out I need to apply some transformations to it, which is why I'm using foreach. Below is my function:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, from_json, regexp_replace
from delta.tables import DeltaTable
from datetime import datetime


def flatten_nested_df_2(df_nested, columns_to_flatten, epochId):
"""
Cleans and flattens a nested DataFrame by extracting and parsing JSON from a specified column,
then iteratively creating new columns out of original columns which are structs.
The new column names get the original parent column name as prefix.
:param df_nested: The input DataFrame with nested JSON.
:param columns_to_flatten: List of column names to flatten.
:param json_column_name: The column name containing JSON data to be cleaned and parsed.
:return: A flattened DataFrame.
"""
    # ----------------------------------------- Part 1: Cleaning and Flattening -----------------------------------------
    # Step 1: Clean the JSON string in the "data" column
    json_column_name = "data"
    clean_col = col(json_column_name)
    for pattern, replacement in [
            ('`', ''),
            ("'", ''),
            # ('-', '_'),
            (' ', '_'),
            (r'\.', '_'),
            ('á', 'a'),
            (r'\$', ''),
            ('\n', '_')]:
        clean_col = regexp_replace(clean_col, pattern, replacement)
    filtered_df = df_nested.filter(col("type").like("%YardOrder%"))
    filtered_df = filtered_df.select("data")
    df_cleaned = filtered_df.withColumn(json_column_name, clean_col)
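    # For illustration (hypothetical value): a raw string like '{"ship unit":"A.1"}'
    # would come out of the replacements above as '{"ship_unit":"A_1"}'.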
    # Step 2: Parse the JSON string into a struct column, using a schema inferred from this batch
    schema = spark.read.json(df_cleaned.rdd.map(lambda row: row[json_column_name])).schema
    df_with_json = df_cleaned.withColumn(json_column_name, from_json(json_column_name, schema=schema))
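    # Note: spark.read.json(...) above needs the SparkSession, so it can only run on
    # the driver; it triggers a job that samples the strings to infer this batch's schema.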
    # Step 3: Flatten the nested DataFrame, depth-first over the struct columns
    stack = [((), df_with_json)]
    column_names = []
    while len(stack) > 0:
        parents, df = stack.pop()
        flat_column_names = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if (c[1][:6] != "struct") and c[0] in columns_to_flatten
        ]
        nested_column_names = [c[0] for c in df.dtypes if c[1][:6] == "struct" and c[0] in columns_to_flatten]
        column_names.extend(flat_column_names)
        for nested_column_name in nested_column_names:
            projected_df = df.select(nested_column_name + ".*")
            stack.append((parents + (nested_column_name,), projected_df))
    flattened_df = df_with_json.select(column_names)
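    # For illustration: assuming 'data', 'order' and 'id' are all in columns_to_flatten,
    # a nested field data.order.id ends up as a flat column named data_order_id.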
    # Rename the column 'data_id' to 'bk_data_id'
    flattened_df = flattened_df.withColumnRenamed("data_id", "bk_data_id")
    # ------------------------------------------------------------ Part 2: Upsert to silver ------------------------------------------------------------
    deltaTable = DeltaTable.forPath(spark, "abfss://[email protected]/shippingUnit")
    list_of_columns = flattened_df.columns
    list_of_BK_columns = ['bk_data_id']

    # Build the merge condition from the business-key columns
    merge_condition = ' and '.join(f'table.{column} = newData.{column}' for column in list_of_BK_columns)
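    # For illustration: with list_of_BK_columns = ['bk_data_id'] this produces
    # "table.bk_data_id = newData.bk_data_id"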
    # Map every target column to its source value for the update/insert clauses
    dictionary = {key: f'newData.{key}' for key in list_of_columns}

    # Executing the merge itself
    print(f"batch {epochId} starting merge now at {datetime.now()}")
    (deltaTable.alias('table')
        .merge(flattened_df.alias("newData"), merge_condition)
        .whenMatchedUpdate(set=dictionary)
        .whenNotMatchedInsert(values=dictionary)
        .execute())
    print(f"batch {epochId} done at {datetime.now()}")
This is how I'm writing the stream:
# print(f"Merge initiated at {datetime.now()}")
df.writeStream.foreach(lambda df, epochId: flatten_nested_df_2(df, columns_to_flatten, epochId)).option("checkpointLocation", checkpoint_directory).start()
print(f"Merge done at {datetime.now()}")
But I get this error message:

[CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
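I think it comes down to referencing the SparkSession from inside foreach. A minimal sketch of what I believe is the same pattern (hypothetical rate source, not my real pipeline) hits the same error for me:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
stream_df = spark.readStream.format("rate").load()

def process_row(row):
    # foreach runs this per row on the workers, where the driver-side
    # SparkSession/SparkContext is not available
    spark.createDataFrame([(row.value,)], ["value"]).count()

stream_df.writeStream.foreach(process_row).start()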
I'm using a single-user cluster on Databricks Runtime 15.3 Beta.

Any help is much appreciated.