Serial Real Time Plotter python

The code below plots numbers received from serial port. However, the values of locals()[self.axes_mapping[axis]] are not all the received data from the function receive_data. It seems some values are lost. If I try to print the values of x, they correspond with the received data. How do I make sure that all the received data are plotted?

import sys
from PyQt6.QtWidgets import QApplication, QMainWindow
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore
import numpy as np
from collections import deque
import threading
import queue
import serial

BUFFER_SIZE = 100

class RealTimePlot(QMainWindow):
    def __init__(self):
        super().__init__()

        self.setup_ui()
    
        self.serial_port = serial.Serial('COM6', 115200)
        self.N = 100
        self.fs = 1000  # Sampling frequency in Hz (adjust according to your setup)
        self.T = 1/self.fs
        self.x_values = np.arange(0, self.N*self.T, self.T)

        # Circular buffers for time domain plots
        self.axes = ['X', 'RMS']

        self.z_values = {axis: deque([0] * self.N, maxlen=self.N) for axis in self.axes}
        self.z_index = {axis: 0 for axis in self.axes}

        # Axes variable mapping
        self.axes_mapping = {'X': 'x', 'RMS': 'rms'}

        # Plotting setup
        self.setup_plots()

        self.data_queue = queue.Queue()
        # Lock for synchronizing access to the data queue
        self.data_queue_lock = threading.Lock()

        # Create and start the receiving thread
        self.receive_thread = threading.Thread(target=self.receive_data)
        self.receive_thread.daemon = True
        self.receive_thread.start()

        # Start the animation
        self.timer = QtCore.QTimer(self)
        self.timer.timeout.connect(self.update_plot)
        self.timer.start(10)

    def setup_ui(self):
        self.central_widget = pg.GraphicsLayoutWidget()
        self.setCentralWidget(self.central_widget)

    def setup_plots(self):
        self.plots = {axis: self.central_widget.addPlot(row=i, col=0, title=f"<span style='color: #ffffff; font-weight: bold; font-size: 15px'>Time Domain - {axis} Axis</span>")
                      for i, axis in enumerate(self.axes)}

        for plot in self.plots.values():
            plot.setLabel('bottom', 'Time', 's')
            # plot.setLabel('left', 'Amplitude', 'g')
            plot.setYRange(-2, 5000)


        linha1 = pg.mkPen((52, 255, 52), width=2) # R G B & width
        # linha4 = pg.mkPen((255, 255, 255), width=2) 
        self.lines = {axis: plot.plot(pen=linha1) for axis, plot in self.plots.items()}
        # self.lines_fft = {axis: plot.plot(pen=linha4) for axis, plot in self.plots_fft.items()}
        self.plots['RMS'].setYRange(0, 5000)

    def receive_data(self):
        data_buffer = np.zeros((BUFFER_SIZE), dtype=int)
        data_cnt = 0
        rx_flag = True
        while True:
            if self.serial_port.in_waiting > 0.0:  # Check if there is data waiting
                    data_str =  int.from_bytes(self.serial_port.read(2), byteorder='little', signed = False)
                    rx_flag = True

            with self.data_queue_lock:
                if rx_flag == True:
                    data_buffer[data_cnt] = data_str
                    data_cnt = data_cnt+1
                    rx_flag = False

                # Check if the buffer size is reached, then update the plot
                if data_cnt >= BUFFER_SIZE:
                    self.data_queue.put(data_buffer)
                    data_buffer = np.zeros((BUFFER_SIZE), dtype=int)
                    data_cnt = 0
                    

    def calculate_rms(self, x):
        return np.sqrt(np.mean(np.square([x])))

    def update_plot(self):
        with self.data_queue_lock:
            while not self.data_queue.empty():
                data_buffer = self.data_queue.get()

                for data_str in data_buffer:
                    x = data_str
                    rms = self.calculate_rms(data_buffer)

                for axis in self.axes:
                    self.z_values[axis].append(locals()[self.axes_mapping[axis]])
                    self.lines[axis].setData(self.x_values, self.z_values[axis])
                    print(locals()[self.axes_mapping[axis]])
        return self.lines.values()
    

def closeEvent(self, event):
    self.csv_file.close()
    event.accept()

def main():
    app = QApplication(sys.argv)
    window = RealTimePlot()
    window.show()
    sys.exit(app.exec())

if __name__ == '__main__':
    main()

The example is from here

Attempted Switch From DPC (blue screen error)- while running real-time plotting with python script
byu/iam_MRDX7 inembedded

You have a bug. This code

for data_str in data_buffer:
    x = data_str  # x = one item, drops the rest
    rms = self.calculate_rms(data_buffer)

for axis in self.axes:
    self.z_values[axis].append(locals()[self.axes_mapping[axis]])
    self.lines[axis].setData(self.x_values, self.z_values[axis])

should be

x = data_buffer  # x = all items
rms = [self.calculate_rms(data_buffer)]

for axis in self.axes:  # note the use of extend for list/array
    self.z_values[axis].extend(locals()[self.axes_mapping[axis]])
    self.lines[axis].setData(self.x_values, self.z_values[axis])

Also both axes should have different time scales, since the second one is the average of 100 points in the first one.


Lastly the use of locals is very discouraged, it is too brittle and prevents future refactoring or code modification. I’d probably just use a dictionary instead.

new_plot_data = {}  # replaces locals()
new_plot_data["x"] = data_buffer
new_plot_data["rms"] = [self.calculate_rms(data_buffer)]

full example (without pyserial use)

import random
import time
import sys
from PyQt6.QtWidgets import QApplication, QMainWindow
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore
import numpy as np
from collections import deque
import threading
import queue
import serial

BUFFER_SIZE = 100


class RealTimePlot(QMainWindow):
    def __init__(self):
        super().__init__()

        self.setup_ui()

        self.serial_port = None
        self.N = 100
        self.fs = 1000  # Sampling frequency in Hz (adjust according to your setup)
        self.T = 1 / self.fs
        self.x_values = np.arange(0, self.N * self.T, self.T)

        # Circular buffers for time domain plots
        self.axes = ['X', 'RMS']

        self.z_values = {axis: deque([0] * self.N, maxlen=self.N) for axis in self.axes}
        self.z_index = {axis: 0 for axis in self.axes}

        # Axes variable mapping
        self.axes_mapping = {'X': 'x', 'RMS': 'rms'}

        # Plotting setup
        self.setup_plots()

        self.data_queue = queue.Queue()
        # Lock for synchronizing access to the data queue
        self.data_queue_lock = threading.Lock()

        # Create and start the receiving thread
        self.receive_thread = threading.Thread(target=self.receive_data)
        self.receive_thread.daemon = True
        self.receive_thread.start()

        # Start the animation
        self.timer = QtCore.QTimer(self)
        self.timer.timeout.connect(self.update_plot)
        self.timer.start(10)

    def setup_ui(self):
        self.central_widget = pg.GraphicsLayoutWidget()
        self.setCentralWidget(self.central_widget)

    def setup_plots(self):
        self.plots = {axis: self.central_widget.addPlot(row=i, col=0,
                                                        title=f"<span style='color: #ffffff; font-weight: bold; font-size: 15px'>Time Domain - {axis} Axis</span>")
                      for i, axis in enumerate(self.axes)}

        for plot in self.plots.values():
            plot.setLabel('bottom', 'Time', 's')
            # plot.setLabel('left', 'Amplitude', 'g')
            plot.setYRange(-2, 5000)

        linha1 = pg.mkPen((52, 255, 52), width=2)  # R G B & width
        # linha4 = pg.mkPen((255, 255, 255), width=2)
        self.lines = {axis: plot.plot(pen=linha1) for axis, plot in self.plots.items()}
        # self.lines_fft = {axis: plot.plot(pen=linha4) for axis, plot in self.plots_fft.items()}
        self.plots['RMS'].setYRange(0, 5000)

    def receive_data(self):
        data_buffer = np.zeros((BUFFER_SIZE), dtype=int)
        data_cnt = 0
        rx_flag = True
        while True:
            time.sleep(0.01)
            data_str = random.random() * 256
            rx_flag = True

            with self.data_queue_lock:
                if rx_flag == True:
                    data_buffer[data_cnt] = data_str
                    data_cnt = data_cnt + 1
                    rx_flag = False

                # Check if the buffer size is reached, then update the plot
                if data_cnt >= BUFFER_SIZE:
                    self.data_queue.put(data_buffer)
                    data_buffer = np.zeros((BUFFER_SIZE), dtype=int)
                    data_cnt = 0

    def calculate_rms(self, x):
        return np.sqrt(np.mean(np.square([x])))

    def update_plot(self):
        with self.data_queue_lock:
            while not self.data_queue.empty():
                data_buffer = self.data_queue.get()

                new_plot_data = {}
                new_plot_data["x"] = data_buffer
                new_plot_data["rms"] = [self.calculate_rms(data_buffer)]

                for axis in self.axes:
                    self.z_values[axis].extend(new_plot_data[self.axes_mapping[axis]])
                    self.lines[axis].setData(self.x_values, self.z_values[axis])
        return self.lines.values()


def closeEvent(self, event):
    self.csv_file.close()
    event.accept()


def main():
    app = QApplication(sys.argv)
    window = RealTimePlot()
    window.show()
    sys.exit(app.exec())


if __name__ == '__main__':
    main()

2

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật