I installed Airflow with Docker following this video (Windows + WSL + Ubuntu + Docker).
I have a my_dag.py
file (APIKEY replaced):
<code>from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import json
import pandas as pd
import requests


def _read_api_data():
    # Fetch current weather for two cities and dump the raw JSON responses to a file.
    x1 = requests.get('http://api.openweathermap.org/data/2.5/weather?q=London&appid=APIKEY')
    x2 = requests.get('http://api.openweathermap.org/data/2.5/weather?q=Moscow&appid=APIKEY')
    responses = [x1, x2]
    output_object = {"responses": []}
    for x in responses:
        json_object = x.json()
        output_object["responses"].append(json_object)
    json_str = json.dumps(output_object, indent=4)
    # Writing to weather_data.json
    with open("/tmp/weather_data.json", "w") as outfile:
        outfile.write(json_str)


def _download_data():
    with open('/tmp/weather_data.json') as f:
        d = json.load(f)
    responses = d["responses"]
    temps_K = [round(r["main"]["temp"], 2) for r in responses]  # API returns Kelvin
    names = [r["name"] for r in responses]
    df = pd.DataFrame({"names": names, "temps_K": temps_K})
    # NOTE: returning a DataFrame hands it to XCom; with Airflow's default JSON
    # XCom serialization this needs enable_xcom_pickling, otherwise pass a file path.
    return df


def _process_data(ti):
    df = ti.xcom_pull(task_ids="download_data")
    df["temps_C"] = (df["temps_K"] - 273.15).round(2)  # Kelvin -> Celsius
    df.to_csv('/tmp/processed_weather_data.csv')
    return df


def _save_data(ti):
    df = ti.xcom_pull(task_ids="process_data")
    df.to_parquet('/tmp/weather.parquet')


with DAG("weather_data_pipeline_dag",
         schedule_interval="@once",
         start_date=datetime(2024, 7, 14)) as dag:

    taskPython0 = PythonOperator(
        task_id="read_api_data",
        python_callable=_read_api_data
    )
    taskPython1 = PythonOperator(
        task_id="download_data",
        python_callable=_download_data
    )
    taskPython2 = PythonOperator(
        task_id="process_data",
        python_callable=_process_data
    )
    taskPython3 = PythonOperator(
        task_id="save_data",
        python_callable=_save_data
    )

    taskPython0 >> taskPython1 >> taskPython2 >> taskPython3
</code>
But my DAG failed: the read_api_data
task failed. How can I solve that? Where can I read the log traceback for my DAG? It seems that Airflow does not give details.
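In case it helps while digging: with the official docker-compose setup, task logs normally show up in the Airflow UI (open the DAG, click the failed read_api_data task instance, then Logs) and also under the ./logs folder mounted next to ./dags. Another way to get a full traceback quickly is to bypass the scheduler, either with airflow tasks test weather_data_pipeline_dag read_api_data 2024-07-14 inside the container, or by calling the callable directly. A minimal sketch of the latter (it assumes the shell is started from the dags/ folder so my_dag is importable, and that a real API key has been substituted):
<code># Run inside the scheduler/worker container, from the dags/ directory,
# e.g. docker exec -it <scheduler-container> bash, then: cd dags && python
from my_dag import _read_api_data, _download_data

_read_api_data()         # any requests/JSON error now raises with a full traceback
print(_download_data())  # should print a two-row DataFrame if the API call worked
</code>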
Did you read the error message you got? Please install the package apache-airflow instead of airflow.
I think you should try that: pip install apache-airflow (or, to be very explicit about the Python version to use, python3 -m pip install apache-airflow) and try again.
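Following up on that comment: "install apache-airflow instead of airflow" appears to be the message raised by the legacy airflow placeholder on PyPI, so it is worth confirming which distribution the environment that parses the DAG actually has. A minimal check (nothing assumed beyond a working Python interpreter in that environment):
<code># Quick check of which Airflow is importable in the environment running the DAG.
# A 2.x version means apache-airflow is installed; an ImportError or a very old
# version suggests the wrong "airflow" package (or the wrong interpreter) is in use.
import airflow
print(airflow.__version__)
</code>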