I created an Airflow DAG that pulls data from the OpenWeather API and loads it into a local Postgres database. The DAG runs successfully and executes every hour. However, the forecast data for the next 5 days was loaded into my database all at once, instead of data being loaded sequentially every hour and the database being updated. Whenever I trigger the DAG, the data is no longer updated: only the data that was loaded initially exists, and no further updates appear.
Can someone help me with this?
<code>import requests
from datetime import datetime, timedelta
import psycopg2
from psycopg2 import Error
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# Task-level defaults applied to every operator attached to the DAG below.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2024, 6, 28),
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# The DAG itself, scheduled with a one-hour interval.
# NOTE(review): `catchup` is not set here, so Airflow's configured default
# decides whether runs between start_date and "now" are backfilled -- confirm
# that is intended.
dag = DAG(
    dag_id='metro_cities',
    description='Fetch weather forecast data from OpenWeatherMap API and insert into PostgreSQL',
    schedule_interval=timedelta(hours=1),
    default_args=default_args,
)
def fetch_weather_forecast(api_key, city):
    """Fetch the forecast for ``city`` from the OpenWeatherMap forecast endpoint.

    Args:
        api_key: OpenWeatherMap API key, sent as the ``appid`` query parameter.
        city: City name, sent as the ``q`` query parameter.

    Returns:
        The decoded JSON body when the server answers with HTTP 200,
        otherwise ``None`` (the status code is printed first).

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed, including when the timeout below expires.
    """
    # HTTPS keeps the API key out of plaintext traffic.
    url = 'https://api.openweathermap.org/data/2.5/forecast'
    # Letting requests build the query string URL-encodes the city name.
    params = {'q': city, 'appid': api_key, 'units': 'metric'}
    # Without a timeout a stalled connection would block the worker forever.
    response = requests.get(url, params=params, timeout=30)
    if response.status_code == 200:
        return response.json()
    print(f'Failed to fetch data: {response.status_code}')
    return None
def insert_forecast_into_db(api_key, city, **kwargs):
    """Fetch the forecast for ``city`` and insert one row per entry into ``weather_forecast``.

    Args:
        api_key: OpenWeatherMap API key, forwarded to ``fetch_weather_forecast``.
        city: City name; forwarded to the fetch and stored in the ``city`` column.
        **kwargs: Extra keyword arguments (e.g. the Airflow context); unused.

    Raises:
        RuntimeError: If no forecast data could be fetched for ``city``.
        Exception: Any connection, fetch or insert error is re-raised after
            being printed, so the Airflow task fails (and retries) instead of
            being reported as successful.
    """
    # Bound before the try so the finally clause can test them even when
    # connect() itself raises.
    connection = None
    cursor = None
    try:
        connection = psycopg2.connect(
            user="postgres",
            password="password",
            host="localhost",
            port="5432",
            database="postgres",
        )
        cursor = connection.cursor()
        forecast_data = fetch_weather_forecast(api_key, city)
        if not forecast_data:
            # A failed fetch must not look like a successful load.
            raise RuntimeError(f"No forecast data returned for {city}")

        # NOTE(review): every entry of the response's 'list' is inserted on
        # every run, and a plain INSERT never modifies existing rows. If the
        # intent is to refresh rows per (city, date_time), the table needs a
        # unique constraint on those columns plus an
        # INSERT ... ON CONFLICT ... DO UPDATE -- the schema is not visible
        # here, so the statement is left unchanged.
        for forecast in forecast_data.get('list', []):
            # fromtimestamp() without a tz argument yields a naive datetime
            # in the worker's local timezone.
            date_time = datetime.fromtimestamp(forecast['dt'])
            cursor.execute("""
                INSERT INTO weather_forecast (city, temperature, humidity, description, date_time)
                VALUES (%s, %s, %s, %s, %s);
                """, (
                city,
                forecast['main']['temp'],
                forecast['main']['humidity'],
                forecast['weather'][0]['description'],
                date_time,
            ))
        # Single commit after the loop: either all rows of this run land or none.
        connection.commit()
        print("Forecast data inserted successfully")
    except Exception as error:
        # psycopg2.Error derives from Exception, so one clause covers both.
        print("Error while loading forecast data into PostgreSQL", error)
        raise
    finally:
        # Closing the connection discards any uncommitted work.
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()
            print("PostgreSQL connection is closed")
# Cities to load and the OpenWeatherMap API key shared by all of them.
cities = ['Seattle', 'Oakland', 'Atlanta']
api_key ='api'

# Build one fetch -> insert task pair per city.
for city in cities:
    # Lower-cased, underscore-separated city name used in both task ids.
    task_suffix = city.lower().replace(" ", "_")

    # NOTE: the value returned by this task is not read by the insert task
    # below, which calls fetch_weather_forecast itself.
    fetch_weather_task = PythonOperator(
        dag=dag,
        task_id=f'fetch_weather_forecast_{task_suffix}',
        python_callable=fetch_weather_forecast,
        op_args=[api_key, city],
    )

    insert_forecast_task = PythonOperator(
        dag=dag,
        task_id=f'insert_forecast_into_postgres_{task_suffix}',
        python_callable=insert_forecast_into_db,
        op_kwargs={'api_key': api_key, 'city': city},
        provide_context=True,  # hands the task context to the callable's **kwargs
    )

    # The insert task runs only after the fetch task for the same city.
    fetch_weather_task >> insert_forecast_task
[![enter image description here][1]][1]
[![enter image description here][2]][2]
[1]: https://i.sstatic.net/pUAbasfg.png
[2]: https://i.sstatic.net/GfxbKZQE.png
</code>
<code>import requests
import psycopg2
from psycopg2 import Error
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# Task-level defaults applied to every operator attached to the DAG below.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2024, 6, 28),
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# The DAG itself, scheduled with a one-hour interval.
# NOTE(review): `catchup` is not set here, so Airflow's configured default
# decides whether runs between start_date and "now" are backfilled -- confirm
# that is intended.
dag = DAG(
    dag_id='metro_cities',
    description='Fetch weather forecast data from OpenWeatherMap API and insert into PostgreSQL',
    schedule_interval=timedelta(hours=1),
    default_args=default_args,
)
def fetch_weather_forecast(api_key, city):
    """Fetch the forecast for ``city`` from the OpenWeatherMap forecast endpoint.

    Args:
        api_key: OpenWeatherMap API key, sent as the ``appid`` query parameter.
        city: City name, sent as the ``q`` query parameter.

    Returns:
        The decoded JSON body when the server answers with HTTP 200,
        otherwise ``None`` (the status code is printed first).

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed, including when the timeout below expires.
    """
    # HTTPS keeps the API key out of plaintext traffic.
    url = 'https://api.openweathermap.org/data/2.5/forecast'
    # Letting requests build the query string URL-encodes the city name.
    params = {'q': city, 'appid': api_key, 'units': 'metric'}
    # Without a timeout a stalled connection would block the worker forever.
    response = requests.get(url, params=params, timeout=30)
    if response.status_code == 200:
        return response.json()
    print(f'Failed to fetch data: {response.status_code}')
    return None
def insert_forecast_into_db(api_key, city, **kwargs):
    """Fetch the forecast for ``city`` and insert one row per entry into ``weather_forecast``.

    Args:
        api_key: OpenWeatherMap API key, forwarded to ``fetch_weather_forecast``.
        city: City name; forwarded to the fetch and stored in the ``city`` column.
        **kwargs: Extra keyword arguments (e.g. the Airflow context); unused.

    Raises:
        RuntimeError: If no forecast data could be fetched for ``city``.
        Exception: Any connection, fetch or insert error is re-raised after
            being printed, so the Airflow task fails (and retries) instead of
            being reported as successful.
    """
    # Bound before the try so the finally clause can test them even when
    # connect() itself raises.
    connection = None
    cursor = None
    try:
        connection = psycopg2.connect(
            user="postgres",
            password="password",
            host="localhost",
            port="5432",
            database="postgres",
        )
        cursor = connection.cursor()
        forecast_data = fetch_weather_forecast(api_key, city)
        if not forecast_data:
            # A failed fetch must not look like a successful load.
            raise RuntimeError(f"No forecast data returned for {city}")

        # NOTE(review): every entry of the response's 'list' is inserted on
        # every run, and a plain INSERT never modifies existing rows. If the
        # intent is to refresh rows per (city, date_time), the table needs a
        # unique constraint on those columns plus an
        # INSERT ... ON CONFLICT ... DO UPDATE -- the schema is not visible
        # here, so the statement is left unchanged.
        for forecast in forecast_data.get('list', []):
            # fromtimestamp() without a tz argument yields a naive datetime
            # in the worker's local timezone.
            date_time = datetime.fromtimestamp(forecast['dt'])
            cursor.execute("""
                INSERT INTO weather_forecast (city, temperature, humidity, description, date_time)
                VALUES (%s, %s, %s, %s, %s);
                """, (
                city,
                forecast['main']['temp'],
                forecast['main']['humidity'],
                forecast['weather'][0]['description'],
                date_time,
            ))
        # Single commit after the loop: either all rows of this run land or none.
        connection.commit()
        print("Forecast data inserted successfully")
    except Exception as error:
        # psycopg2.Error derives from Exception, so one clause covers both.
        print("Error while loading forecast data into PostgreSQL", error)
        raise
    finally:
        # Closing the connection discards any uncommitted work.
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()
            print("PostgreSQL connection is closed")
# Cities to load and the OpenWeatherMap API key shared by all of them.
cities = ['Seattle', 'Oakland', 'Atlanta']
api_key ='api'

# Build one fetch -> insert task pair per city.
for city in cities:
    # Lower-cased, underscore-separated city name used in both task ids.
    task_suffix = city.lower().replace(" ", "_")

    # NOTE: the value returned by this task is not read by the insert task
    # below, which calls fetch_weather_forecast itself.
    fetch_weather_task = PythonOperator(
        dag=dag,
        task_id=f'fetch_weather_forecast_{task_suffix}',
        python_callable=fetch_weather_forecast,
        op_args=[api_key, city],
    )

    insert_forecast_task = PythonOperator(
        dag=dag,
        task_id=f'insert_forecast_into_postgres_{task_suffix}',
        python_callable=insert_forecast_into_db,
        op_kwargs={'api_key': api_key, 'city': city},
        provide_context=True,  # hands the task context to the callable's **kwargs
    )

    # The insert task runs only after the fetch task for the same city.
    fetch_weather_task >> insert_forecast_task
[![enter image description here][1]][1]
[![enter image description here][2]][2]
[1]: https://i.sstatic.net/pUAbasfg.png
[2]: https://i.sstatic.net/GfxbKZQE.png
</code>
import requests
import psycopg2
from psycopg2 import Error
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# Task-level defaults applied to every operator attached to the DAG below.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2024, 6, 28),
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# The DAG itself, scheduled with a one-hour interval.
# NOTE(review): `catchup` is not set here, so Airflow's configured default
# decides whether runs between start_date and "now" are backfilled -- confirm
# that is intended.
dag = DAG(
    dag_id='metro_cities',
    description='Fetch weather forecast data from OpenWeatherMap API and insert into PostgreSQL',
    schedule_interval=timedelta(hours=1),
    default_args=default_args,
)
def fetch_weather_forecast(api_key, city):
    """Fetch the forecast for ``city`` from the OpenWeatherMap forecast endpoint.

    Args:
        api_key: OpenWeatherMap API key, sent as the ``appid`` query parameter.
        city: City name, sent as the ``q`` query parameter.

    Returns:
        The decoded JSON body when the server answers with HTTP 200,
        otherwise ``None`` (the status code is printed first).

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed, including when the timeout below expires.
    """
    # HTTPS keeps the API key out of plaintext traffic.
    url = 'https://api.openweathermap.org/data/2.5/forecast'
    # Letting requests build the query string URL-encodes the city name.
    params = {'q': city, 'appid': api_key, 'units': 'metric'}
    # Without a timeout a stalled connection would block the worker forever.
    response = requests.get(url, params=params, timeout=30)
    if response.status_code == 200:
        return response.json()
    print(f'Failed to fetch data: {response.status_code}')
    return None
def insert_forecast_into_db(api_key, city, **kwargs):
    """Fetch the forecast for ``city`` and insert one row per entry into ``weather_forecast``.

    Args:
        api_key: OpenWeatherMap API key, forwarded to ``fetch_weather_forecast``.
        city: City name; forwarded to the fetch and stored in the ``city`` column.
        **kwargs: Extra keyword arguments (e.g. the Airflow context); unused.

    Raises:
        RuntimeError: If no forecast data could be fetched for ``city``.
        Exception: Any connection, fetch or insert error is re-raised after
            being printed, so the Airflow task fails (and retries) instead of
            being reported as successful.
    """
    # Bound before the try so the finally clause can test them even when
    # connect() itself raises.
    connection = None
    cursor = None
    try:
        connection = psycopg2.connect(
            user="postgres",
            password="password",
            host="localhost",
            port="5432",
            database="postgres",
        )
        cursor = connection.cursor()
        forecast_data = fetch_weather_forecast(api_key, city)
        if not forecast_data:
            # A failed fetch must not look like a successful load.
            raise RuntimeError(f"No forecast data returned for {city}")

        # NOTE(review): every entry of the response's 'list' is inserted on
        # every run, and a plain INSERT never modifies existing rows. If the
        # intent is to refresh rows per (city, date_time), the table needs a
        # unique constraint on those columns plus an
        # INSERT ... ON CONFLICT ... DO UPDATE -- the schema is not visible
        # here, so the statement is left unchanged.
        for forecast in forecast_data.get('list', []):
            # fromtimestamp() without a tz argument yields a naive datetime
            # in the worker's local timezone.
            date_time = datetime.fromtimestamp(forecast['dt'])
            cursor.execute("""
                INSERT INTO weather_forecast (city, temperature, humidity, description, date_time)
                VALUES (%s, %s, %s, %s, %s);
                """, (
                city,
                forecast['main']['temp'],
                forecast['main']['humidity'],
                forecast['weather'][0]['description'],
                date_time,
            ))
        # Single commit after the loop: either all rows of this run land or none.
        connection.commit()
        print("Forecast data inserted successfully")
    except Exception as error:
        # psycopg2.Error derives from Exception, so one clause covers both.
        print("Error while loading forecast data into PostgreSQL", error)
        raise
    finally:
        # Closing the connection discards any uncommitted work.
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()
            print("PostgreSQL connection is closed")
# Cities to load and the OpenWeatherMap API key shared by all of them.
cities = ['Seattle', 'Oakland', 'Atlanta']
api_key ='api'

# Build one fetch -> insert task pair per city.
for city in cities:
    # Lower-cased, underscore-separated city name used in both task ids.
    task_suffix = city.lower().replace(" ", "_")

    # NOTE: the value returned by this task is not read by the insert task
    # below, which calls fetch_weather_forecast itself.
    fetch_weather_task = PythonOperator(
        dag=dag,
        task_id=f'fetch_weather_forecast_{task_suffix}',
        python_callable=fetch_weather_forecast,
        op_args=[api_key, city],
    )

    insert_forecast_task = PythonOperator(
        dag=dag,
        task_id=f'insert_forecast_into_postgres_{task_suffix}',
        python_callable=insert_forecast_into_db,
        op_kwargs={'api_key': api_key, 'city': city},
        provide_context=True,  # hands the task context to the callable's **kwargs
    )

    # The insert task runs only after the fetch task for the same city.
    fetch_weather_task >> insert_forecast_task
[![enter image description here][1]][1]
[![enter image description here][2]][2]
[1]: https://i.sstatic.net/pUAbasfg.png
[2]: https://i.sstatic.net/GfxbKZQE.png