I have the following Flink SQL code:
```sql
WITH entitlement_source_hash AS (
  -- Hash the columns that define a "change" for each record
  SELECT
    pod, org, tenantId, pk, id, segments, hard_deleted, ts,
    HASH_CODE(tenantId || id || CAST(segments AS STRING)) AS data_hash
  FROM {{ ref('es_entitlement_source') }}
),
entitlement_source_deduped AS (
  -- Keep a row only if its hash differs from the previous row for the same (tenantId, id)
  SELECT *
  FROM (
    SELECT *,
      LAG(data_hash) OVER (PARTITION BY tenantId, id ORDER BY ts) AS prev_data_hash
    FROM entitlement_source_hash
  )
  WHERE data_hash IS DISTINCT FROM prev_data_hash OR prev_data_hash IS NULL
)
SELECT * FROM entitlement_source_deduped
```
Here, `es_entitlement_source` has the columns `pod`, `org`, `tenantId`, `pk`, `id`, `segments`, `hard_deleted`, and `ts`.
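In streaming mode, Flink only accepts an OVER window whose `ORDER BY` is an ascending time attribute, so for `LAG(data_hash) OVER (... ORDER BY ts)` to run at all, `ts` is presumably declared as one on the source. The DDL below is a hypothetical sketch of such a declaration; the Kafka connector, topic, bootstrap servers, format, column types, and the 5-second watermark delay are all assumptions, not taken from the actual model.

```sql
-- Hypothetical declaration of es_entitlement_source. The connector, topic,
-- servers, format, column types and watermark delay are all assumptions.
CREATE TABLE es_entitlement_source (
  pod          STRING,
  org          STRING,
  tenantId     STRING,
  pk           STRING,
  id           STRING,
  segments     ARRAY<STRING>,   -- assumed type; the query CASTs it to STRING
  hard_deleted BOOLEAN,
  ts           TIMESTAMP(3),
  -- Declares ts as the event-time attribute that the OVER window orders by
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'es-entitlement-source',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'entitlement-dedup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

With an event-time attribute like this, the OVER operator buffers each row until the watermark passes its `ts`, so rows are only emitted as watermarks advance.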
The gist of the query is:
- CTE `entitlement_source_hash` computes a hash of three columns (`tenantId`, `id`, and `segments`).
- CTE `entitlement_source_deduped` uses the `LAG` function to get the previous hash for the same `tenantId` and `id`, and keeps a record only if the previous hash is null or differs from the current one.
- Finally, I select from `entitlement_source_deduped`.
The query produces correct results when the parallelism is 1, but no records are emitted to Kafka when I increase the parallelism above 1.
Note: I send the result of this query to Kafka using the upsert-kafka connector.
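For reference, outside of dbt a plain Flink SQL sink table for this query's output with the upsert-kafka connector might look roughly like the hypothetical sketch below. The table name, topic, bootstrap servers, key/value formats, column types, and the choice of `(tenantId, id)` as primary key are all assumptions; the column list mirrors the `SELECT *` output, including `data_hash` and `prev_data_hash`.

```sql
-- Hypothetical upsert-kafka sink for the query result. Table name, topic,
-- servers, formats, column types and the primary key are all assumptions.
CREATE TABLE entitlement_dedup_sink (
  pod            STRING,
  org            STRING,
  tenantId       STRING,
  pk             STRING,
  id             STRING,
  segments       ARRAY<STRING>,
  hard_deleted   BOOLEAN,
  ts             TIMESTAMP(3),
  data_hash      INT,          -- HASH_CODE returns INT
  prev_data_hash INT,
  -- upsert-kafka requires a primary key; it is written as the Kafka record key
  PRIMARY KEY (tenantId, id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'entitlement-dedup',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
```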
How can I fix this?