I am testing out Airflow, and I am running into the error below when implementing the DAG.
ERROR [airflow.models.dagbag.DagBag] Failed to import: /Users/sbhardwa/airflow/dags/heart_desease/inference_pipeline.py
Traceback (most recent call last):
File "/Users/sbhardwa/airflow/airflow_venv/lib/python3.12/site-packages/airflow/models/dagbag.py", line 351, in parse
loader.exec_module(new_module)
File "<frozen importlib._bootstrap_external>", line 995, in exec_module
File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
File "/Users/sbhardwa/airflow/dags/heart_desease/inference_pipeline.py", line 57, in <module>
normalise_task.set_downstream(predict_data)
File "/Users/sbhardwa/airflow/airflow_venv/lib/python3.12/site-packages/airflow/models/taskmixin.py", line 262, in set_downstream
self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
File "/Users/sbhardwa/airflow/airflow_venv/lib/python3.12/site-packages/airflow/models/taskmixin.py", line 214, in _set_relatives
task_object.update_relative(self, not upstream, edge_modifier=edge_modifier)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'function' object has no attribute 'update_relative'
Here is the code block:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from heart_desease.utils import *

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 7, 30),
    'retries': 1,
    'retry_delay': timedelta(seconds=30),
}

ML_inference_dag = DAG(
    dag_id='Heart_dag',
    default_args=default_args,
    description='DAG to run inference on predictions for heart disease patients',
    schedule_interval='@hourly',
)

load_task = PythonOperator(
    task_id='load_task',
    python_callable=get_inference_data,
    dag=ML_inference_dag,
)

######
# Define the task for encoding the categorical variables here
######
encode_task = PythonOperator(
    task_id='encode_task',
    python_callable=encode_features,
    dag=ML_inference_dag,
)

######
# Define the task for normalising the variables here
######
normalise_task = PythonOperator(
    task_id='normalise_task',
    python_callable=normalize_data,
    dag=ML_inference_dag,
)

######
# Define the task for getting the model's predictions here
######
prediction_task = PythonOperator(
    task_id='prediction_task',
    python_callable=predict_data,
    dag=ML_inference_dag,
)

load_task.set_downstream(encode_task)
encode_task.set_downstream(normalise_task)
normalise_task.set_downstream(predict_data)
Here are the Airflow details:
Providers info
apache-airflow-providers-common-io | 1.3.1
apache-airflow-providers-common-sql | 1.13.0
apache-airflow-providers-fab | 1.1.0
apache-airflow-providers-ftp | 3.9.0
apache-airflow-providers-http | 4.11.0
apache-airflow-providers-imap | 3.6.0
apache-airflow-providers-smtp | 1.7.0
apache-airflow-providers-sqlite | 3.8.0
When I remove the 'set_downstream' lines, the DAG runs without error.
I googled, but I couldn't find an answer to this error.
Please advise.
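To understand what the traceback means, I reproduced the same failure with a toy stand-in, with no Airflow involved. The Task class below is my own minimal sketch, not Airflow's real implementation, but it fails the same way whenever a bare function, rather than a task object, is passed to set_downstream:

```python
# Toy stand-in that mimics Airflow's task wiring (names and logic are mine,
# not Airflow's actual code).
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []

    def update_relative(self, other, upstream):
        # set_downstream calls this on the *object passed in*;
        # a plain function has no such attribute, hence the AttributeError.
        self.upstream.append(other.task_id)

    def set_downstream(self, other):
        other.update_relative(self, upstream=False)


def predict_data():
    pass


normalise_task = Task("normalise_task")
prediction_task = Task("prediction_task")

# Passing the task object works:
normalise_task.set_downstream(prediction_task)

# Passing the bare callable fails exactly like my traceback:
try:
    normalise_task.set_downstream(predict_data)
except AttributeError as e:
    print(e)  # 'function' object has no attribute 'update_relative'
```

So it looks like the last line of my DAG file is handing set_downstream the callable predict_data instead of the task prediction_task, but I would like confirmation that this is the actual cause.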