TL;DR
A CTAS that works when run in the Redshift query editor fails when passed to Redshift via a PostgresHook in Airflow because of weirdness with a case-sensitive field in an external (RDS) table.
The issue
I’m building a small pipeline on Redshift using Airflow 2.7.2 on AWS MWAA.
I’m using PythonOperator to manage a transaction for a simple transform on data from an external schema connected to RDS. The transform part of the transaction looks like this:
create table "ops_qms_dev"."rem_publication_hx" as
with rem_publication_sitegroups_1yr as (
    select
        id,
        "itemType"::varchar(256) as item_type,
        "itemId"::varchar(36) as site_group_id,
        event::varchar(256),
        owner, object, "timestamp"
    from "ext_next_gen"."version"
    where
        true
        and "itemType" = 'SiteGroup'
        and event = 'UPDATE'
        and json_extract_path_text(owner, 'handler') in (
            'publishRem',
            'promoteSiteGroupWorkingRems'
        )
        and "timestamp"::date > current_date - '1 year'::interval
)
select * from rem_publication_sitegroups_1yr;
It fails when run from the Airflow DAG with the following error:
psycopg2.errors.InternalError_: column "itemtype" does not exist
DETAIL:
-----------------------------------------------
error: column "itemtype" does not exist
code: 25300
context:
query: 219700193
location: pgclient.cpp:908
process: query1_101_219700193 [pid=7610]
-----------------------------------------------
Which looks like the error I’d expect if the mixed-case column name weren’t properly double-quoted.
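For context on that expectation: Postgres-family engines fold unquoted identifiers to lowercase, so an unquoted itemType is really a request for itemtype. A minimal sketch of that failure mode, with a hypothetical psycopg2 DSN standing in for the real connection:

import psycopg2

# Hypothetical DSN; only the identifier folding matters here.
conn = psycopg2.connect("host=my-cluster.example.com dbname=dev user=me")

with conn.cursor() as cursor:
    # Unquoted: the identifier folds to lowercase, so this asks for a column
    # literally named itemtype and fails with the same error the DAG hits:
    #   column "itemtype" does not exist
    cursor.execute('select itemType from "ext_next_gen"."version" limit 1;')

The puzzle is that the SQL my DAG sends is double-quoted, which should bypass that folding, yet the error reads as if the quotes were being ignored.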
What I’ve tried
- I’ve run the statement verbatim from an external SQL client, and it runs successfully
- I’ve removed the mixed-case columns from the query and run the DAG in Airflow successfully
- I’ve looked at stl_query for the offending query from Airflow, and confirmed that the querytxt is exactly what I passed (with properly double-quoted, mixed-case columns where expected); see the sketch just after this list
- I’ve tried creating a SQLAlchemy connection from the PostgresHook, to see whether a different engine would behave differently than plain psycopg2, but got the same error
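The stl_query check from the third bullet looked roughly like this (system table and column names per the Redshift docs; the filter is approximate):

def show_recent_query_text(conn, needle="rem_publication_hx"):
    # Pull the SQL Redshift logged for recent queries touching the table, to
    # compare against what Airflow sent. Note querytxt is truncated to 4,000
    # characters; the full text of longer statements lives in stl_querytext.
    with conn.cursor() as cursor:
        cursor.execute(
            """
            select query, starttime, querytxt
            from stl_query
            where querytxt ilike %s
            order by starttime desc
            limit 5;
            """,
            (f"%{needle}%",),
        )
        for row in cursor.fetchall():
            logger.info("stl_query row: %s", row)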
Some additional code for reference
Here’s my Python callable:
import logging

from airflow.providers.postgres.hooks.postgres import PostgresHook

logger = logging.getLogger(__name__)

OPS_QMS_SCHEMA = "ops_qms_dev"  # the schema from the CTAS above


def build_qms_pipeline_table(table, ctas):
    """Build a table in the QMS schema using a CTAS query, all wrapped in a transaction."""
    redshift_hook = PostgresHook(postgres_conn_id="prod-monitoring-analytics")
    conn = redshift_hook.get_conn()
    _build_qms_pipeline_table(table, ctas, conn)


def _build_qms_pipeline_table(table, ctas, conn):
    fully_qualified_table = f'"{OPS_QMS_SCHEMA}"."{table}"'
    with conn:
        with conn.cursor() as cursor:
            try:
                logger.info(f"Building {fully_qualified_table}...")
                cursor.execute("begin;")
                cursor.execute(f"drop table if exists {fully_qualified_table} cascade;")
                create_table_query = f"create table {fully_qualified_table} as {ctas};"
                cursor.execute(create_table_query)
                cursor.execute("commit;")
                logger.info(f"Success on {fully_qualified_table}")
            except Exception as e:
                logger.error(f"Build failed; rolling back for {fully_qualified_table}...")
                cursor.execute("rollback;")
                raise e
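One thing I haven’t ruled out yet is session configuration. Redshift has a documented parameter, enable_case_sensitive_identifier; if I’m reading the docs right, when it’s off (the default) even double-quoted identifiers get downcased, which would produce exactly this error. A sketch of a diagnostic I could bolt onto the callable, assuming the parameter is readable with show:

def log_case_sensitivity_setting(conn):
    # Diagnostic only: compare this session's setting with what the query
    # editor session reports. If the two differ, that would explain the
    # quotes apparently being ignored.
    with conn.cursor() as cursor:
        cursor.execute("show enable_case_sensitive_identifier;")
        logger.info("enable_case_sensitive_identifier = %s", cursor.fetchone()[0])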
My attempt at a SQLAlchemy connection:
from sqlalchemy import create_engine, text


def build_qms_pipeline_table(table, ctas):
    """Build a table in the QMS schema using a CTAS query, all wrapped in a transaction."""
    redshift_hook = PostgresHook(postgres_conn_id="redshift-conn")
    with create_engine(redshift_hook.get_uri()).connect() as conn:
        _build_qms_pipeline_table(table, ctas, conn)


def _build_qms_pipeline_table(table, ctas, conn):
    fully_qualified_table = f'"{OPS_QMS_SCHEMA}"."{table}"'
    trans = conn.begin()
    try:
        logger.info(f"Building {fully_qualified_table}...")
        conn.execute(text(f"drop table if exists {fully_qualified_table} cascade;"))
        create_table_query = text(f"create table {fully_qualified_table} as {ctas};")
        conn.execute(create_table_query)
        trans.commit()
        logger.info(f"Success on {fully_qualified_table}")
    except Exception as e:
        logger.error(f"Build failed; rolling back for {fully_qualified_table}...")
        trans.rollback()
        raise e
The DAG task
build_rem_publication_hx = PythonOperator(
    task_id="build_rem_publication_hx",
    python_callable=build_qms_pipeline_table,
    op_kwargs={
        "table": "rem_publication_hx",
        "ctas": REM_PUBLICATION_HX_CTAS.read_text(),
    },
    provide_context=True,
    dag=dag,
)
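For completeness, REM_PUBLICATION_HX_CTAS is a pathlib.Path pointing at the SQL file shown at the top; the exact layout below is hypothetical:

from pathlib import Path

# Hypothetical layout: the CTAS SQL sits in a sql/ folder next to the DAG file.
REM_PUBLICATION_HX_CTAS = Path(__file__).parent / "sql" / "rem_publication_hx.sql"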