I was told to disable the dbt DAG in Airflow, which I did, but the dbt DAG in my case is called by a parent “main” DAG, which calls an “extract” DAG before calling the dbt one.
I disabled the dbt dag in the Airflow UI, but the main DAG hangs with this message:
[2024-06-14, 04:41:42 UTC] {trigger_dagrun.py:178} INFO – Waiting for extract on 2024-06-13T18:03:09.514547+00:00 to become allowed state [‘success’] …
[2024-06-14, 04:42:03 UTC] {local_task_job.py:211} WARNING – State of this instance has been externally set to failed. Terminating instance.
I had to manually mark the main DAG as failed.
I’m looking to tell the main DAG to skip calling a DAG in the chain if is disabled in the UI.
This is the main DAG code:
# Default settings applied to all tasks
default_args={
"owner":"airflow",
"retries": 0,
"retry_delay": timedelta(minutes=5),
"start_date": datetime(2021, 12, 1),
"catchup": False,
'on_failure_callback': on_failure_callback,
}
@dag(
start_date=datetime(2019, 1, 1),
max_active_runs=1,
schedule_interval=schedule(daily_schedule), # Run at 5:00am MST
default_args=default_args,
catchup=False,
concurrency=4,
tags=['extract', 'main']
)
def main():
with TaskGroup(group_id='extract') as extract:
TriggerDagRunOperator(
task_id='extract',
trigger_dag_id='extract',
wait_for_completion=True,
trigger_rule="all_done" # Run even if previous tasks failed
)
dbt = TriggerDagRunOperator(
task_id='dbt',
trigger_dag_id='dbt_api_dag',
wait_for_completion=True,
trigger_rule="all_done" # Run even if previous tasks failed
)
extract >> dbt
taskflow = main()
By the way, this is what asking Gemini returned, but I haven’t tested it yet. I’ll update the answer if this works.
from airflow import XCom
# Add a task before dbt_api_dag to check XCom
check_dbt_enabled = ShortCircuitOperator(
task_id='check_dbt_enabled',
provide_context=True,
)
def check_dbt_enabled_fn(**context):
# Read XCom value with key indicating dbt_api_dag state (e.g., "dbt_api_enabled")
dbt_enabled = XCom.pull(dag_id=dag.dag_id, task_ids='set_dbt_state', key='dbt_api_enabled')
if dbt_enabled is None or dbt_enabled.lower() != "true":
context['ti'].xcom_push(key='skip_dbt', value=True)
return False # Skip dbt_api_dag execution
return True
check_dbt_enabled.python_callable = check_dbt_enabled_fn
# Modify dbt task to check XCom for skipping
dbt = TriggerDagRunOperator(
task_id='dbt',
trigger_dag_id='dbt_api_dag',
wait_for_completion=True,
trigger_rule="all_done",
provide_context=True,
)
def dbt_with_skip(**context):
skip_dbt = context['ti'].xcom_pull(key='skip_dbt', task_ids='check_dbt_enabled')
if skip_dbt:
return # Skip dbt execution
# Trigger dbt_api_dag normally if XCom is not set or set to "True"
return True
dbt.python_callable = dbt_with_skip
# Update task dependencies
extract >> check_dbt_enabled >> dbt