Is there any way to call an external function from the priority_weight parameter of a task, so that the value is read from a config file?
My DAG currently looks something like this:
from airflow import DAG
from airflow.operators.bash import BashOperator

# args and var_file_transfer_to_vm are defined elsewhere in this file
dag = DAG(
    'Priority',
    default_args=args,
    schedule_interval=None,
    tags=["test_priority"],
    catchup=False,
)

FILE_TRANSFER_TO_VM = BashOperator(
    task_id='{0}'.format(var_file_transfer_to_vm),
    bash_command='gsutil -m cp gs://folders/files gs://otherfolder/',
    priority_weight=5,
    weight_rule='absolute',
    dag=dag,
)
What I am looking for is:
file: config.cfg
x.a=11
y.b=22
z.c=33
file: get_priority.py
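file_path = 'config.cfg'

def fetch_value_from_cfg(context):
    # Build the key "dag_id.task_id" from the task instance, e.g. 'y.b'
    ti = context['task_instance']
    # The trailing '=' stops 'y.b' from also matching 'y.bc'
    key_prefix = '{0}.{1}='.format(ti.dag_id, ti.task_id)
    print(key_prefix)
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            if line.startswith(key_prefix):
                # priority_weight needs an integer, not a string
                return int(line.split('=', 1)[1].strip())
    return None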
DAG file: Priority.py
from airflow import DAG
from airflow.operators.bash import BashOperator

import get_priority

# args and var_file_transfer_to_vm are defined elsewhere in this file
dag = DAG(
    'Priority',
    default_args=args,
    schedule_interval=None,
    tags=["test_priority"],
    catchup=False,
)

FILE_TRANSFER_TO_VM = BashOperator(
    task_id='{0}'.format(var_file_transfer_to_vm),
    bash_command='gsutil -m cp gs://folders/files gs://otherfolder/',
    priority_weight=get_priority.fetch_value_from_cfg,  # expecting Airflow to pass the context to this function
    weight_rule='absolute',
    dag=dag,
)
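As far as I can tell, priority_weight is an ordinary integer argument of the operator: it is evaluated once, when the DAG file is parsed, and the operator never calls it with a task context. One way to keep the weights in config.cfg is therefore to do the lookup at parse time and pass the resulting number. A minimal sketch, assuming the config keeps one `dag_id.task_id=weight` entry per line; `get_priority_weight` and its fallback weight of 1 are hypothetical, not part of Airflow:

```python
def get_priority_weight(dag_id: str, task_id: str,
                        file_path: str = 'config.cfg',
                        default: int = 1) -> int:
    """Hypothetical helper: return the weight for 'dag_id.task_id', else default.

    Assumes config lines of the form 'dag_id.task_id=weight'.
    """
    key = f"{dag_id}.{task_id}"
    with open(file_path, 'r', encoding='utf-8') as cfg:
        for line in cfg:
            name, sep, value = line.partition('=')
            # Compare the whole key so 'y.b' does not also match 'y.bc'
            if sep and name.strip() == key:
                return int(value.strip())
    # No entry for this task: fall back to the default weight
    return default
```

In Priority.py the call would then look like `priority_weight=get_priority.get_priority_weight('Priority', var_file_transfer_to_vm)`. Because 'config.cfg' is a relative path, it resolves against the working directory of whichever process parses the DAG, so an absolute path (for example one built from `os.path.dirname(__file__)`) is likely safer.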
I am fairly new to Airflow and Python and cannot figure out how to achieve this behavior. Any help or guidance is much appreciated.
The reason I need this is that we have around 1,500 DAGs with roughly 10 tasks each, so changing or updating the priority of every task by hand is not feasible. Using global_priority for all tasks in each DAG does not solve the problem either, because different tasks need different values.
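At roughly 1,500 DAGs with about 10 tasks each, rescanning config.cfg once per task means on the order of 15,000 reads of the same file every time all DAG files are parsed. A sketch that parses the file once per process and path with `functools.lru_cache` and then answers lookups from a dictionary; `load_priorities` and `priority_for` are hypothetical names, and blank lines and `#` comments in the config are assumed to be allowed:

```python
import functools
from typing import Dict


@functools.lru_cache(maxsize=None)
def load_priorities(file_path: str = 'config.cfg') -> Dict[str, int]:
    """Parse 'dag_id.task_id=weight' lines into a dict, once per process and path."""
    priorities: Dict[str, int] = {}
    with open(file_path, 'r', encoding='utf-8') as cfg:
        for raw in cfg:
            line = raw.strip()
            # Skip blank lines and '#' comments
            if not line or line.startswith('#'):
                continue
            key, sep, value = line.partition('=')
            if sep:
                priorities[key.strip()] = int(value.strip())
    return priorities


def priority_for(dag_id: str, task_id: str,
                 file_path: str = 'config.cfg', default: int = 1) -> int:
    """Look up 'dag_id.task_id' in the cached mapping, falling back to default."""
    return load_priorities(file_path).get(f"{dag_id}.{task_id}", default)
```

A task would then use `priority_weight=priority_for(dag.dag_id, var_file_transfer_to_vm)`. The cache only lives as long as the Python process that parses the DAG files, so how much it saves depends on how the DAG processor runs, and a long-lived process will not see edits to config.cfg until `load_priorities.cache_clear()` is called.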