I am trying to create a set of Python scripts to automate the creation of DAGs for Airflow 2.9.x. The idea is to list all tasks in an Excel file, since these DAGs will have to orchestrate tasks for many different groups of Data Engineers, and filling in an Excel sheet seems to be the most practical option until we have a more dynamic way, like a JS website, to enter and arrange them in order.
Here's the step-by-step of what I have in mind, all based on GCP:
- Engineers/Developers create their transformation scripts in Dataform (or migrate them from the basic BigQuery scripts that we use today)
- They also set up the corresponding Cloud Functions or Dataproc Workflows (PySpark)
- We get together once or twice a week; they give me the details of their new ETLs and we enter them in the Excel file at the right row (for example, after some dependency).
- In the same row, I specify what to launch after the task and the days it should run. For example, a small delta load using a Cloud Function from Monday to Saturday, and a large table on Sundays using Dataproc.
- Each task will have a run ID and either a group ID or a scheduled time and day. If a task belongs to a group, it should go into a DAG with groups and sub-groups that trigger as a chain; if it is a stand-alone task, it should run independently in its own DAG with a cron schedule.
- After everything is reviewed, I close the Excel file and run the main Python script, which first reads the Excel and updates a JSON file with the tasks.
- Once the JSON file is updated, Python should create the necessary DAGs: one for all the “chained” tasks in groups, where each group also depends on the previous group (basically one group after another), with a default schedule interval in the DAG args, plus the independent DAGs for stand-alone (scheduled) tasks (see the sketch after this list).
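To make this concrete, here is a simplified sketch of that generation step. The JSON field names (run_id, group_id, cron), the config path, and the placeholder EmptyOperator are illustrative, not the exact production code:

```python
# Simplified sketch of the DAG factory (field names and paths are illustrative).
import json
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

CONFIG_PATH = "/home/airflow/gcs/dags/config/tasks.json"  # illustrative path

with open(CONFIG_PATH) as f:
    tasks = json.load(f)
# Example of what one entry in tasks.json might look like:
# {"run_id": "sales_delta", "group_id": null, "cron": "0 6 * * 1-6"}

default_args = {"owner": "data-eng", "retries": 1}

# Stand-alone (scheduled) tasks: one DAG each, with its own cron schedule.
for task in (t for t in tasks if not t.get("group_id")):
    dag_id = f"standalone_{task['run_id']}"
    with DAG(
        dag_id=dag_id,
        start_date=datetime(2024, 1, 1),
        schedule=task["cron"],
        default_args=default_args,
        catchup=False,
    ) as dag:
        # Placeholder; in the real file this is the Cloud Function or Dataproc operator.
        EmptyOperator(task_id=task["run_id"])
    # Expose the DAG object at module level so the scheduler picks it up.
    globals()[dag_id] = dag
```

The grouped tasks go into a single DAG built the same way (groups chained in dependency order); I've left that part out here for brevity.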
The reason for all this is that we are finally migrating from basic BQ scripts, which run with scheduled queries plus either Pub/Sub to launch Cloud Functions or Cloud Scheduler to launch Dataproc Workflow Templates, to more effective ETL tools like Dataform (a.k.a. the GCP dbt) and Airflow. In total we should end up with some 1,500 tasks.
I have all of that working, except for the conditionals. What is the best way to handle those? I've tried different options such as BranchDayOfWeekOperator, DayOfWeekSensor, and PythonOperator (to dynamically generate tasks for either a Cloud Function or Dataproc based on a datetime.datetime.today().weekday() comparison). Both the BranchDayOfWeekOperator and the PythonOperator approaches work, but I would like to know which one is better.
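To make the comparison concrete, this is roughly what the branching variant looks like in my DAGs (task IDs, the schedule, and the Sunday split are simplified placeholders):

```python
# Runtime branching with BranchDayOfWeekOperator (simplified placeholders).
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.utils.weekday import WeekDay

with DAG(
    dag_id="branch_example",
    start_date=datetime(2024, 1, 1),
    schedule="0 6 * * *",
    catchup=False,
) as dag:
    # Placeholders for the real Cloud Function / Dataproc operators.
    run_cf_delta = EmptyOperator(task_id="run_cf_delta")
    run_dataproc_full = EmptyOperator(task_id="run_dataproc_full")

    branch = BranchDayOfWeekOperator(
        task_id="branch_on_weekday",
        week_day={WeekDay.SUNDAY},
        follow_task_ids_if_true="run_dataproc_full",
        follow_task_ids_if_false="run_cf_delta",
        use_task_logical_date=True,  # decide based on the DAG run's logical date
    )
    branch >> [run_cf_delta, run_dataproc_full]
```

The weekday-comparison variant just checks datetime.datetime.today().weekday() in the DAG file itself and only instantiates the operator that applies, so the decision happens when the file is parsed rather than per run.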
BTW, DayOfWeekSensor seems to keep the task running when the weekday doesn't match, holding everything up until the next day. Is that normal behavior? Shouldn't it just skip the task?
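For reference, this is roughly how I wired the sensor in (IDs, schedule, and the placeholder downstream task are illustrative):

```python
# Roughly how I used DayOfWeekSensor (simplified placeholders).
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.weekday import DayOfWeekSensor
from airflow.utils.weekday import WeekDay

with DAG(
    dag_id="sensor_example",
    start_date=datetime(2024, 1, 1),
    schedule="0 6 * * *",
    catchup=False,
) as dag:
    wait_for_sunday = DayOfWeekSensor(
        task_id="wait_for_sunday",
        week_day=WeekDay.SUNDAY,
        use_task_logical_date=True,
        mode="reschedule",  # frees the worker slot between pokes
    )
    # Placeholder for the Dataproc task that should only run on Sundays.
    run_dataproc_full = EmptyOperator(task_id="run_dataproc_full")

    wait_for_sunday >> run_dataproc_full
```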
Whatever answer I get here will probably also be used to dynamically create stand-alone tasks that run, for example, bi-weekly or on similar schedules.