I want to prioritize a run_test_task in DAG B depending on an xcom value from a task get_username in DAG A. DAG B is triggered by a TriggerDagRunOperator trigger_task in DAG A.
The idea is to use a custom weight rule based on the built-in _DownstreamPriorityWeightStrategy class. Here is my simplified plugin code:
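from airflow.models.taskinstance import TaskInstance
from airflow.task.priority_strategy import _DownstreamPriorityWeightStrategy
from airflow.utils.session import provide_session
from sqlalchemy.orm import Session


class CustomPriorityStrategy(_DownstreamPriorityWeightStrategy):
    @provide_session
    def get_weight(self, ti: TaskInstance, session: Session) -> int:
        # Start from the built-in downstream weight.
        weight = super().get_weight(ti)
        # Read the username pushed by get_username.
        username = ti.xcom_pull(task_ids='get_username', session=session)
        if username == "john":
            weight += 1000
        return weight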
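To make the intent concrete, the rule I am trying to express boils down to the following, stripped of anything Airflow-specific. This is only a sketch: boosted_weight, PRIORITY_USER and PRIORITY_BOOST are hypothetical names I made up for illustration, not part of Airflow or of my actual plugin.

```python
# Illustrative only: plain Python, no Airflow involved.
# boosted_weight, PRIORITY_USER and PRIORITY_BOOST are hypothetical names
# standing in for the values hard-coded in the plugin above.
from typing import Optional

PRIORITY_USER = "john"
PRIORITY_BOOST = 1000


def boosted_weight(base_weight: int, username: Optional[str]) -> int:
    """Return base_weight, raised by PRIORITY_BOOST when username matches."""
    if username == PRIORITY_USER:
        return base_weight + PRIORITY_BOOST
    return base_weight
```

So a task that would normally get weight 1 should get 1001 when the username is "john", and keep its normal weight otherwise (including when no username is available).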
Here is the usage:
run_test_task = RunTestOperator(
    ...
    weight_rule="custom_priority_weight_strategy.CustomPriorityStrategy",
    ...
The trigger_task fails with
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3901, in _bulk_save_mappings
    persistence._bulk_insert(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 74, in _bulk_insert
    connection = session_transaction.connection(base_mapper)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 627, in connection
    self._assert_active()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 620, in _assert_active
    raise sa_exc.ResourceClosedError(closed_msg)
sqlalchemy.exc.ResourceClosedError: This transaction is closed
Any idea how to get it to work?