I’m developing a Python application that retrieves weather data from an API and attempts to save it to a SQLite database using SQLAlchemy. Despite configuring the database correctly and implementing data retrieval functions, the application does not save weather data as expected.
I configured SQLAlchemy with SQLite, defined models for weather data storage, implemented functions to fetch and store weather information from the OpenWeatherMap API, and verified data storage by querying the database tables. Expected outcome was successful storage of weather data for each location queried.
from datetime import datetime, date
from flask import Flask
from flask_caching import Cache
import requests
from dotenv import load_dotenv
import os
from dataclasses import dataclass
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import func # Import func from SQLAlchemy
app = Flask(__name__)
load_dotenv()
api_key = os.getenv('API_KEY')
print(f"Loaded API key: {api_key}") # This will print the API key to confirm it's loaded
# Configure SQLAlchemy
db_name = 'weather.db'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + db_name
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
# Configure cache
cache = Cache(app, config={'CACHE_TYPE': 'simple'})
def convert_to_local_time(utc_timestamp, timezone='Europe/London'):
"""Converts UTC timestamp to local time"""
utc_dt = datetime.utcfromtimestamp(int(utc_timestamp))
local_tz = pytz.timezone(timezone)
local_dt = utc_dt.replace(tzinfo=pytz.utc).astimezone(local_tz)
return local_dt.strftime('%H:%M')
@dataclass
class WeatherData:
main: str
description: str
icon: str
temperature: float
humidity: float
sunrise: str
sunset: str
temp_min: float
temp_max: float
date: str
def get_current_weather(lat, lon, API_key):
"""Fetches current weather data from OpenWeather and saves to database."""
try:
response = requests.get(
f'http://api.openweathermap.org/data/2.5/weather?lat={lat}&lon={lon}&appid={API_key}&units=metric'
)
response.raise_for_status() # Raise an exception for HTTP errors
data = response.json()
# Create WeatherData object
weather_data = WeatherData(
main=response.get('weather')[0].get('main'),
description=response.get('weather')[0].get('description'),
icon=response.get('weather')[0].get('icon'),
temperature=response.get('main').get('temp'),
humidity=response.get('main').get('humidity'),
sunrise=convert_to_local_time(response.get('sys').get('sunrise'), 'Europe/London'),
sunset=convert_to_local_time(response.get('sys').get('sunset'), 'Europe/London'),
temp_min=response.get('main').get('temp_min'),
temp_max=response.get('main').get('temp_max'),
date=date.today().strftime('%A %dth %B')
)
return weather_data
except requests.exceptions.RequestException as e:
print(f"Error fetching current weather: {e}")
return None
@dataclass
class ForecastData:
date: str
temp_min: float
temp_max: float
description: str
icon: str
def get_forecast_weather(lat, lon, API_key):
"""Fetches forecast weather data from OpenWeather and saves to database."""
try:
response = requests.get(
f'http://api.openweathermap.org/data/2.5/forecast/?lat={lat}&lon={lon}&cnt={5}&appid={API_key}&units=metric'
).json()
forecast_list = response.get('list', [])
forecast_data_list = []
for forecast in forecast_list:
forecast_data = ForecastData(
date=datetime.utcfromtimestamp(forecast['dt']).strftime('%H:%M'),
temp_min=forecast['main']['temp_min'],
temp_max=forecast['main']['temp_max'],
description=forecast['weather'][0]['description'],
icon=forecast['weather'][0]['icon']
)
forecast_data_list.append(forecast_data)
return forecast_data_list
except requests.exceptions.RequestException as e:
print(f"Error fetching forecast weather: {e}")
return []
# SQLAlchemy models for weather data
class CurrentWeather(db.Model):
__tablename__ = 'current_weather'
id = db.Column(db.Integer, primary_key=True)
city = db.Column(db.String(80), nullable=False)
country = db.Column(db.String(80), nullable=False)
main = db.Column(db.String(50))
description = db.Column(db.String(100))
icon = db.Column(db.String(50))
temperature = db.Column(db.Float)
humidity = db.Column(db.Float)
sunrise = db.Column(db.String(50))
sunset = db.Column(db.String(50))
temp_min = db.Column(db.Float)
temp_max = db.Column(db.Float)
date_recorded = db.Column(db.Date, nullable=False, default=lambda: date.today())
class ForecastWeather(db.Model):
__tablename__ = 'forecast_weather'
id = db.Column(db.Integer, primary_key=True)
city = db.Column(db.String(80), nullable=False)
country = db.Column(db.String(80), nullable=False)
forecast_time = db.Column(db.String(50), nullable=False)
temp_min = db.Column(db.Float)
temp_max = db.Column(db.Float)
description = db.Column(db.String(100))
icon = db.Column(db.String(50))
date_recorded = db.Column(db.Date, nullable=False, default=func.current_date())
current_weather_id = db.Column(db.Integer, db.ForeignKey('current_weather.id'), nullable=False)
current_weather = db.relationship('CurrentWeather', backref=db.backref('forecasts', lazy=True))
def main(selected_city=None):
"""Function fetches data for specified city."""
locations = [
("Cumbria", "United Kingdom", 54.4609, -3.0886),
("Corfe Castle", "United Kingdom", 50.6395, -2.0566),
("The Cotswolds", "United Kingdom", 51.8330, -1.8433),
("Cambridge", "United Kingdom", 52.2053, 0.1218),
("Bristol", "United Kingdom", 51.4545, -2.5879),
("Oxford", "United Kingdom", 51.7520, -1.2577),
("Norwich", "United Kingdom", 52.6309, 1.2974),
("Stonehenge", "United Kingdom", 51.1789, -1.8262),
("Watergate Bay", "United Kingdom", 50.4429, -5.0553),
("Birmingham", "United Kingdom", 52.4862, -1.8904)
]
weather_data_list = []
for name, country, lat, lon in locations:
if selected_city and selected_city != name.lower().replace(" ", "_"):
continue # Skip if selected city is specified and doesn't match current city
# Fetch current weather data
current_weather_data = get_current_weather(lat, lon, api_key)
if current_weather_data:
# Fetch forecast weather data
forecast_weather_data = get_forecast_weather(lat, lon, api_key)
# Create CurrentWeather instance and save to database
current_weather = CurrentWeather(
city=name,
country=country,
main=current_weather_data.main,
description=current_weather_data.description,
icon=current_weather_data.icon,
temperature=current_weather_data.temperature,
humidity=current_weather_data.humidity,
sunrise=current_weather_data.sunrise,
sunset=current_weather_data.sunset,
temp_min=current_weather_data.temp_min,
temp_max=current_weather_data.temp_max,
)
db.session.add(current_weather)
db.session.commit()
# Create ForecastWeather instances and save to database
for forecast_data in forecast_weather_data:
forecast_weather = ForecastWeather(
city=name,
country=country,
forecast_time=forecast_data.date,
temp_min=forecast_data.temp_min,
temp_max=forecast_data.temp_max,
description=forecast_data.description,
icon=forecast_data.icon,
current_weather=current_weather
)
db.session.add(forecast_weather)
db.session.commit()
# Append to weather_data_list
weather_data_list.append((name, country, current_weather_data, forecast_weather_data))
return weather_data_list
if __name__ == '__main__':
# Create database tables based on models
db.create_all()
# Run the main function to fetch and store weather data
weather_data = main()
print("Weather Data:")
for data in weather_data:
print(f"City: {data[0]}, Country: {data[1]}")
print(f"Current Weather: {data[2]}")
print(f"Forecast Weather: {data[3]}")
print("==============================")
# Start the Flask application
app.run(debug=True)```
Alex Goodwin is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.