I have a DAG that queries a BigQuery table for the list of stored procedures to execute. Both the number of stored procedures and the specific procedures to call can vary from one pipeline run to the next.
I am able to create the dynamic (mapped) tasks and am attempting to force sequential processing with max_active_tis_per_dag=1. The entire DAG should fail at the point a mapped task fails.
Simple example:
Dependencies:
Task1 (output = list) -> Mapped Task2(0) -> Mapped Task2(1) -> Mapped Task2(2)
If mapped task 0 fails, then mapped task 1 and mapped task 2 do not run (automatic fail).
If mapped task 0 succeeds and mapped task 1 fails, then mapped task 2 fails.
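In other words, I want the fail-fast behavior that plain, unmapped tasks give by default. A minimal sketch of that behavior with static tasks (task names here are hypothetical, not part of my DAG; with the default all_success trigger rule, the remaining tasks become upstream_failed as soon as one fails):

# Static-task sketch of the desired fail-fast behavior (hypothetical names).
# With the default trigger_rule="all_success", a failure in step_0 marks
# step_1 and step_2 as upstream_failed, so the run stops at that point.
import pendulum
from airflow.decorators import dag, task


@dag(schedule=None, start_date=pendulum.datetime(2024, 5, 21), catchup=False)
def static_fail_fast_sketch():
    @task
    def step_0():
        ...

    @task
    def step_1():
        ...

    @task
    def step_2():
        ...

    # Chained dependencies: each step only runs if the previous one succeeded.
    step_0() >> step_1() >> step_2()


static_fail_fast_sketch()

That is exactly the behavior I am trying to reproduce across the mapped instances of a single task.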
Current DAG:

import logging

import pendulum
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from google.cloud import bigquery

logger = logging.getLogger(__name__)

# bq_steps_sql (the query that returns the steps to run) is defined elsewhere in the file.


@dag(schedule=None, start_date=pendulum.datetime(2024, 5, 21, 12, 0, 0), catchup=False)
def get_execution_steps():
    """
    1) Gets the steps (list of stored procedures) to execute from a BigQuery table.
    2) Sorts the steps into the desired processing order (step 1 through x, where x is the length of the list from step 1).
    3) Expands tasks so each stored procedure runs as an individual task, with the ability to retrigger the DAG from the point of failure without rerunning prior stored procedures.
    """

    @task()
    def get_and_sort_tasks():
        client = bigquery.Client()
        unsorted_task_list = client.query(bq_steps_sql, location='US')
        logger.info(repr(unsorted_task_list.result()))
        sorted_steps_list = sorted(unsorted_task_list, key=lambda x: (int(x[0]), int(x[1])))
        logger.info(sorted_steps_list)
        sorted_task_list = []
        for step in sorted_steps_list:
            execution_task_name = 'Execute_statement_task{}_subtask{}'.format(step[0], step[1])
            sql = step[2]
            logger.info([execution_task_name, sql])
            sorted_task_list.append([execution_task_name, sql])
        return sorted_task_list

    @task(max_active_tis_per_dag=1, task_id='execute_statement_task', map_index_template="{{task_name}}")
    def execute_statements(sorted_task_list):
        logger.info(sorted_task_list)
        context = get_current_context()
        context["task_name"] = sorted_task_list[0]
        logger.info('Running: {}'.format(sorted_task_list[1]))
        client = bigquery.Client()
        query = sorted_task_list[1]
        job = client.query(query, location='US')
        logger.info(repr(job.result()))

    tasks = execute_statements.partial().expand(sorted_task_list=get_and_sort_tasks())


get_execution_steps()
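For context, get_and_sort_tasks returns a list of [task_name, sql] pairs, and expand() creates one mapped instance of execute_statements per pair. The shape looks roughly like this (the step numbers and procedure calls below are made up for illustration):

# Illustrative shape of the XCom that expand() maps over;
# procedure names and step numbers are hypothetical.
example_sorted_task_list = [
    ['Execute_statement_task1_subtask1', "CALL `my_project.my_dataset.sp_load_stage`()"],    # map index 0
    ['Execute_statement_task1_subtask2', "CALL `my_project.my_dataset.sp_merge_target`()"],  # map index 1
    ['Execute_statement_task2_subtask1', "CALL `my_project.my_dataset.sp_publish`()"],       # map index 2
]
# execute_statements runs once per element; map_index_template="{{task_name}}"
# then labels each mapped instance in the UI with the first element of its pair.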
Items I’ve tried:
Trigger rules appear to apply at the task level rather than per map index. I have tried adding a downstream task with trigger_rule='one_failed', which does in fact trigger if, for example, mapped task 0 fails, but mapped task 1 and mapped task 2 still continue to run.
from airflow.exceptions import AirflowFailException

@task(task_id='task_exception', trigger_rule='one_failed')
def task_exception():
    raise AirflowFailException('Dynamic Statement Failed')
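This exception task sits downstream of the mapped tasks; the exact dependency line is reconstructed from my description above, but it was roughly:

# Assumed wiring: the exception task is downstream of all mapped instances,
# so one_failed fires it as soon as any mapped execute_statements instance fails,
# yet the remaining mapped instances still run to completion.
tasks >> task_exception()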
Links I’ve looked at:
https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/taskinstance/index.html