I am creating a sort of UTM simulator using python. On one hand, I have a flask server which is supposed to send a flight plan to a drone (built in dronekit and dronekit-sitl), which executes the flight plan.
The workflow is the following:
- The flask server sends a flight plan to dronekit and dronekit-sitl via MQTT broker in a topic called “flight/plan”
- Dronekit-sitl receives the home location from the flight plan and sets up a vehicle. Then, it publishes the connection string to the topic “flight/drone”.
- The flight manager in dronekit receives the connection string and waypoints from the topics and creates a vehicle which executes the flight according to the flight plan.
Up to this point, it is all good. The problem comes afterwards:
4. As a final task, the drone should elaborate a telemetry message and publish it every second to the mqtt topic “flight/telemetry”.
The loop indicates the message is being generated and supposedly published, but the reality is that instead of publishing the info every second, the message is not published until the flight has ended, time in which the messages are published in bulk.
I have tried threading the processes, using different QoS in mqtt and everything, but I dont seem to find the problem. The logs for mosquitto don’t show any issue, they just show the messages being published and received by other clients in bulk instead of every second.
The thing runs in local, although the simulator part is running on a virtual machine in Linux for compatibility reasons.
The setup for the telemetry part is as follows. I omitted everything non related with the problem.
mqtt_manager.py
class MQTTManager:
def __init__(self, broker_host, broker_port):
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
self.client.connect(broker_host, broker_port, 60)
self.client.loop_start()
self.connection_string = None
self.flight_plan = None
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print(f"Conectado al broker MQTT con código: {rc}")
client.subscribe("flight/plans")
client.subscribe("flight/drone",qos=2)
else:
print(f"Error al conectar: {rc}")
def on_message(self, client, userdata, msg):
print(f"Mensaje recibido: {msg.topic} {msg.payload}")
if msg.topic == "flight/plans":
self.flight_plan = json.loads(msg.payload.decode())
if self.connection_string:
self.start_flight()
elif msg.topic == "flight/drone":
self.connection_string = msg.payload.decode()
if self.flight_plan:
self.start_flight()
def start_flight(self):
process_flight_plan(self.flight_plan, self.connection_string, self)
def on_disconnect(self, client, userdata, rc):
print(f"Desconectado con código: {rc}")
if rc != 0:
print("Intentando reconectar...")
try:
client.reconnect()
except Exception as e:
print(f"Error al reconectar: {e}")
def publish_telemetry(self, topic, telemetry_data):
try:
self.client.publish(topic, json.dumps(telemetry_data), qos=2)
print(f"Telemetría enviada: {telemetry_data}")
except Exception as e:
print(f"Error al publicar telemetría: {e}")
mqtt_manager = MQTTManager('localhost', 1883)
flight_manager.py
def send_telemetry_periodically(mqtt_manager):
while True:
send_telemetry(mqtt_manager)
time.sleep(1)
def process_flight_plan(flight_plan, connection_string, mqtt_manager):
vehicle_manager = VehicleManager.get_instance()
waypoints = flight_plan['waypoints']
if not waypoints[0]:
print("No hay waypoints en el plan de vuelo recibido.")
return
try:
vehicle_manager.connect_vehicle(connection_string)
print("Vehículo armado")
print("Despegando...")
vehicle_manager.get_vehicle().simple_takeoff(10)
# Hilo para enviar telemetría
telemetry_thread = threading.Thread(target=send_telemetry_periodically, args=(mqtt_manager,))
telemetry_thread.daemon = True
telemetry_thread.start()
for point in flight_plan['waypoints']:
location = LocationGlobalRelative(point['lat'], point['lon'], point['alt'])
print(f"Navegando a: {location}")
move_thread = threading.Thread(target=vehicle_manager.move_vehicle_to_location, args=(location,))
move_thread.start()
move_thread.join()
print("Aterrizando...")
except Exception as e:
print(f"Error en process_flight_plan:{str(e)}")
telemetry_manager.py
from modules.vehicle_manager import VehicleManager
def send_telemetry(mqtt_manager):
vehicle_manager = VehicleManager.get_instance()
vehicle = vehicle_manager.get_vehicle()
if vehicle and vehicle.mode.name == 'GUIDED':
telemetry = vehicle_manager.capture_vehicle_state()
print("Mensaje de telemetría: ", telemetry)
mqtt_manager.publish_telemetry("flight/telemetry",telemetry)
vehicle.py
class VehicleManager:
_instance = None
vehicle = None
connection_string = None
@staticmethod
def get_instance():
if VehicleManager._instance is None:
VehicleManager()
return VehicleManager._instance
def __init__(self):
if VehicleManager._instance is not None:
raise Exception("This class is a singleton!")
else:
VehicleManager._instance = self
def get_vehicle(self):
return self.vehicle
def capture_vehicle_state(self):
if self.vehicle:
return {
'altitude': self.vehicle.location.global_relative_frame.alt,
'latitude': self.vehicle.location.global_relative_frame.lat,
'longitude': self.vehicle.location.global_relative_frame.lon,
'armed': self.vehicle.armed,
'mode': self.vehicle.mode.name
}
else:
return None
David Vázquez Masero is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.