I have about 15 workflows that need to run on different schedules or with different triggers. But the orchestration that I need means that some workflows aren’t allowed to run when some others are running and need to be differed. I am trying to write a pretty and easy solution.
This is what I currently have as an example DAG:
@dag(
catchup=False,
tags=["staging"],
start_date=datetime(2021, 12, 21),
schedule="*/5 2-17 * * 1-5",
)
def validate_transactions():
locks = [TRANSACTION_VALIDATION]
transaction_validation = TransactionValidation()
@task()
def get_transactions_to_validate():
return transaction_validation.get_to_validate_transactions()
@task()
def validate_transaction(transaction_id: int):
return transaction_validation.validate_transactions(transaction_id)
@teardown()
def unlock():
free_locks(locks)
t1 = get_lock(locks)
t2 = get_transactions_to_validate()
t3 = validate_transaction.expand(transaction_id=t2)
t4 = unlock()
t1 >> t2 >> t3 >> t4
validate_transactions()
Where get_lock and unlock are defined in:
def create_redis_object():
host = os.environ.get("BROKER_URL")
port = int(os.environ.get("BROKER_PORT"))
return redis.Redis(host=host, port=port, db=9)
cache = create_redis_object()
@task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
def get_lock(locks: List[str] | str):
run_id = get_current_context()['dag_run'].run_id
if isinstance(locks, str):
locks = [locks]
allowed = True
for lock_name in locks:
logging.info(f"Trying to get lock: {lock_name}")
if allowed and cache.get(lock_name) is not None:
allowed = False
if allowed:
for lock_name in locks:
cache.setnx(lock_name, run_id)
if PokeReturnValue is not None:
return PokeReturnValue(allowed)
else:
return allowed
def free_locks(locks):
for lock_name in locks:
try:
cache.delete(lock_name)
logging.info(f"Freed lock {lock_name}")
except:
logging.warning(f"Could not free lock {lock_name}")
But I was thinking of writing an own custom @dag decorator that would automatically add the get_lock and free_locks tasks, rather than having to redo it every single dag again and again.
But I am a bit stuck on how I would do such a thing and am looking for suggestions.