I got 2 dags in same dags folder:
- dag_update_database (dag_update_database.py)
- dag_add_client_loyalty (dag_updade_clients_loyalty.py)
I need run second dag right after successfully run first dag.
I’m trying to add TriggerDagRunOperator to first dag, but It’s not working.
How to use TriggerDagRunOperator right?
My first dag:
import datetime
from airflow.decorators import dag, task
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from dag_add_client_loyalty import dag_add_client_loyalty
default_args = {
'owner': 'd-chernovol',
'depends_on_past': False,
'retries': 5,
'retry_delay': datetime.timedelta(minutes=1),
'start_date': datetime.datetime(2024, 6, 30)
}
schedule_interval = '*/20 * * * *'
@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False, concurrency=4)
def dag_update_database():
@task
def get_salons(api, tries: int):
...
return result_from_salons_api
@task
def get_clients(api, tries: int, result_from_salons_api: list):
...
return clients
@task
def trigger_add_client_loyalty_dag():
trigger = TriggerDagRunOperator(
task_id='trigger_add_client_loyalty_dag',
trigger_dag_id='dag_add_client_loyalty',
execution_date='{{ execution_date }}'
)
get_salon_task = get_salons(api, 10)
get_clients_task = get_clients(api, 10, get_salon_task)
run_add_client_loyalty_dag_task = trigger_add_client_loyalty_dag()
get_salon_task.set_downstream(get_clients_task)
get_clients_task.set_downstream(run_add_client_loyalty_dag_task)
dag_update_database = dag_update_database()
Second dag code:
from airflow.decorators import dag, task
default_args = {
'owner': 'd-chernovol',
'depends_on_past': False,
'retries': 5,
'retry_delay': datetime.timedelta(minutes=1),
'start_date': datetime.datetime(2024, 6, 30)
}
@dag(default_args=default_args, schedule_interval=None, catchup=False, concurrency=4)
def dag_add_client_loyalty():
@task
def get_salons_list():
return result
@task
def get_records_from_api():
return result
get_salon_task = get_salons_list(api)
get_records_from_api_task = get_records_from_api(api, get_salon_task)
get_salon_task.set_downstream(get_records_from_api_task)
dag_add_client_loyalty = dag_add_client_loyalty()