How to Avoid Duplicated Records in PyFlink When Joining Kafka Stream with Static Data?

I’m working with PyFlink and have encountered an issue where my final output contains duplicated records despite my SQL queries being tested and working correctly in SQL. My setup includes a Kafka consumer that receives CalledNumber, and I am trying to join this stream with a static dataset read from a CSV file.
Here is my setup:

  • Static Data: Read from a CSV file, and registered as a table named static_data fields(zone,code,rate)
  • Streaming Data: Consumed from Kafka, registered as a table named streaming_data.(mostly interested in callednumber and the uniqueRecordId )
  • SQL Query: Performed a left join between streaming_data and static_data based on a condition.

I tried first to execute as one big query


 with cte as(
        select 
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,UniqueRecordID
        ,Zone
        ,CAST( Code as DOUBLE) as Code
        ,Rate
        ,sd.FileName as FileName
        from streaming_data sd
        left join static_data st
        ON sd.CalledNumber LIKE CONCAT(st.Code, '%')
        )
        ,
        LongestMatch as(
        select 
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,max(Code) as Code
        ,UniqueRecordID
        ,FileName
        from cte
        group by
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,UniqueRecordID
        ,FileName
        )
        select 
        CalledNumber
        ,Zone
        ,CAST(st.Code AS VARCHAR(30)) AS Code
        ,CAST(Rate AS VARCHAR(30)) AS Rate
        ,SetupTime
        ,UniqueRecordID
        ,FileName
        from LongestMatch lm
        left join static_data st on lm.Code= CAST (st.Code as DOUBLE)

i also tried splitting queries into one query and multiple tables, using table api

static_data = ts_env.from_path("static_data")
    static_data_renamed = static_data.select(
        expr.col("Zone").alias("static_zone"),
        expr.col("Code"),
        expr.col("Rate")
     
    )


    cte_query= """
         select 
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,UniqueRecordID
        ,CAST( Code as DOUBLE) as Code
        ,sd.FileName as FileName        
        from streaming_data sd
        left join static_data st
        ON sd.CalledNumber LIKE CONCAT(st.Code, '%')
    """
    cte = ts_env.sql_query(cte_query)
    ts_env.create_temporary_view("cte", cte)
  
    # Group by 'Zone' and calculate the maximum 'Code' for each group
    max_code_by_zone = (
        cte.group_by(
            expr.col("CalledNumber")
            ,expr.col("SetupTime")
            ,expr.col("UniqueRecordID")
            ,expr.col("FileName")
            )
        .select(
            expr.col("Code").max.alias("max_code")
            ,expr.col("CalledNumber")
            ,expr.col("SetupTime")
            ,expr.col("UniqueRecordID")
            ,expr.col("FileName")
            )
    )
    
   
    #Join the tables on the specified condition
    joined_table = (
        static_data_renamed
        .join(max_code_by_zone)
        .where( expr.col("Code").cast(DataTypes.DOUBLE()) == expr.col("max_code"))
    )

    # # # Select specific columns from the joined table
    final_result = (
        joined_table
        .select(
            expr.col("CalledNumber"),
            expr.col("static_zone").alias("Zone"),
            expr.col("Code").cast(DataTypes.STRING()).alias("Code"),
            expr.col("Rate").cast(DataTypes.STRING()).alias("Rate"),
            expr.col("SetupTime"),
            expr.col("UniqueRecordID"),
            expr.col("FileName")
        )
    )

    final_result.execute().print()

Here sample result

| +I |                    xxxxxxxxxxx |               236 |     0.31771425 |     2024-08-09T14:17:50.927606 | ef15016b-8a9e-430a-acf4-cb0
| -U |                    xxxxxxxxxxx |               236 |     0.31771425 |     2024-08-09T14:17:50.927606 | ef15016b-8a9e-430a-acf4-cb0
| +I |                     zzzzzzzzzz |               266 |      0.4332153 |     2024-08-09T14:17:50.927606 | b08f3a3f-4f68-4915-a7c8-7c2
| +U |                   yyyyyyyyyyyy |             23350 |     0.16603965 |     2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324
| +I |                   yyyyyyyyyyyy |               233 |     0.21922425 |     2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324
| -U |                   yyyyyyyyyyyy |               233 |     0.21922425 |     2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324

I also tried applying ROW_NUMBER() in order to exclude and take final result.

Is there similar approach when using KAFKA for emit final with windowing.

There also second approach but i failed at first step, when trying to define steupTime as Types.SQL_TIMESTAMP() and DataTypes.TIMESTAMP_LTZ() for schema, there is always issue either at first de-serialization
FIRST ERROR

 java.time.format.DateTimeParseException: Text '2024-08-09 14:27:20.968' could not be parsed at index 10

SECOND ERROR

Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.time.Instant (java.sql.Timestamp is in module java.sql of loader 'platform'; 

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật