I am encuntering an unexpected behaviour from airflow when trying to pass path of file to tasks.
My dag:
@dag(dag_id="classifire_dag_01", schedule_interval=None, start_date="", catchup=False) def classifire_dag_01():
@task.virtualenv()
def start(file_name, sampling_rate, model_name):
from start import start
# file_name = "/opt/airflow/dags/files/ecg.txt"
return start.main(file_name, sampling_rate, model_name)
@task.virtualenv()
def classifire(input_dict):
from classifire import classifire
return classifire.main(input_dict)
start_output = start(file_name= '/opt/airflow/dags/files/ecg.txt', sampling_rate= 360,model_name= '/opt/airflow/dags/files/NV_classifier')
classifire_output = classifire(input_dict= start_output['input_dict'])
dag = classifire_dag_01()
start_output = start(file_name= '/opt/airflow/dags/files/ecg.txt', sampling_rate= 360,model_name= '/opt/airflow/dags/files/NV_classifier')
It seems it recognize the file_name as Jinja template and retrieves the error. However, it interperates the model_name, which is refering to the same directory, completely fine.
Even for the file_name if I use start(file_name= ”, …) and write the file_name inside the task, the file is found without any problem.
The error:
ERROR - Exception rendering Jinja template for task 'start', field 'op_kwargs'. Template: {'sampling_rate': 360, 'file_name': '/opt/***/dags/files/ecg.txt', 'model_name': '/opt/***/dags/files/NV_classifier'}```
Rest of the error:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/abstractoperator.py", line 717, in _do_render_template_fields
rendered_content = self.render_template(
^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/template/templater.py", line 181, in render_template
return {k: self.render_template(v, context, jinja_env, oids) for k, v in value.items()}
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/template/templater.py", line 166, in render_template
template = jinja_env.get_template(value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/jinja2/environment.py", line 1010, in get_template
return self._load_template(name, globals)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/jinja2/environment.py", line 969, in _load_template
template = self.loader.load(self, name, self.make_globals(globals))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/jinja2/loaders.py", line 125, in load
source, filename, uptodate = self.get_source(environment, name)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/jinja2/loaders.py", line 204, in get_source
raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: /opt/airflow/dags/files/ecg.txt
[2024-06-06, 10:26:58 UTC] {taskinstance.py:2890} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 2476, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 2629, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context, jinja_env=jinja_env)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3079, in render_templates
original_task.render_template_fields(context, jinja_env)
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1336, in render_template_fields
self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/abstractoperator.py", line 717, in _do_render_template_fields
rendered_content = self.render_template(
^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/template/templater.py", line 181, in render_template
return {k: self.render_template(v, context, jinja_env, oids) for k, v in value.items()}
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/template/templater.py", line 166, in render_template
template = jinja_env.get_template(value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/jinja2/environment.py", line 1010, in get_template
return self._load_template(name, globals)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/jinja2/environment.py", line 969, in _load_template
template = self.loader.load(self, name, self.make_globals(globals))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/jinja2/loaders.py", line 125, in load
source, filename, uptodate = self.get_source(environment, name)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/jinja2/loaders.py", line 204, in get_source
raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: /opt/airflow/dags/files/ecg.txt
[2024-06-06, 10:26:58 UTC] {taskinstance.py:1205} INFO - Marking task as FAILED. dag_id=classifire_dag_01, task_id=start, execution_date=20240606T102656, start_date=20240606T102658, end_date=20240606T102658
[2024-06-06, 10:26:58 UTC] {standard_task_runner.py:110} ERROR - Failed to execute job 410 for task start (/opt/airflow/dags/files/ecg.txt; 1987)
[2024-06-06, 10:26:58 UTC] {local_task_job_runner.py:240} INFO - Task exited with return code 1
[2024-06-06, 10:26:58 UTC] {taskinstance.py:3482} INFO - 0 downstream tasks scheduled from follow-on schedule check
New contributor
Dani Khalang is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.