I have a query which is string format, I want to parse a dag_run variable into it from the BigQueryInsertJobOperator. I have looked up for some references to do this like in this question. The only difference is I want to parse the dag_run variable. The get_date function should get dag_run transform_date variable when dag runs and decides what’s the final date then parse it into the update query but I keep getting error key error dag_run and broken DAG.
import ...
BSQL = """
-- The purpose of this query is to update the amount of fees into zero
-- which orders are refunded or accepted to refund for shopee case
UPDATE `fact_table` SET amount = 0
WHERE
transaction_date >= {transform_date}
)
"""
default_args = {
"owner": "rama",
"email_on_failure": False,
"email_on_retry": False,
"email": "[email protected]",
"on_failure_callback": alert_slack(conn_id="slack_alert"),
"retries": 0,
"retry_delay": timedelta(minutes=5),
}
with DAG(
"fact_table",
start_date=datetime(2024, 5, 21),
schedule_interval="0 23 * * *",
default_args=default_args,
catchup=False,
tags=["fact_table"],
) as dag:
main = EmptyOperator(task_id="START")
end = EmptyOperator(task_id="END")
def get_date(**kwargs):
transform_date = kwargs["dag_run"].conf.get("transform_date")
if transform_date is None:
current_date = datetime.now().date()
truncated_date = current_date.replace(day=1)
result_date = truncated_date - relativedelta(months=1)
transform_date = result_date.strftime("%Y-%m-%d")
return transform_date
fact_table = BigQueryInsertJobOperator(
task_id="fact_table",
configuration={
"query": {
"query": BSQL.format(transform_date=get_date()),
"useLegacySql": False,
}
},
gcp_conn_id=BIGQUERY_STG_CONN,
)
main >> [fact_table] >> end