I am using Auto Loader in trigger-once mode to load Parquet files from an S3 location. My goal is to implement change data capture by comparing the incoming source data against the target Delta table to identify INSERTs, UPDATEs, and DELETEs. Instead of performing a MERGE, I want to build an append-only table that logs these changes. Below is my foreachBatch upsert function. Although it runs, it does not capture any DELETEs or UPDATEs.
# Inputs
# Batch #1
batch_1_timestamp = "2024-07-19"
data = [
    ("1", "John", batch_1_timestamp),
    ("2", "David", batch_1_timestamp),
]
# Batch #2
batch_2_timestamp = "2024-07-20"
data = [
    ("1", "John", batch_2_timestamp),         # Same record
    ("2", "David Jones", batch_2_timestamp),  # Updated record
    ("3", "Bob", batch_2_timestamp),          # New record
]
# Batch #3
batch_3_timestamp = "2024-07-21"
data = [
    ("2", "David", batch_3_timestamp),  # Update
    ("3", "Bob", batch_3_timestamp),    # Same
    # id 1 deleted
]
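For reference, each sample batch above can be turned into a DataFrame with a simple three-column string schema (illustrative only; the real data arrives as Parquet files via Auto Loader):

from pyspark.sql.types import StructType, StructField, StringType

# Schema matching the sample tuples above (assumed for illustration)
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("batch_timestamp", StringType(), True),
])

batch_2_timestamp = "2024-07-20"
batch_df = spark.createDataFrame(
    [
        ("1", "John", batch_2_timestamp),         # Same record
        ("2", "David Jones", batch_2_timestamp),  # Updated record
        ("3", "Bob", batch_2_timestamp),          # New record
    ],
    schema,
)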
# Expected Output
| id | name | batch_timestamp | operation_type |
| --- | --- | --- | --- |
| 1 | John | 2024-07-19 | INSERT |
| 2 | David | 2024-07-19 | INSERT |
| 2 | David Jones | 2024-07-20 | UPDATE |
| 3 | Bob | 2024-07-20 | INSERT |
| 2 | David | 2024-07-21 | UPDATE |
| 1 | John | 2024-07-21 | DELETE |
from datetime import datetime
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit

def upsertToDelta(batch_df: DataFrame, batch_id: int):
    target_table = spark.table(target_table_qualified)

    # Add a row_hash column over the non-metadata columns so changed rows can be detected
    df_source = utl.add_row_hash_to_dataframe(
        df=batch_df,
        ignore_columns=metadata_columns,
    )

    created_at_timestamp = datetime.now()

    # Rows in the source batch with no matching id in the target -> INSERT
    new_records = (
        df_source.alias("s")
        .join(
            target_table.alias("t"),
            on=[col("s.id") == col("t.id")],
            how="left",
        )
        .filter(col("t.id").isNull())
        .select(
            "s.*",
            lit("INSERT").alias("operation_type"),
            lit(created_at_timestamp).alias("created_at"),
        )
    )

    # Rows matched on id, newer than the target row, and with a different hash -> UPDATE
    updated_records = (
        df_source.alias("s")
        .join(
            target_table.alias("t"),
            on=[
                col("s.id") == col("t.id"),
                col("s.batch_timestamp") > col("t.batch_timestamp"),
            ],
            how="inner",
        )
        .filter(col("s.row_hash") != col("t.row_hash"))
        .select(
            "s.*",
            lit("UPDATE").alias("operation_type"),
            lit(created_at_timestamp).alias("created_at"),
        )
    )

    # Rows in the target with no matching id in the source batch -> DELETE
    deleted_records = (
        target_table.alias("t")
        .join(
            df_source.alias("s"),
            on=[col("t.id") == col("s.id")],
            how="left_anti",
        )
        .select("t.*")
        .withColumn("operation_type", lit("DELETE"))
        .withColumn("created_at", lit(created_at_timestamp))
    )

    df_processed = new_records.union(updated_records).union(deleted_records)

    df_processed.write.saveAsTable(
        target_table_qualified,
        format="delta",
        mode="append",
    )
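The streaming DataFrame `df_converted` used below comes from an Auto Loader read over the S3 path. A simplified sketch of that read (with `source_path` and `schema_location` as placeholder names; the real read has more options) looks like this:

df_converted = (
    spark.readStream
    .format("cloudFiles")                    # Auto Loader source
    .option("cloudFiles.format", "parquet")  # incoming files are Parquet
    .option("cloudFiles.schemaLocation", schema_location)
    .load(source_path)
)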
# Write file stream
write_stream = (
    df_converted.writeStream
    .format("delta")
    .outputMode("append")
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .foreachBatch(upsertToDelta)
    .queryName(f"BronzeMerge[{target_table_qualified}]")
    .trigger(once=True)
    .option("checkpointLocation", checkpoint_location)
    .start()
)
write_stream.awaitTermination()
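For context, `utl.add_row_hash_to_dataframe` is an internal helper that adds a `row_hash` column computed over the non-metadata columns; roughly, it does something like this (simplified sketch, not the exact implementation):

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def add_row_hash_to_dataframe(df: DataFrame, ignore_columns: list) -> DataFrame:
    # Hash all columns except the ignored metadata columns so content changes are detectable
    hash_columns = [c for c in df.columns if c not in ignore_columns]
    return df.withColumn("row_hash", F.sha2(F.concat_ws("||", *hash_columns), 256))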