I am trying to stream data from my device to a rabbitmq server and then consume it on my backend and websocket it to the front-end. This is my code:
@router.websocket("/ws/messages")
async def websocket_messages(
websocket: WebSocket,
device_name: str,
pipeline_name: str,
username: str = Depends(get_current_user_ws)
):
await websocket.accept()
pipeline = await websocket.receive_text()
queue_name = pipeline_name
result = channel.queue_declare(queue=queue_name, arguments={
"x-queue-type": "stream", "x-max-length-bytes": 20_000_000_000}, durable=True)
queue = result.method.queue
channel.queue_bind(exchange=STREAMING_EXCHANGE, queue=queue,
routing_key=f"{device_name}/{pipeline_name}")
channel.basic_qos(prefetch_count=1)
pipeline_visualizer = PipelineVisualizer(json.loads(pipeline))
async def on_message(channel, method, props, body):
if body is None:
# No message received, adjust sleep if needed
await asyncio.sleep(1)
payload_str = body.decode("utf-8")
record_data = json.loads(payload_str)
# Draw pipeline results
data, data_type = pipeline_visualizer.render_frame(
record_data)
output = {"messages": record_data}
if data_type == "image":
_, image = cv2.imencode('.jpg', data)
image = image.tobytes()
image = base64.b64encode(image).decode('ascii')
output.update({
"visualizationData": image,
"type": "image"
})
elif data_type == "pointcloud":
with open("./out.pcd", "rb") as fp:
data_bytes = fp.read()
pointcloud = base64.b64encode(data_bytes).decode('ascii')
output.update({
"visualizationData": pointcloud,
"type": "pointcloud"
})
await websocket.send_text(json.dumps(output))
await asyncio.sleep(1/60)
channel.stop_consuming()
try:
channel.basic_consume(
queue=queue, auto_ack=False, on_message_callback=on_message)
channel.start_consuming()
except Exception as e:
print(e)
finally:
await websocket.close()
I have tried using threads and it didn’t work. This code was working just fine when I was using basic_get with the default queue. But as it turns out we need more throughput performance for the data streaming. Reading the rabbitmq docs, they say stream queue’s are the way to go when you need more performance. And that is where I am.