I have a list of products, let's say "product_1", "product_2", etc., and for each of them I want to run a node, a CustomPythonNotebookOperator() that runs a Python Jupyter notebook. The nodes are created in a for loop, and no more than 3 of them should run at the same time, as specified by the parameter parallel. This operator comes from my organization and cannot be changed.
My current DAG is based on the following code:
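```python
from airflow import DAG
from airflow.utils.task_group import TaskGroup

# CustomPythonNotebookOperator is provided by my organization (import omitted).
# schedule_intervals, env, dag_id, default_args, start_date, dag_tags,
# matriz_name and nb_path are defined elsewhere in the DAG file.

ids_productos = [
    'prod_1',
    'prod_2',
    'prod_3',
    'prod_4',
    'prod_5',
    'prod_6',
    'prod_7',
    'prod_8',
    'prod_9',
    'prod_10',
]
parallel = 3

with DAG(
    schedule_interval=schedule_intervals[env],
    dag_id=dag_id,
    default_args=default_args,
    start_date=start_date,
    max_active_tasks=parallel + 1,
    concurrency=parallel,
    catchup=False,
    dagrun_timeout=None,
    tags=dag_tags,
) as dag:
    with TaskGroup(group_id=matriz_name) as matriz:
        a = []
        for i, id_producto in enumerate(ids_productos):
            a.append(
                CustomPythonNotebookOperator(
                    task_id=str(id_producto),
                    nb_path=nb_path,
                    id_producto=id_producto,
                )
            )
            # Task i waits for task i - parallel, so the tasks form `parallel`
            # chains and at most `parallel` notebooks run at the same time.
            if i >= parallel:
                a[i - parallel] >> a[i]
    matriz
```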
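The dependency loop arranges the tasks into `parallel` fixed chains. This plain-Python sketch (the helper `chain_edges` is hypothetical, not part of Airflow) lists the `upstream >> downstream` pairs that the loop creates for the ten products:

```python
from typing import List, Sequence, Tuple


def chain_edges(ids: Sequence[str], parallel: int) -> List[Tuple[str, str]]:
    """Return the (upstream, downstream) pairs the DAG loop creates.

    Task i waits for task i - parallel, which splits the tasks into
    `parallel` independent sequential chains.
    """
    edges = []
    for i, current in enumerate(ids):
        if i >= parallel:
            edges.append((ids[i - parallel], current))
    return edges


ids_productos = [f"prod_{n}" for n in range(1, 11)]
for upstream, downstream in chain_edges(ids_productos, 3):
    print(f"{upstream} >> {downstream}")
# prints prod_1 >> prod_4, prod_2 >> prod_5, ..., prod_7 >> prod_10 (7 lines)
```

With `parallel = 3` the chains are prod_1 >> prod_4 >> prod_7 >> prod_10, prod_2 >> prod_5 >> prod_8 and prod_3 >> prod_6 >> prod_9, so a slow notebook only delays the products later in its own chain.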
What I want is to change my code so that I can choose which products to run the process for, instead of having to edit the hardcoded list ids_productos each time.
What I did was modify the DAG() call to add the argument params, so that when I trigger it I can simply delete the products I'm not interested in, while by default it runs all products:
```python
with DAG(
    schedule_interval=schedule_intervals[env],
    dag_id=dag_id,
    default_args=default_args,
    start_date=start_date,
    max_active_tasks=parallel + 1,
    concurrency=parallel,
    catchup=False,
    dagrun_timeout=None,
    tags=dag_tags,
    params={'models_to_run': [
        'prod_1',
        'prod_2',
        'prod_3',
        'prod_4',
        'prod_5',
        'prod_6',
        'prod_7',
        'prod_8',
        'prod_9',
        'prod_10',
    ]},
) as dag:
    ...  # TaskGroup and tasks as before
```
But then I don't know how to use the parameter models_to_run (let's say models_to_run = ['prod_1', 'prod_3']) to run only the nodes for those two products. Also, if I select only 4 products, I still need no more than 3 of them to run in parallel.
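One constraint worth keeping in mind: the `for` loop runs when Airflow parses the DAG file, before any DAG run exists, so it cannot read a run's `params`; those values are only available at run time (for example through templating as `{{ params.models_to_run }}`). Whatever the wiring ends up being, the selection logic itself can be sketched in plain Python. `plan_lanes` and `ALL_PRODUCTS` are hypothetical names, and the sketch assumes the limit stays at 3 and that selected products keep the order of the full list.

```python
from typing import List, Sequence

ALL_PRODUCTS = [f"prod_{n}" for n in range(1, 11)]


def plan_lanes(requested: Sequence[str], parallel: int = 3,
               known: Sequence[str] = ALL_PRODUCTS) -> List[List[str]]:
    """Split the requested products into at most `parallel` sequential lanes.

    Unknown ids raise ValueError; products keep the order of `known`.
    An empty selection yields no lanes.
    """
    wanted = set(requested)
    unknown = wanted - set(known)
    if unknown:
        raise ValueError(f"unknown products: {sorted(unknown)}")
    selected = [p for p in known if p in wanted]
    n_lanes = min(parallel, len(selected))
    # Product k goes to lane k % n_lanes: the same "i waits for i - parallel"
    # rule as the DAG loop, applied only to the selected subset.
    return [selected[lane::n_lanes] for lane in range(n_lanes)]


print(plan_lanes(["prod_1", "prod_3"]))
# [['prod_1'], ['prod_3']]
print(plan_lanes(["prod_2", "prod_4", "prod_5", "prod_9"]))
# [['prod_2', 'prod_9'], ['prod_4'], ['prod_5']]
```

For two products this gives two independent one-product lanes; for four products it gives three lanes, one of which runs two products in sequence, so no more than 3 notebooks overlap.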