We have an Apache Iceberg data lake. We are using Spark Structured Streaming and receiving micro-batches of approximately 10 records. When we merge each micro-batch DataFrame into a table of approximately 600 records, the merge takes approximately 2 minutes of wall-clock time. The job runs on an EMR cluster that is not otherwise under load.
The MERGE INTO looks like:
# Persist the de-duplicated micro-batch so it isn't recomputed during the merge.
deduped_df.persist()
deduped_df.createOrReplaceTempView("stream_df")

# Upsert the micro-batch into the Iceberg table on the primary key.
deduped_df.sparkSession.sql(
    """MERGE INTO my_table AS target
       USING stream_df AS source
       ON target.my_id = source.my_id
       WHEN MATCHED THEN UPDATE SET *
       WHEN NOT MATCHED THEN INSERT *"""
)

deduped_df.unpersist()
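For context, the merge runs once per micro-batch. A stripped-down sketch of the streaming wiring follows; the rate source, checkpoint path and trigger interval are illustrative stand-ins for our real setup:

from pyspark.sql import SparkSession, DataFrame

spark = SparkSession.builder.appName("iceberg-merge-stream").getOrCreate()

# Stand-in streaming source so the sketch is self-contained; our real source differs.
source_stream_df = (
    spark.readStream.format("rate").option("rowsPerSecond", 1).load()
    .selectExpr("CAST(value AS STRING) AS my_id", "timestamp AS ts")
)

def merge_micro_batch(batch_df: DataFrame, batch_id: int) -> None:
    # De-duplicate the ~10-record micro-batch on the merge key, then run the
    # same MERGE INTO shown above.
    deduped_df = batch_df.dropDuplicates(["my_id"])
    deduped_df.persist()
    deduped_df.createOrReplaceTempView("stream_df")
    deduped_df.sparkSession.sql(
        """MERGE INTO my_table AS target
           USING stream_df AS source
           ON target.my_id = source.my_id
           WHEN MATCHED THEN UPDATE SET *
           WHEN NOT MATCHED THEN INSERT *"""
    )
    deduped_df.unpersist()

query = (
    source_stream_df.writeStream
    .foreachBatch(merge_micro_batch)
    .option("checkpointLocation", "s3://my-bucket/checkpoints/my_table")  # illustrative path
    .trigger(processingTime="1 minute")  # illustrative trigger
    .start()
)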
The execution plan looks very complex, with a lot of joins and shuffles. However, the data volume is so small that the processing time surprises us, even allowing for some full table scans in the join.
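If a text version of the plan is easier to work with than the screenshots, something like this should also dump it (a sketch; we are assuming EXPLAIN accepts MERGE INTO when the Iceberg SQL extensions are enabled):

# Sketch: text version of the same plan, assuming EXPLAIN accepts MERGE INTO
# with the Iceberg SQL extensions enabled.
plan_df = deduped_df.sparkSession.sql(
    """EXPLAIN FORMATTED
       MERGE INTO my_table AS target
       USING stream_df AS source
       ON target.my_id = source.my_id
       WHEN MATCHED THEN UPDATE SET *
       WHEN NOT MATCHED THEN INSERT *"""
)
plan_df.show(truncate=False)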
The table is partitioned by a timestamp column, not by the primary key we join on. The primary key is a fairly long but unique string on each side of the join.
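For reference, the target table has roughly this shape (only the my_id key and the timestamp partitioning are real; the ts column name and the daily partition transform are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumes the Iceberg catalog and SQL extensions are configured

# Rough shape of the target table: partitioned on the event timestamp,
# while the merge key my_id is a long, unique string.
spark.sql("""
    CREATE TABLE IF NOT EXISTS my_table (
        my_id STRING,    -- long, unique string used as the merge key
        ts    TIMESTAMP  -- event time; the partition column
    )
    USING iceberg
    PARTITIONED BY (days(ts))  -- illustrative transform; partitioned by timestamp, not my_id
""")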
We would appreciate any help, as we don't see a way to optimise the MERGE INTO for performance.
I’m attaching screenshots of the execution plan below.