I have a DAG that takes a customizable list of tasks to execute as a parameter, so that I can choose, for example, to execute only the tasks ['prod_1', 'prod_2', 'prod_5']. I do this via a BranchPythonOperator, which runs only the tasks I pass as input when triggering the DAG and marks all the other downstream tasks as skipped (see BranchPythonOperator). This works as intended, as shown by the log of that node:
[2024-08-08, 13:20:59 UTC] {python.py:173} INFO - Done. Returned value was: ['prod_1', 'prod_2', 'prod_5']
[2024-08-08, 13:20:59 UTC] {skipmixin.py:140} INFO - Following branch ['prod_1', 'prod_2', 'prod_5']
[2024-08-08, 13:20:59 UTC] {skipmixin.py:175} INFO - Skipping tasks ['prod_3', 'prod_4', 'prod_6', 'prod_7', 'prod_8', 'prod_9', 'prod_10']
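The branching shown in that log can be modeled in plain Python. This is only a simplified sketch of the follow/skip split, not Airflow's actual implementation; `partition_branches` is a hypothetical helper name, and the task ids are the ones from my DAG.

```python
def partition_branches(downstream_ids, chosen_ids):
    """Split downstream task ids into (followed, skipped), keeping DAG order.

    Hypothetical helper that mimics the outcome of the branch step;
    it is not part of Airflow.
    """
    chosen = set(chosen_ids)
    # This sketch rejects ids that are not direct downstream tasks.
    unknown = chosen - set(downstream_ids)
    if unknown:
        raise ValueError(f"Not direct downstream tasks: {sorted(unknown)}")
    followed = [t for t in downstream_ids if t in chosen]
    skipped = [t for t in downstream_ids if t not in chosen]
    return followed, skipped


all_ids = [f"prod_{i}" for i in range(1, 11)]
followed, skipped = partition_branches(all_ids, ["prod_1", "prod_2", "prod_5"])
print("Following branch", followed)
print("Skipping tasks", skipped)
```

The two printed lists correspond to the "Following branch" and "Skipping tasks" entries in the log.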
This is my code:
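from airflow import DAG
from airflow.operators.python import BranchPythonOperator

# CustomPythonNotebookOperator is a project-specific operator; its import and the
# variables used below (schedule_intervals, env, dag_id, parallel, ...) are defined elsewhere.

with DAG(
    schedule_interval=schedule_intervals[env],
    dag_id=dag_id,
    default_args=default_args,
    start_date=start_date,
    max_active_tasks=parallel + 1,
    concurrency=parallel,  # deprecated name for max_active_tasks (Airflow 2.2+)
    catchup=False,
    dagrun_timeout=None,
    tags=dag_tags,
    # Default selection: all ten tasks. Override when triggering the DAG.
    params={"models_to_run": [
        'prod_1',
        'prod_2',
        'prod_3',
        'prod_4',
        'prod_5',
        'prod_6',
        'prod_7',
        'prod_8',
        'prod_9',
        'prod_10',
    ]}
) as dag:

    def _choose_tasks_fnc(TASKS_A_EJECUTAR):
        # The template is rendered to a string such as "['prod_1', 'prod_2']";
        # strip the brackets and quotes and split it back into a list of task ids.
        TASKS_A_EJECUTAR_NEW = TASKS_A_EJECUTAR.replace("[", "").replace("]", "").replace("'", "").split(", ")
        return TASKS_A_EJECUTAR_NEW

    # Follows only the returned task ids; the other downstream tasks are skipped.
    choose_tasks = BranchPythonOperator(
        task_id='tasks_to_execute_branch',
        python_callable=_choose_tasks_fnc,
        op_kwargs={
            'TASKS_A_EJECUTAR': '{{params["models_to_run"]}}',
        }
    )

    mbss = CustomPythonNotebookOperator(
        task_id='prod_1',
        nb_path=nb_path,
        id_producto='prod_1'
    )

    mbds = CustomPythonNotebookOperator(
        task_id='prod_2',
        nb_path=nb_path,
        id_producto='prod_2'
    )

    # ***** many other tasks *****

    mbh = CustomPythonNotebookOperator(
        task_id='prod_10',
        nb_path=nb_path,
        id_producto='prod_10'
    )

    # All ten tasks hang directly off the branch operator.
    choose_tasks >> [mbss, mbds, mba, mbh, mbv, mbs, mbd, segm, sega, segv]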
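A minimal sketch of what `_choose_tasks_fnc` receives and returns, runnable without Airflow. It assumes the template `{{params["models_to_run"]}}` renders to the string form of the Python list, which is what the `replace`/`split` chain expects. `choose_tasks_from_string` is just a standalone copy of that logic, and `parse_with_literal_eval` is a hypothetical alternative that is not in my DAG.

```python
import ast


def choose_tasks_from_string(rendered):
    # Same string clean-up as _choose_tasks_fnc in the DAG.
    return rendered.replace("[", "").replace("]", "").replace("'", "").split(", ")


def parse_with_literal_eval(rendered):
    # Hypothetical alternative: parse the rendered text as a Python literal,
    # so spacing and quote style do not matter.
    value = ast.literal_eval(rendered)
    if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
        raise ValueError(f"Expected a list of task ids, got: {rendered!r}")
    return value


# Assumed rendering of the template for a list parameter.
rendered = str(["prod_1", "prod_2", "prod_5"])
print(choose_tasks_from_string(rendered))
print(parse_with_literal_eval(rendered))

# Without a space after the commas, the split(", ") version no longer separates the ids.
compact = "['prod_1','prod_2','prod_5']"
print(choose_tasks_from_string(compact))
print(parse_with_literal_eval(compact))
```

The string-based version depends on the exact `", "` separator, whereas the literal-based version accepts either spacing.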
The problem is that, because of memory constraints, I can't run more than 3 tasks in parallel. With this setup, if I select more tasks (say, all 10), the DAG executes all of them in parallel. How can I modify it so that no more than 3 of the tasks I pass in run at the same time?
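To make the requirement concrete, here is a plain-Python sketch (not Airflow code) of the behavior I am after: however many tasks are selected, at most 3 run at once and the rest wait for a free slot. The names `run_limited` and `fake_task`, the thread pool, and the short sleep standing in for a notebook run are all illustrative assumptions.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_PARALLEL = 3  # the memory-imposed limit from the question


def run_limited(task_ids, max_parallel=MAX_PARALLEL):
    """Run one stand-in job per task id, never more than max_parallel at once.

    Returns the finished ids (in input order) and the peak number of jobs
    that were running simultaneously.
    """
    lock = threading.Lock()
    running = 0
    peak = 0

    def fake_task(task_id):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # stand-in for the real notebook execution
        with lock:
            running -= 1
        return task_id

    # The pool size is what caps the parallelism; extra tasks queue up.
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        finished = list(pool.map(fake_task, task_ids))
    return finished, peak


finished, peak = run_limited([f"prod_{i}" for i in range(1, 11)])
print(finished)
print("peak parallel tasks:", peak)
```

All 10 tasks complete, but the peak never exceeds 3. That is the effect I want for the selected tasks in the DAG.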