I have some trouble with an app I’m making, I receive data from a chip connected to 3 sensors then I save that data into a csv file (one csv file per day). It correctly saves the data for all the duration (albeit sometimes there is inexplicably some loss of a few seconds). But somehow when I display it in my graphs tab, either it shows correctly with no problems or, like this time, even though there is data at the recorded time, it displays it like there is none or shows the wrong values at the wrong time or it stays constant despite having increased the values…
I’m not even sure where the error is coming from, here is the code that manages the graphs:
from settings import *
from matplotlib.figure import Figure
from matplotlib.widgets import RectangleSelector
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from settings_menu import Widget
import datetime as dt
from typing import Union
import pandas as pd
import numpy as np
from matplotlib import dates as mdates
class Graph(ctk.CTkFrame):
def __init__(self, master: Misc,
type: Literal["4H", "Total", "24H"],
title: str, ylabel: str,
color: str, graphs: list[FigureCanvasTkAgg]
):
super().__init__(master=master, corner_radius=0)
self.app = master.master.master
self.graph: FigureCanvasTkAgg = None
self._type = type # Type de graphique 4H , Total ou 24H
self.title = title # Titre du graphique
self.ylabel = ylabel # Nom de l'axe des ordonnées
self.color = color
# Extraction de la catégorie du graphique à partir du titre pour pouvoir mettre les bonnes données dans les bons graphs
self.category: Literal["DO", "pH", "Temperature"] = self.title.split("Graph ")[1]
# Variables de contrôle pour le zoom des données affichées
#self.delta: ctk.IntVar = self.master.master.delta # For the values
self.graphs: list[FigureCanvasTkAgg] = graphs # Liste des graphiques existants
def initialize_data(self, data: dict[str, list[Union[datetime, float]]]):
if data is None:
self.app.log_error(ValueError(f"Graph {self.category} data is None"))
return
self.timestamps = pd.DatetimeIndex(data["time"]) # Timestamps
self.values = data["values"] # Values of the graph given category
self.data = pd.DataFrame(
{self.category: self.values},
index=self.timestamps
)
# Obtention des limites de la catégorie (DO, pH, Température)
self.category_value = get_category_boundaries(self.app, self.category)
# Obtention des limites supérieure et inférieure (seuils) de la catégorie du graphique
self.upper_boundary, self.lower_boundary = self.category_value
self.create_graph()
def create_graph(self):
if self._type == "4H":
self.data = self.data.asfreq(freq="10s")
elif self._type == "24H":
self.data = self.data.asfreq(freq="3min")
elif self._type == "Total":
self.data = self.data.asfreq(freq="2h")
# Création du graphique
ax: plt.Axes
fig: Figure
fig, ax = plt.subplots(dpi=70) #dpi = zoom amount
# Création des points sur le graphique
ax.plot(
self.data.index, self.data[self.category],
marker="s", markersize=1, linewidth=0.7, color=self.color, label=self.ylabel
)
# Adding horizontal guide lines at y-values with specified color, linestyle, and linewidth
[ax.axhline(y, color="tab:red", linestyle=":", linewidth=5) for y in self.category_value]
start_date = self.timestamps[0]
end_date = self.timestamps[-1]
ax.set_xlim(mdates.datestr2num([start_date.strftime('%Y-%m-%d %H:%M:%S'), end_date.strftime('%Y-%m-%d %H:%M:%S')]))
ax.set_ylim(self.lower_boundary-2, self.upper_boundary+2)
# Add grids and labels
ax.grid(axis="y", color="gray", linestyle=":", linewidth=0.5) # Adding grid lines with specified color, linestyle, and linewidth
if self._type == "4H":
ax.grid(axis="x", color="gray", linestyle=":", linewidth=1)
ax.set_ylabel(ylabel=self.ylabel, fontsize = 20) # Setting xlabel to empty string and ylabel to the category
title = f"{self.category} over time - "
if self._type == "4H":
title += f"CSV: {start_date.strftime('%Y-%m-%d from %H:%M:%S')} to {end_date.strftime('%H:%M:%S')}"
elif self._type == "24H":
title += f"CSV: {start_date.strftime('%Y-%m-%d')}"
elif self._type == "Total":
title += f"from CSV: {start_date.strftime('%Y-%m-%d %H:%M:%S')} to CVS: {end_date.strftime('%Y-%m-%d %H:%M:%S')}"
ax.set_title(label=title, fontsize=25)
if self._type =='4H':
ax.xaxis.set_major_locator(mdates.MinuteLocator(interval=15)) #intervalle de l'axe des x, montre x toutes les 10mins
elif self._type =='24H':
ax.xaxis.set_major_locator(mdates.MinuteLocator(interval=60)) #intervalle de l'axe des x, montre x toutes les 30mins
elif self._type == 'Total':
ax.xaxis.set_major_locator(mdates.HourLocator(interval=24)) #intervalle de l'axe des x, montre x tout les 1 jour
# Customize the date formatting
# Using ConciseDateFormatter to format the x-axis dates with specific formats for different date components
formatter = mdates.ConciseDateFormatter(
locator=ax.xaxis.get_major_locator(),
formats=["%Y", "%b", "%d", "%HH", "%H:%M", "%S.%f"], # Changed '%-HH' to '%H'
zero_formats=["", "%Y", "%b", "%b-%d", "%H:%M", "%H:%M"]
)
ax.xaxis.set_major_formatter(formatter)
# Final formatting
# ax.spines[["right", "top"]].set_visible(False) # Removing right and top spines
ax.tick_params(axis="both", labelsize=20)
if self.graph is not None:
plt.close(self.graph.figure)
# Création du widget Tkinter pour afficher le graphique
try:
graph = FigureCanvasTkAgg(fig, master=self)
graph.get_tk_widget().place(relx=0, rely=0, relwidth=1, relheight=1)
except Exception as e:
return
class GraphMenu(Menu):
def __init__(self, master,
title: str, func: Callable,
_type: Literal["4H", "Total", "24H"]):
super().__init__(master=master, title=title)
self.extra = lambda: self.create_graphs()
self.get_data = func # Fonction pour obtenir les données du graphique
self.graphs: list[FigureCanvasTkAgg] = [] # Liste des graphiques existants
self._type = _type
self.displayed_graph: ctk.StringVar = self.master.displayed_graph
self.displayed_graph.trace_add("write", self.change_displayed_graph)
self.create_widgets()
def create_widgets(self) -> None:
self.loading_frame = ctk.CTkFrame(master=self, corner_radius=0)
self.loading_label = ctk.CTkLabel(master=self.loading_frame, text=f"Chargement des données...n(temps estimé d'attente: {self.get_estimated_time()}s)")
self.loading_label.place(relx=0.5, rely=0.5, anchor="center")
self.graph_frame = ctk.CTkFrame(master=self, corner_radius=0)
# Création des graphiques en fonction de la catégorie
self.graph_temp = Graph(
master=self.graph_frame,
type=self._type,
title="Graph Temperature",
ylabel="Températuren(°C)",
color=ORANGE,
graphs=self.graphs,
)
self.graph_ph = Graph(
master=self.graph_frame,
type=self._type,
title="Graph pH",
ylabel="pH",
color=PURE_GREEN,
graphs=self.graphs,
)
self.graph_do = Graph(
master=self.graph_frame,
type=self._type,
title="Graph DO",
ylabel="DOn(mg/L)",
color=PURE_BLUE,
graphs=self.graphs,
)
self.last_displayed_graph = self.graph_do
self.place_graphs()
def place_graphs(self):
# Placement des sous-frames dans le frame principal
if self.master._3_graph_at_once.get():
self.graph_do.place(relx=0.01, rely=0, relwidth=0.98, relheight=0.32)
self.graph_ph.place(relx=0.01, rely=0.35, relwidth=0.98, relheight=0.32)
self.graph_temp.place(relx=0.01, rely=0.7, relwidth=0.98, relheight=0.3)
else:
self.graph_do.place(relx=0.01, rely=0, relwidth=0.98, relheight=1)
self.graph_ph.place(relx=0.01, rely=0, relwidth=0.98, relheight=1)
self.graph_temp.place(relx=0.01, rely=0, relwidth=0.98, relheight=1)
def change_displayed_graph(self, *args): #change le graph affiché en fonction de celui choisi dans le combobox, de base affiche les 3 en meme temps
name_to_graph: dict[str, Graph] = {
"DO": self.graph_do,
"pH": self.graph_ph,
"Température": self.graph_temp,
"": None
}
displaying = name_to_graph[self.displayed_graph.get()]
if displaying is None: return
displaying.lift(self.last_displayed_graph)
self.last_displayed_graph = displaying
def create_graphs(self, instant_reload: bool = False) -> None:
if not instant_reload:
self.graph_frame.place_forget()
self.loading_frame.place(relx=0, rely=0, relwidth=1, relheight=1)
self.update()
self.data = self.get_data()
self.graph_frame.place(
relx=0,
rely=0.125 if self._type == "4H" else 0.075,
relwidth=1,
relheight=0.8 if self._type == "4H" else 0.85
)
self.graph_do.initialize_data(data={"time": self.data["time"], "values": self.data["values"]["DO"]})
self.graph_ph.initialize_data(data={"time": self.data["time"], "values": self.data["values"]["pH"]})
self.graph_temp.initialize_data(data={"time": self.data["time"], "values": self.data["values"]["Temperature"]})
self.update()
def get_estimated_time(self) -> float: return 1.5
def get_today_data(self, graph_type: Literal["4H", "24H", "Total"],
selected_csv: str = None, data = None) -> dict[Literal["time", "values"], dict[Literal["DO", "pH", "Temperature"], list[float]]]:
timestamps_sorting: list[Literal[0, 1]]
data: dict[str, Union[list[datetime], dict[str, list[float]]]]
if graph_type == "4H":
if self.upper_edge.get() == "" or self.lower_edge.get() == "" :
return
self.upper_edge: IntVar
self.lower_edge: IntVar
if selected_csv is None:
selected_csv = self.master.csv_filename.get()
day_carry = 0
if graph_type == "4H":
current_time = dt.time(self.upper_edge.get()%24, 0, 0)
if self.upper_edge.get() == 24: day_carry = 1
else: current_time = dt.time(0, 0, 0)
selected_csv_date = datetime.strptime(selected_csv, "%Y-%m-%d").date()
now = datetime.combine(
date=selected_csv_date + timedelta(days=day_carry),
time=current_time
) # Creer un objet now pour si on veut changer de csv, met a jour l'affichage et la date
now_str = now.strftime("%Y/%m/%d-%H:%M:%S") # Pour avoir la date en date heure
lower_hour_time = dt.time((self.lower_edge.get()%24 if graph_type == "4H" else 0), 0, 0) # Pour lire la variable
lower_hour = datetime.combine(
date=selected_csv_date,
time=lower_hour_time
)
lower_hour_str = lower_hour.strftime("%Y/%m/%d-%H:%M:%S")
day_data = self.master.load_data(self.master.csv_filenames[selected_csv]) # Va chercher la data du csv actuel
day_data_values = list(day_data.values())[0] # Met dans une liste toutes les valeurs de la data
timestamps: list[datetime] = [datetime.combine(now, datetime.strptime(t, "%H:%M:%S").time()) for t in day_data_values["Timestamp"]] # Collectionne tout les timestamps
now += timedelta(days=1 if graph_type != "4H" else 0)
now_str = now.strftime("%Y/%m/%d-%H:%M:%S")
timestamps = pd.DatetimeIndex([t for t in timestamps if lower_hour <= t <= now]) # Check si les timestamps sont entre l'heure de debut et l'heure de fin
dates = pd.date_range(start=lower_hour_str, end=now_str, freq="s") # Genere tout les timestamps qui pourraient exister entre le temps de debut et de fin
timestamps_sorting = [] # Masque
i = 0
j = 0
if len(timestamps) > len(dates): # Si il y a plus de timestamps qu'on as collectionner que de timestamp existant alors
while j < len(dates) and i < len(timestamps): # Tant que j est inférieur à la longueur des dates et i est inférieur à la longueur des timestamps
recorded_timestamp = timestamps[j] # Obtiens le timestamp de la liste des timestamps collectés
check_timestamp = dates[i] # Obtiens le timestamp de la liste des timestamps existants
if recorded_timestamp == check_timestamp: # Si le timestamp enregistré correspond au timestamp existant
timestamps_sorting.append(1) # Ajoute 1 à la liste timestamps_sorting
i += 1 # Passe au prochain timestamp existant
else:
timestamps_sorting.append(0) # Sinon ajoute 0 à la liste timestamps_sorting
j += 1 # Passe au prochain timestamp collecté
else: # Si le nombre de timestamps existants est supérieur au nombre de timestamps collectés
while i < len(dates) and j < len(timestamps): # Tant que i est inférieur à la longueur des dates et j est inférieur à la longueur des timestamps
recorded_timestamp = timestamps[j] # Obtiens le timestamp de la liste des timestamps collectés
check_timestamp = dates[i] # Obtiens le timestamp de la liste des timestamps existants
if recorded_timestamp == check_timestamp: # Si le timestamp enregistré correspond au timestamp existant
timestamps_sorting.append(1) # Ajoute 1 à la liste timestamps_sorting
j += 1 # Passe au prochain timestamp existant
else:
timestamps_sorting.append(0) # Sinon ajoute 0 à la liste timestamps_sorting
i += 1 # Passe au prochain timestamp collecté
if j == len(timestamps): #si j est egal a la longueur des timestamps (atteint la fin des timestamps collectionnés)
#alors on ajoute au masque les 0 manquants pour indiquer que c'est les valeurs qu'il manque
timestamps_sorting.extend([0] * (len(dates) - i))
if data is None:
data = {
"time": [],
"values": {
"DO": [],
"pH": [],
"Temperature": []
}
}
if graph_type == "Total":
timestamps_sorting = timestamps_sorting[1:]
lacking_data_offset = 0
# Combine time and value appending into a single loop with conditional assignment
for i, timestamp_sorting in enumerate(timestamps_sorting):
data["time"].append(dates[i])
if timestamp_sorting:
data["values"]["DO"].append(day_data_values["DO"][i - lacking_data_offset])
data["values"]["pH"].append(day_data_values["pH"][i - lacking_data_offset])
data["values"]["Temperature"].append(day_data_values["Temperature"][i - lacking_data_offset])
else:
data["values"]["DO"].append(-1)
data["values"]["pH"].append(-1)
data["values"]["Temperature"].append(-1)
lacking_data_offset += not timestamp_sorting # Increment lacking_data_offset only when there's missing data
return data
class Graph4H(GraphMenu):
def __init__(self, master, title: str):
super().__init__(master=master,
title=title,
func=lambda : self.get_today_data("4H"),
_type="4H")
def update_graphs():
for graph in self.graphs:
plt.close(graph.figure)
self.graphs.clear()
self.create_graphs()
self.extra = update_graphs
self.master = master
self.upper_edge = ctk.IntVar(value=datetime.now().hour)
if self.upper_edge.get() < 4:
self.upper_edge.set(4)
self.lower_edge = ctk.IntVar(value=max(0, self.upper_edge.get() - 4))
#self.lower_edge.trace_add("write", lambda *args: update_graphs())
self.apply_btn = ctk.CTkButton(
master=self,
text="Apply",
width=40,
bg_color=(GRAYDB, GRAY2B),
fg_color=BUTTON_FG_COLOR,
text_color=BUTTON_TEXT_COLOR,
command=update_graphs
)
self.apply_btn.place(relx=0.75, rely=0.01, relheight=0.05)
self.update_value_loop_enable = False
self.upper_edge_range = [str(h) for h in range(4, 25)]
self.lower_edge_range = [str(h) for h in range(0, 21)]
self._create_widgets()
def _create_widgets(self) -> None:
self.lower_edge_combobox = ctk.CTkComboBox(
master=self,
width=60,
bg_color=(GRAYDB, GRAY2B),
command=lambda event: self.upper_edge.set(self.lower_edge.get() + 4),
values=self.lower_edge_range,
variable=self.lower_edge
)
self.upper_edge_combobox = ctk.CTkComboBox(
master=self,
width=60,
bg_color=(GRAYDB, GRAY2B),
command=lambda event: self.lower_edge.set(self.upper_edge.get() - 4),
values=self.upper_edge_range,
variable=self.upper_edge
)
def reset_values():
self.upper_edge.set(datetime.now().hour)
self.lower_edge.set(self.upper_edge.get() - 4)
self.reset_button = ctk.CTkButton(
master=self,
text="Reset",
width=30,
bg_color=(GRAYDB, GRAY2B),
fg_color=BUTTON_FG_COLOR,
text_color=BUTTON_TEXT_COLOR,
command=reset_values
)
ctk.CTkLabel(master=self, text="Heure de début (de la journée) :").place(relx=0.05, rely=0.01)
ctk.CTkLabel(master=self, text="Heure de fin (de la journée) :").place(relx=0.4, rely=0.01)
self.lower_edge_combobox.place(relx=0.25, rely=0.01)
self.upper_edge_combobox.place(relx=0.575, rely=0.01)
self.reset_button.place(relx=0.9, rely=0.01, relheight=0.05)
def update_graphs(self):
running = True
while running:
self.create_graphs()
self.update()
if self.terminate_flag.is_set():
running = False
return
sleep(1)
self.update()
class GraphTotal(GraphMenu):
def __init__(self, master, title: str):
super().__init__(master=master, title=title, func=lambda: self.get_all_data(), _type="Total")
self.data = {"time": [], "values": {"DO": [], "pH": [], "Temperature": []}}
self.last_amount_of_records = 0
def get_estimated_time(self) -> float:
return len(get_data_filenames()) * 2.5
def get_all_data(self) -> dict[Literal['time', 'values'], dict[Literal["DO", "pH", "Temperature"], list[int]]]:
# Fonction pour obtenir les données pour la catégorie spécifiée (DO, pH, Température) sur les 4 dernières heures
self.date_range = get_data_filenames()
if self.last_amount_of_records == len(self.date_range):
return self.data # That to avoid reloading the data when the amount of record is still the same
# What is essentially called "cache"
self.last_amount_of_records = len(self.date_range)
for date in self.date_range:
self.data = self.get_today_data("Total", date, self.data)
return self.data
class Graph24H(GraphMenu):
def __init__(self, master, title: str):
super().__init__(
master=master,
title=title,
func=lambda: self.get_today_data("24H"),
_type="24H"
)