I managed to extract part of a JSON payload received over MQTT (the BCID and location fields). My problem is: how do I visualize that? I keep getting errors such as ‘illegal hardware instruction’, ‘cv2.error: Unknown C++ exception from OpenCV code’, etc.
- My system: macOS Big Sur, version 11.7.10
#visualize location
#CONVERTED INTO PIXEL!
import cv2
from pathlib import Path
import uuid
import ssl
import json
from dotenv import dotenv_values
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion
import numpy as np
# Side length, in pixels, of the square image that normalized coordinates are
# mapped onto (1080x1080); used for both the x and the y axis.
IMAGE_SIZE = 1080
class Visualizer:
    """Placeholder for the visualization step; it currently only prints a blank line."""

    @staticmethod
    def visualize():
        """Print an empty line to standard output.

        Declared as a static method because it takes no ``self``: without the
        decorator, calling it on an instance raises ``TypeError``. It can now
        be called on the class and on an instance alike.
        """
        print()
class MQTTClient:
    """Wrapper around a paho-mqtt ``Client`` that prints pixel coordinates of received tracks.

    The client connects over TLS, subscribes to the configured topic filters
    once the broker has accepted the connection, and for every message prints
    the BCID and the pixel coordinates of the first entry in ``outputs``.
    """

    def __init__(self, broker_url: str, port: int, topics: dict, **kwargs):
        """Configure the underlying paho client (TLS, optional credentials, callbacks).

        Args:
            broker_url: Host name or address of the MQTT broker.
            port: TCP port of the broker.
            topics: Iterable of topic filters to subscribe to with QoS 0
                (iterating a dict yields its keys).
            **kwargs: When both ``broker_user`` and ``broker_pass`` are
                present they are used as login credentials.
        """
        self.broker_url = broker_url
        self.port = port
        self.topics = topics
        # The last group of a random UUID makes the client id unique per run.
        self.client = Client(
            CallbackAPIVersion.VERSION2, f"visualizer-{str(uuid.uuid4()).split('-')[4]}"
        )
        # NOTE(review): ssl.CERT_NONE disables verification of the broker's
        # certificate, so the link is encrypted but not authenticated. Left
        # unchanged on purpose; supply ca_certs and CERT_REQUIRED if the
        # broker presents a verifiable certificate.
        self.client.tls_set(
            ca_certs=None,
            certfile=None,
            keyfile=None,
            cert_reqs=ssl.CERT_NONE,
            tls_version=ssl.PROTOCOL_TLSv1_2,
        )
        if {"broker_user", "broker_pass"}.issubset(kwargs):
            self.client.username_pw_set(kwargs["broker_user"], kwargs["broker_pass"])
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    def start(self):
        """Connect to the broker and start paho's background network thread.

        Subscriptions are not issued here; they are made in ``on_connect`` so
        that they happen only after the broker accepted the connection and are
        repeated whenever the client reconnects.
        """
        self.client.connect(self.broker_url, port=self.port, keepalive=60)
        self.client.loop_start()

    def on_connect(self, client, userdata, connect_flags, reason_code, properties):
        """Callback on broker connection: report the result and subscribe on success."""
        if reason_code == 0:
            print(f"MQTT broker connection established to: {self.broker_url}")
            subscriptions = [(topic, 0) for topic in self.topics]
            # Skip the call when nothing is configured instead of sending an
            # empty subscription list.
            if subscriptions:
                client.subscribe(subscriptions)
        else:
            print(f"Error connecting to MQTT broker: {reason_code}")

    def on_message(self, client, userdata, msg):
        """Callback on message received: print BCID and pixel coordinates.

        Only the first element of ``outputs`` is used. Malformed messages
        (invalid UTF-8 or JSON, missing keys, empty ``outputs``, bad location
        values) are reported and skipped so that one bad payload does not
        raise out of the callback into the network loop.
        """
        try:
            data = json.loads(msg.payload.decode("utf-8"))
            first_output = data["outputs"][0]
            bcid = first_output["bcid"]
            location = first_output["location"]
        except (ValueError, KeyError, IndexError, TypeError) as err:
            # ValueError covers both UnicodeDecodeError and JSONDecodeError.
            print(f"Ignoring malformed MQTT payload: {err!r}")
            return
        try:
            pixel_coordinates = convert_to_pixels(location)
        except (ValueError, TypeError) as err:
            print(f"Ignoring MQTT payload with invalid location {location!r}: {err!r}")
            return
        print(f"BCID: {bcid} Pixel Coordinates: {pixel_coordinates}")
def convert_to_pixels(location, image_size=None):
    """Convert normalized location coordinates to pixel coordinates.

    Each coordinate is mapped linearly from [-1.0, 1.0] onto [0, image_size]
    and truncated to an integer with ``int()``.

    Args:
        location: Sequence of exactly four numbers [xmin, ymin, xmax, ymax].
            NOTE(review): the [-1.0, 1.0] input range is an assumption of
            this code; confirm it against the payload producer.
        image_size: Side length in pixels of the square target image.
            Defaults to the module-level ``IMAGE_SIZE`` when omitted.

    Returns:
        list: Pixel coordinates [xmin_px, ymin_px, xmax_px, ymax_px].

    Raises:
        ValueError: If ``location`` does not contain exactly four values.
    """
    size = IMAGE_SIZE if image_size is None else image_size
    # Unpacking enforces that exactly four coordinates were supplied.
    xmin, ymin, xmax, ymax = location
    # Shift [-1, 1] to [0, 2], then scale to [0, size].
    return [int((value + 1) * size / 2) for value in (xmin, ymin, xmax, ymax)]
# Fixed connection parameters: broker port and the topic filters to subscribe to.
PORT = 8883
TOPICS = ["/viana/+/ze_all_tracks"]

# Broker settings are read from a ".env" file in the current working directory.
# MQTT_BROKER is mandatory; the credentials fall back to an empty string.
CONFIG = dotenv_values(Path.cwd().joinpath(".env"))
BROKER_URL = CONFIG["MQTT_BROKER"]
BROKER_USER = CONFIG.get("MQTT_USER", "")
BROKER_PASS = CONFIG.get("MQTT_PASS", "")
def main():
    """Run the MQTT listener until the user presses Enter.

    The credentials loaded from the environment file are forwarded to
    ``MQTTClient`` when a user name is configured; without this the client
    never authenticates even though MQTT_USER / MQTT_PASS are set. On exit
    the connection is closed and the background network thread is stopped.
    """
    credentials = {}
    if BROKER_USER:
        credentials = {"broker_user": BROKER_USER, "broker_pass": BROKER_PASS}
    client = MQTTClient(BROKER_URL, PORT, TOPICS, **credentials)
    client.start()
    try:
        # Block the main thread; messages are handled on paho's background thread.
        input()
    except (EOFError, KeyboardInterrupt):
        # Ctrl-C / closed stdin are treated as a normal request to quit.
        pass
    finally:
        client.client.disconnect()
        client.client.loop_stop()


if __name__ == "__main__":
    main()
I tried a lot of ways to visualize the data, but many of them don’t support my OS. I tried using XQuartz, but that was also inefficient and, according to my terminal, unsupported. I also tried matplotlib, but it doesn’t work either.
lance amarles is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.