I'm encountering a ClassCastException when running an incremental load with dbt and Spark SQL. The error indicates a casting failure inside the Spark execution plan:
org.apache.hive.service.cli.HiveSQLException: Error running query: java.lang.ClassCastException: org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to org.apache.spark.sql.execution.SparkPlan
Here is the relevant part of my DBT model configuration and SQL query:
Model Configuration
{% set file_path = env_var('LOC_SILVER') %}
{{ config(
    location_root=file_path,
    unique_key='document_id'
) }}
SELECT DISTINCT *
FROM {{ ref("stg_due_parcels_silver") }} z
{{
    common_dbt_functions.incremental_check(
        update_date_column='insurer_update_date',
        unique_key_column='document_id',
        insertion_date_column='inclusion_date',
        status_column='status',
        apply_full_incremental_logic=true
    )
}}
Staging Model
{% set file_path = env_var('LOC_SILVER') %}
{{ config(
    location_root=file_path,
    unique_key='document_id',
    materialized='ephemeral'
) }}
SELECT DISTINCT
    document_id,
    company_name AS insurer,
    client_name AS name,
    document_number AS document,
    alt_document_number AS alt_number,
    parcel_number AS parcel,
    payment_type AS payment,
    number_of_parcels AS total_parcels,
    due_date,
    payment_date,
    insurer_update_date,
    current_timestamp AS inclusion_date,
    CASE
        WHEN payment_date <> '0001-01-01' THEN 'PAID'
        WHEN CAST(due_date AS DATE) = CAST(current_timestamp AS DATE) THEN 'DUE TODAY'
        WHEN CAST(due_date AS DATE) < CAST(current_timestamp AS DATE)
            AND NOT EXISTS (SELECT 1 FROM {{ ref("paid_parcels_silver") }} l WHERE l.document_id = li.document_id)
            AND NOT EXISTS (SELECT 1 FROM {{ ref("overdue_parcels_silver") }} l WHERE l.document_id = li.document_id) THEN 'NO INFO'
        ELSE 'DUE'
    END AS status,
    'API' AS source
FROM {{ ref("due_parcels_bronze") }} li
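To make the status derivation concrete, here is a minimal, self-contained Spark SQL sketch of the same CASE expression against inline data. The sample rows are made up, and I have collapsed the correlated NOT EXISTS branches for brevity, so overdue rows simply fall through to 'DUE':

```sql
-- Illustration only: inline rows exercising the status CASE logic.
-- The correlated NOT EXISTS branches are omitted here for brevity.
SELECT
    document_id,
    CASE
        WHEN payment_date <> '0001-01-01' THEN 'PAID'
        WHEN CAST(due_date AS DATE) = CAST(current_timestamp AS DATE) THEN 'DUE TODAY'
        ELSE 'DUE'
    END AS status
FROM VALUES
    (1, '2030-01-01', '2024-01-05'),  -- has a real payment_date => 'PAID'
    (2, '2030-01-01', '0001-01-01')   -- unpaid, not due yet => 'DUE'
AS t(document_id, due_date, payment_date)
```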
Macro
{% macro incremental_check(update_date_column, unique_key_column, insertion_date_column, status_column, apply_full_incremental_logic=true) %}
    {% if is_incremental() %}
    WHERE (
        {{ update_date_column }} > (SELECT MAX({{ update_date_column }}) FROM {{ this }})
        {% if apply_full_incremental_logic %}
        OR (
            EXISTS (
                SELECT 1
                FROM {{ this }} t
                WHERE z.{{ unique_key_column }} = t.{{ unique_key_column }}
                  AND t.{{ update_date_column }} = (
                      SELECT MAX(tt.{{ update_date_column }})
                      FROM {{ this }} tt
                      WHERE tt.{{ unique_key_column }} = t.{{ unique_key_column }}
                  )
                  AND t.{{ insertion_date_column }} = (
                      SELECT MAX(tt.{{ insertion_date_column }})
                      FROM {{ this }} tt
                      WHERE tt.{{ unique_key_column }} = t.{{ unique_key_column }}
                        AND tt.{{ update_date_column }} = t.{{ update_date_column }}
                  )
                  AND z.{{ status_column }} <> t.{{ status_column }}
            )
        )
        {% endif %}
    )
    {% endif %}
{% endmacro %}
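For reference, when is_incremental() returns true the model compiles to roughly the following. The relation name silver.due_parcels stands in for {{ this }}, and in reality the ephemeral staging model is inlined as a CTE; both simplifications are mine:

```sql
-- Approximate compiled SQL of the incremental model (names illustrative).
SELECT DISTINCT *
FROM stg_due_parcels_silver z
WHERE (
    insurer_update_date > (SELECT MAX(insurer_update_date) FROM silver.due_parcels)
    OR (
        EXISTS (
            SELECT 1
            FROM silver.due_parcels t
            WHERE z.document_id = t.document_id
              AND t.insurer_update_date = (
                  SELECT MAX(tt.insurer_update_date)
                  FROM silver.due_parcels tt
                  WHERE tt.document_id = t.document_id
              )
              AND t.inclusion_date = (
                  SELECT MAX(tt.inclusion_date)
                  FROM silver.due_parcels tt
                  WHERE tt.document_id = t.document_id
                    AND tt.insurer_update_date = t.insurer_update_date
              )
              AND z.status <> t.status
        )
    )
)
```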
The problem seems to arise from the condition AND z.status <> t.status within the incremental_check macro. This condition is intended to identify records where the status has changed, but it appears to be causing a casting issue in Spark SQL. How can I resolve this error to ensure the incremental logic runs correctly in Spark SQL?
The error only occurs when is_incremental() returns true, which suggests that the issue lies within the incremental logic of the macro.
Additional Information:
- I’m using Amazon EMR with Hive and Spark SQL.
- The transformation involves checking for updates and changes in the status column.
- The full error message indicates a casting issue within the Spark execution plan.
Any insights or suggestions would be greatly appreciated!
What I have already tried:

- Removing the z.status column: I removed z.status from the comparison and ran the query with a simple string comparison against t.status. The same error occurred.
- Casting both columns: I explicitly cast both z.status and t.status to the same data type (e.g., VARCHAR). The error persisted.
- Testing with a static value: I replaced the dynamic t.status comparison with a static string (e.g., 'PAID') to rule out a problem with the data in the column. This did not fix the issue either.
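In case it helps answerers: the "latest row per document_id" lookup that the macro performs with nested correlated subqueries can also be expressed with a window function. This is only an untested sketch of the equivalent logic, with silver.due_parcels again standing in for {{ this }}:

```sql
-- Sketch: find the latest status per document_id with ROW_NUMBER()
-- instead of nested correlated subqueries (names illustrative, untested).
WITH latest AS (
    SELECT
        document_id,
        status,
        ROW_NUMBER() OVER (
            PARTITION BY document_id
            ORDER BY insurer_update_date DESC, inclusion_date DESC
        ) AS rn
    FROM silver.due_parcels
)
SELECT z.*
FROM stg_due_parcels_silver z
LEFT JOIN latest t
    ON z.document_id = t.document_id AND t.rn = 1
WHERE z.insurer_update_date > (SELECT MAX(insurer_update_date) FROM silver.due_parcels)
   -- NULL t.status (no prior row) makes this predicate NULL, i.e. false,
   -- matching the original EXISTS semantics.
   OR z.status <> t.status
```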