Apologies in advance if this is a redundant question. I’m asking it here as a last resort, after having gone through a lot of blogs & articles.
Here’s the situation:
- I have a Django service (let’s call it the “main” service) running an HTTP server and a WebSocket server using django-channels.
- I have a bunch of other Django services (call them “target” services) that want to connect to the websocket channels exposed by the main service.
- Each target service needs to establish multiple websocket connections with the main service, and send/receive messages as long as the connection is not closed by the main service.
Here’re some sample connection URLs:
URL1 = "wss://main-service.com/ws-connection/1"
URL2 = "wss://main-service.com/ws-connection/2"
URL3 = "wss://main-service.com/ws-connection/3"
This list of URLs is not known in advance: the main service sends us a small payload, which is used to generate the URL at runtime, and then the connection is established.
The target service needs to be able to maintain thousands of websocket connections (serving as stateful connections), and it also needs to expose an HTTP API to serve direct requests.
So far, I have gotten the target service API running as a Django project, along with a basic websocket client. The two look like this:
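To make the runtime URL generation concrete, here is a minimal sketch of turning a payload into a connection URL. The `connection_id` key and the `build_ws_url` helper are hypothetical (the real payload fields are not shown here); only the base URL is taken from the samples above.

```python
from urllib.parse import quote

# Base taken from the sample URLs above.
BASE_URL = "wss://main-service.com/ws-connection"


def build_ws_url(payload: dict) -> str:
    """Build a connection URL from a runtime payload.

    "connection_id" is a hypothetical key; substitute whichever field
    the main service actually sends.
    """
    connection_id = str(payload["connection_id"])
    # Percent-encode the value so unexpected characters cannot alter the path.
    return f"{BASE_URL}/{quote(connection_id, safe='')}"


url = build_ws_url({"connection_id": 1})
```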
websocket client:
from abc import abstractmethod

from websocket import WebSocketApp as _WebSocketApp


class WebSocketApp(_WebSocketApp):
    def __init__(self, url=None):
        super().__init__(
            url=url,
            on_open=self._on_open,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
        )
        # get_logger is a project helper; its import is not shown here
        self.logger = get_logger()  # TODO: set level
        self.logger.debug("Websocket adapter is ready")

    @abstractmethod
    def _on_open(self, ws):
        pass

    @abstractmethod
    def _on_message(self, ws, message):
        pass

    @abstractmethod
    def _on_error(self, ws, error):
        pass

    @abstractmethod
    def _on_close(self, ws, code, message):
        pass
django view:
from adapters.web_socket import WebSocketApp
from rest_framework.decorators import api_view
from rest_framework.response import Response
from server.celery import app


@app.task()
def ws_handler(conversation_id):
    # TODO: standardise
    url = f"wss://main-service.com/connections/{conversation_id}"
    socket = WebSocketApp(url=url)
    socket.run_forever()  # blocks this worker until the connection closes


@api_view(["POST"])
def index(request):
    _id = request.data["_id"]
    ws_handler.delay(conversation_id=_id)
    # NOTE: verify connection via caching
    return Response(status=202)  # accepted; the connection is opened by the worker
Now this works well, but the Django server gets blocked as soon as a new websocket connection is established.
- For the time being, I got around that by handing the websocket connection request to a celery worker, which keeps my Django server free. But I know it’s not going to end well: each connection occupies a worker for as long as it stays open, so celery is going to eat my server and I will soon be out of worker threads.
What I would like to do:
- I would really like to turn this into an elegant async websocket client, so that I can run multiple websocket connections per thread, preferably thousands of them.
- I would still like to keep the celery worker to do the data processing work for me, but I wouldn’t want to burden it with websocket management, so I’d like the websocket stuff to happen outside the celery and Django server processes.
I’m very new to async programming & asyncio, but here’s what I have put together so far:
import pdb
import asyncio
import websockets
import json


class WebSocketAdapter:
    def __init__(self, uri):
        self.uri = uri
        self.websocket = None

    async def connect(self):
        try:
            self.websocket = await websockets.connect(self.uri)
            await self.on_connect()
            await self.receive_messages()
        except Exception as e:
            await self.on_error(e)

    async def disconnect(self):
        await self.websocket.close()

    async def send_message(self, message):
        if self.websocket is not None:
            await self.websocket.send(json.dumps(message))
            print("Message Sent")
        else:
            print("WebSocket connection is not established")

    async def receive_messages(self):
        while True:
            try:
                message = await self.websocket.recv()
                await self.on_message(message)
                # asyncio.create_task(self.on_message(await self.websocket.recv()))
            except websockets.exceptions.ConnectionClosed:
                await self.on_close()
                break
            except Exception as e:
                await self.on_error(e)

    async def on_connect(self):
        print("Connected to WebSocket server.")

    async def on_message(self, message):
        print(f"Received message from server: {message}")

    async def on_error(self, error):
        print("WebSocket error:", error)

    async def on_close(self):
        print("WebSocket connection closed.")


async def main(url):
    websocket_adapter = WebSocketAdapter(url)
    await websocket_adapter.connect()
    # NOTE: following line of code is never executed :(
    await websocket_adapter.send_message("Message from client.")


async def test():
    urls = [
        "ws://localhost:8765/conversations/1",
        "ws://localhost:8765/conversations/2",
        "ws://localhost:8765/conversations/3",
    ]
    tasks = [main(url) for url in urls]
    await asyncio.gather(*tasks)


# Run the event loop
asyncio.run(test())
# asyncio.get_event_loop().run_until_complete(test())
# pdb.set_trace()
- It seems to allow me to run multiple connections, but I’m not able to send/receive messages after the initial handshake; the code gets blocked at the receive_messages method inside the adapter.
- Plus, I cannot send messages from outside the adapter after the connection is established, which also means control is never returned to the caller, i.e. the Django server.
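Both symptoms follow from connect() awaiting receive_messages(), which only returns once the connection closes, so nothing placed after `await websocket_adapter.connect()` gets a chance to run. A minimal sketch of one alternative, assuming the receive loop is scheduled with asyncio.create_task instead of being awaited. FakeSocket and Adapter are hypothetical stand-ins so the sketch runs without a server; in real code the socket would come from `await websockets.connect(uri)`.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a websocket connection (no network needed)."""

    def __init__(self):
        self.inbox = asyncio.Queue()
        self.sent = []

    async def recv(self):
        return await self.inbox.get()

    async def send(self, message):
        self.sent.append(message)


class Adapter:
    def __init__(self, socket):
        self.socket = socket
        self.received = []
        self.reader = None

    async def connect(self):
        # Schedule the receive loop as a task instead of awaiting it,
        # so connect() returns while the loop keeps running.
        self.reader = asyncio.create_task(self.receive_messages())

    async def receive_messages(self):
        while True:
            self.received.append(await self.socket.recv())

    async def send_message(self, message):
        await self.socket.send(message)

    async def disconnect(self):
        # Cancel the receive loop and wait for the cancellation to finish.
        self.reader.cancel()
        try:
            await self.reader
        except asyncio.CancelledError:
            pass


async def demo():
    socket = FakeSocket()
    adapter = Adapter(socket)
    await adapter.connect()                       # returns right away
    await adapter.send_message("Message from client.")
    await socket.inbox.put("hello from server")   # simulate an incoming message
    await asyncio.sleep(0.01)                     # give the reader task time to run
    await adapter.disconnect()
    return socket.sent, adapter.received


sent, received = asyncio.run(demo())
```

With this shape the caller keeps a reference to the adapter and can send at any time, while incoming messages are handled by the background task.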
I have tried a bunch of stuff. I had a simple websocket adapter which was synchronous and worked really well with callbacks, but with asyncio I have to build coroutines? So I’m wondering: how do I hand control back to the caller, how do I accept new connection requests, and how do I even test this thing locally?
- I would basically like the ability to receive a connection request through an HTTP endpoint, establish a websocket connection in the background, and continue receiving new requests, i.e. I should be able to initialise many websocket client objects, with configurable callbacks, and just use them as I please xD
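One way this could look, as a rough sketch rather than a definitive design: a single asyncio event loop runs in a background thread, and synchronous code (such as a Django view) hands coroutines to it with asyncio.run_coroutine_threadsafe and returns immediately. BackgroundLoop, run_connection, FakeSocket and fake_connect are all hypothetical names; the fake connection exists only so the sketch runs without a server, and it signals closure with None, whereas the websockets library raises ConnectionClosed instead. Note that this keeps the loop inside the calling process; moving it to a separate process would need some form of inter-process messaging on top.

```python
import asyncio
import threading


class BackgroundLoop:
    """Runs one asyncio event loop in a daemon thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    def submit(self, coro):
        # Thread-safe hand-off; returns a concurrent.futures.Future at once.
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()


async def run_connection(url, on_message, connect):
    """Hold one connection open and pass each message to the callback.

    `connect` is any coroutine function returning an object with an
    async recv(); it is injected so the callback and transport stay configurable.
    """
    socket = await connect(url)
    while True:
        message = await socket.recv()
        if message is None:  # the fake below uses None to signal closure
            break
        on_message(url, message)


class FakeSocket:
    """Hypothetical stand-in that yields canned messages, then closes."""

    def __init__(self, messages):
        self._messages = list(messages) + [None]

    async def recv(self):
        await asyncio.sleep(0)  # yield to other connections, as real I/O would
        return self._messages.pop(0)


async def fake_connect(url):
    return FakeSocket([f"hello from {url}"])


background = BackgroundLoop()
received = []
urls = [f"ws://localhost:8765/conversations/{n}" for n in range(1, 4)]

# Each submit() returns immediately; all connections share one thread.
futures = [
    background.submit(
        run_connection(url, lambda u, m: received.append((u, m)), fake_connect)
    )
    for url in urls
]

# A real caller would return here; the demo waits so it can inspect the results.
for future in futures:
    future.result(timeout=5)
background.stop()
```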
Would really appreciate any pointers, suggestions or code snippets ;) Thanks in advance.