I am new to Airflow. I want to create Python tasks dynamically and run them in parallel (10 tasks at a time).
My DAG is supposed to fetch records from a table (using get_data(name)), create one dynamic task per record, and run them in parallel. The same parameter passed to taska should be forwarded to taskb.
Here is the diagram for better understanding.
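The 10-at-a-time cap comes from a pool with 10 slots. For completeness, this is roughly how the pool is set up (a sketch, assuming an Airflow 2.x release where Pool.create_or_update_pool takes name/slots/description; the same thing can be done from the UI under Admin -> Pools or with the "airflow pools set" CLI):

# One-off setup, run outside the DAG file: create the pool that caps
# how many of the dynamic tasks may run concurrently.
from airflow.models import Pool

Pool.create_or_update_pool(
    name="abc_pool",
    slots=10,  # at most 10 task instances from this pool run at once
    description="cap dynamic test tasks at 10 parallel runs",
)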
Here is the code that I use to implement the above requirement:
from datetime import timedelta, datetime

from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.operators.empty import EmptyOperator

from test_func import taska, taskb
POOL_NAME = "abc_pool"
CONN_ID = "JDBC"  # looked up inside each task; default_args does not inject it

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(seconds=30),
}
def get_data(name):
    # Connection/cursor setup here (curs = <db cursor>)
    # Parameterized query: the original concatenation was missing the WHERE
    # keyword and was open to SQL injection. The placeholder style (%s vs ?)
    # depends on the DB driver.
    query = "select * from table where col1 = %s"
    curs.execute(query, (name,))
    result = curs.fetchall()
    col_names = [desc[0] for desc in curs.description]
    results = [dict(zip(col_names, row)) for row in result]
    return results
def create_dag(results, default_args):
    @dag(
        dag_id="test_dag_parallel",
        default_args=default_args,
        description="test dag to check parallel run",
        schedule_interval=timedelta(days=1),
        start_date=datetime(2024, 1, 1),
        catchup=False,
    )
    def process_test():
        start_dag = EmptyOperator(task_id="start_dag")
        end_dag = EmptyOperator(task_id="end_dag")

        @task
        def func_taska(a):
            # Look up the connection inside the task: TaskFlow does not
            # inject a `conn` argument, so the original two-parameter
            # signature failed with a missing-argument error at runtime.
            conn = BaseHook.get_connection(CONN_ID)
            taska(a, conn)

        @task
        def func_taskb(b):
            conn = BaseHook.get_connection(CONN_ID)
            taskb(b, conn)
        def generate_tasks():
            for i in results:
                param = i["id"]
                # Distinct task_ids for the two steps (the original used the
                # near-identical ids task_{param} and task{param}).
                a = func_taska.override(task_id=f"taska_{param}", pool=POOL_NAME)(param)
                b = func_taskb.override(task_id=f"taskb_{param}", pool=POOL_NAME)(param)
                start_dag >> a >> b >> end_dag

        generate_tasks()
    generated_dag = process_test()
    return generated_dag

create_dag(get_data("pqr"), default_args)
I am not sure what is wrong with this approach. Can someone help me understand the problem with it and the correct way to solve this?
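While reading the docs I also came across dynamic task mapping (expand(), available since Airflow 2.3). Is something along these lines the better direction? A rough, untested sketch reusing the names from my code above, with the query moved into a task so it runs at execution time rather than on every DAG parse:

from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from test_func import taska, taskb

@dag(
    dag_id="test_dag_parallel_mapped",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def process_test_mapped():
    @task
    def get_ids():
        # Query runs at task execution time, not at DAG parse time
        return [row["id"] for row in get_data("pqr")]

    @task(pool="abc_pool")
    def func_taska(a):
        taska(a, BaseHook.get_connection("JDBC"))
        return a  # forward the same parameter to taskb

    @task(pool="abc_pool")
    def func_taskb(b):
        taskb(b, BaseHook.get_connection("JDBC"))

    # One taska/taskb pair per id; the pool's 10 slots cap concurrency
    func_taskb.expand(b=func_taska.expand(a=get_ids()))

process_test_mapped()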