Handling Incremental Data Loading and SCD Type 2 for joined tables in Delta Live Tables on Databricks

I’m working on a project utilizing Delta Live Tables on Databricks, where I need to create a dimension (Kimball style) with slowly changing dimension type 2. The dimension is the result of a join between several bronze tables incrementally loaded using autoloader. The resulting silver table needs to be streaming or append-only since I need to use it as a source for a streaming table loaded using apply changes with scd type 2. However, I’m facing challenges due to the streaming behavior.

Here’s the breakdown of the scenario:

Bronze Layer (bronze_raw): Data is loaded incrementally using autoloader once a day.

Silver Layer: Business logic is applied here to create the dimension, and I need the resulting tables to be either streaming or append-only.

SCD Type 2 Handling (silver_full_hist): The silver tables serve as a source for a streaming table where we apply changes to implement SCD Type 2.

The issue arises during the join operation in the silver layer. Since streaming handles only new rows, joining with existing records can lead to missing data. For instance, if a new customer is added in a CRM system and we need to join the account table with another to retrieve the customer representative, which hasn’t changed, an inner join would result in no rows due to the absence of existing records. A left join would on the other hand end up with ha null value for the rep column even though there exist one.

I’m seeking guidance on how to achieve a seamless flow in this scenario. Any insights or best practices for implementing this workflow in Delta Live Tables would be highly appreciated.

I noticed another question on the topic: Joining Tables for Databricks Delta Live Tables SCD Type 2

It got an answer saying that scd type 2 needs to be applied to each of the bronze tables before joining them. However, then you would need to decide from which of the bronze tables you need to pick the __START_AT and __END_AT columns for the resulting dimension with scd type 2. It might not be obvious in all cases and may force you to infer very complicated logic.

Below is what I have experimented with so far trying to create a streaming silver table to be used as source for apply changes:

silver_data_load_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

#Ingest files using autoloader. Customer representative are found in a table called 'user'
@dlt.table (
    name = "bronze_account_raw"
  )
def collect_raw_bronze():
  return (
      spark.readStream
          .format("cloudFiles")
          .options(**csv_file_options)
          .load(directory_account)

@dlt.table (
    name = "bronze_user_raw"
  )
def collect_raw_bronze():
  return (
      spark.readStream
          .format("cloudFiles")
          .options(**csv_file_options)
          .load(directory_user)       

#Get the latest state of the sources
dlt.create_streaming_table(name="bronze_account_latest")

dlt.apply_changes(
  target = "bronze_account_latest",
  source = "bronze_account_raw",
  keys = [account_id],
  sequence_by ="commit_timestamp",
  apply_as_deletes =F.expr(f"operation = 'delete'"),
  except_column_list=['operation'],
  stored_as_scd_type = 1
)

dlt.create_streaming_table(name="bronze_user_latest")

dlt.apply_changes(
  target = "bronze_user_latest",
  source = "bronze_user_raw",
  keys = [user_id],
  sequence_by ="commit_timestamp",
  apply_as_deletes =F.expr(f"operation = 'delete'"),
  except_column_list=['operation'],
  stored_as_scd_type = 1
)

#Get dimension by joining bronze tables. Created as temporary table here since I initially it forgets about the state and reads all rows in the bronze tables and does not raise the exception about deletes and updates (not the case, it raised the error).

transform_query = f"""
SELECT a.customer_name
       ,u.rep_name
FROM STREAM(LIVE.bronze_account_latest) as a
INNER JOIN STREAM(LIVE.bronze_user_latest) as u on a.rep_id= u.user_id
"""

@dlt.table(
        name="silver_customer",
        temporary=True
    )
def load_silver():
    df = spark.sql(transform_query)
    columns = df.columns
    return (
        df.withColumn("silver_commit_timestamp",F.lit(silver_data_load_timestamp))     
        .select(*columns, "silver_commit_timestamp")
    )

#Here I expect a scd type 2 table based on the silver table. I can then choose to (1) get the latest state of the dimension by setting END_AT IS NULL or (2) get the state for a particular point in time.

dlt.create_streaming_table(name="silver_customer_scd",table_properties={"quality": "silver"},)

dlt.apply_changes(
  target = "silver_customer_scd",
  source = "silver_customer",
  keys = ["customer_name"],
  sequence_by ="silver_commit_timestamp",
  stored_as_scd_type = 2
)

New contributor

Jesper Martinsson is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật