I am working with Airflow and have created a DAG that uses dynamic task mapping inside a task group.
I have two questions:
- How can I remove the line from get_files to process_file_step2?
  - I want process_file_step1 to be the only upstream task of process_file_step2, even though the input (file_path) comes from get_files.
  - How can I modify the DAG to remove this dependency? As the attached DAG graph shows, there is an unwanted direct line from get_files to process_file_step2.
- How can I get the file name inside the task group by parsing the dynamically mapped input?
  - Inside the task group, I need to parse the input directly and use the result in downstream tasks.
  - I have added a TODO placeholder for this in the example DAG code below, but I am unsure how to implement it correctly.
The example DAG and graph are below:
from airflow.decorators import dag, task_group, task
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago


@dag(
    start_date=days_ago(1),
    schedule=None,
    catchup=False
)
def dynamic_task_group_mapping():
    @task_group(group_id="process_file")
    def tg1(file_path):
        @task
        def process_file_step1(file_path):
            return f"Step 1 processed {file_path}"

        @task
        def process_file_step2(file_path):
            return f"Step 2 processed {file_path}"

        process_file_step1(file_path) >> process_file_step2(file_path)

        # TODO: how to parse file_path to get file name
        # from pathlib import Path
        # file_name = Path(file_path).name
        # return file_name

    @task
    def get_files():
        return ['file1', 'file2', 'file3']

    tg1.expand(file_path=get_files()) >> EmptyOperator(task_id='Notify', trigger_rule="none_failed_min_one_success")


dynamic_task_group_mapping()
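For the TODO, one possible direction: as far as I understand, file_path inside the task group function is not a real string while the DAG is being parsed. It is only a placeholder that Airflow resolves for each mapped instance at run time, so Path(file_path).name probably cannot run there directly. A minimal sketch, assuming the parsing moves into its own task. The plain function below stands in for a task body; the name get_file_name and the sample paths are hypothetical:

```python
from pathlib import Path


def get_file_name(file_path: str) -> str:
    """Return the final component of a path string.

    In the DAG this function would carry the @task decorator and be
    called inside tg1, so it would receive the resolved string at run time.
    """
    return Path(file_path).name


# Plain-Python check of the parsing logic (outside Airflow).
# The sample paths are made up for illustration.
for path in ["/data/in/file1.csv", "file2", "nested/dir/file3.txt"]:
    print(path, "->", get_file_name(path))
```

Inside tg1 the wiring would then presumably look like file_name = get_file_name(file_path), with downstream tasks taking file_name as their input.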
DAG graph: I want to avoid the line/dependency from get_files to process_file_step2.
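One way this edge might be avoided, as far as I understand it: Airflow seems to draw an edge from get_files to every task that consumes file_path directly, so process_file_step2 would need to take its input from process_file_step1 instead. A minimal sketch of that data flow in plain Python. The dictionary keys are hypothetical, and in the DAG both functions would keep their @task decorators and be wired as process_file_step2(process_file_step1(file_path)) in place of the >> line:

```python
def process_file_step1(file_path: str) -> dict:
    # Pass file_path along in the return value so that step 2 does not
    # need to read it from get_files directly.
    return {
        "file_path": file_path,
        "result": f"Step 1 processed {file_path}",
    }


def process_file_step2(step1_output: dict) -> str:
    # The only input is step 1's output, so step 1 would be the only
    # upstream task of step 2.
    file_path = step1_output["file_path"]
    return f"Step 2 processed {file_path}"


# Plain-Python stand-in for the three mapped task group instances.
for file_path in ["file1", "file2", "file3"]:
    print(process_file_step2(process_file_step1(file_path)))
```

The trade-off is that process_file_step1 has to forward everything process_file_step2 needs, since step 2 no longer sees the original mapped input.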