How to run multiple websocket client connections?

Apologies in advance if this is a redundant question, I’m asking it here as the last resort, after having gone through a lot blogs & articles.

Here’s the situation:

  • I have a django service(let’s call it the “main” service) running an HTTP server & a WebSocket server using django-channels.
  • I have a bunch of other django services(call them target services) wanting to connect to the websocket channels exposed by the main service
  • And target each service needs to establish multiple websocket connections with the main service, and send/receive messages as long the connection is not closed by the main service.

Here’re some sample connection URLs:

URL1 = "wss://main-service.com/ws-connection/1"
URL2 = "wss://main-service.com/ws-connection/2"
URL3 = "wss://main-service.com/ws-connection/3"

And this list of URLs is obviously not known in advance, the main service sends us a mini payload which is used to generate the URL at runtime, and connection is established.

The target service needs to have the ability to maintain thousands of websocket connection(serving as stateful connections) & it also needs to expose an HTTP API to serve direct requests.

So far, I have gotten the target service API running as django project, and a basic websocket client, and the two look like this:

websocket client:

from abc import abstractmethod
from websocket import WebSocketApp as _WebSocketApp


class WebSocketApp(_WebSocketApp):
    def __init__(self, url=None):
            super(WebSocketApp, self).__init__(url=url, on_open=self._on_open on_message=self._on_message, on_error=self._on_error, on_close=self._on_close)
        self.logger = get_logger()  # TODO: set level
        self.logger.debug(f"Websocket adapter is ready")

    @abstractmethod
    def _on_open(self, ws):
        pass

    @abstractmethod
    def _on_message(self, ws, message):
        pass

    @abstractmethod
    def _on_error(self, ws, error):
        pass

    @abstractmethod
    def _on_close(self, ws, code, message):
        pass

django view:

from adapters.web_socket import WebSocketApp
from rest_framework.decorators import api_view
from server.celery import app


@app.task()
def ws_handler(conversation_id):
    # TODO: standardise
    url = f"wss://main-service.com/connections/{id}"
    socket = WebSocketApp(url=url)
    socket.run_forever()


@api_view(["POST"])
def index(request):
    _id = request.data["_id"]
    ws_handler.delay(id=_id)
        # NOTE: verify connection via caching  

Now this works well, but django server gets blocked as soon as a new websocket connection is established

  • For the time being, I got around that by sending websocket connection request to a celery worker, but i know it’s not going end well, as celery is just going to eat my server, and i will soon be out of worker threads, but it allows me to keep my django-server free

What I would like to do:

  • I would really like to turn this into an elegant async websocket client, so that I can run multiple websocket connections per thread, preferably thousands of them.
  • I would still like to keep the celery worker to do the data processing work for me, but I wouldn’t want to burden it with websocket management, so I’d like the websocket stuff to happen outside the celery & the django server process

I’m very new to async programming & asyncio, but here’s what I have put together so far:

import pdb
import asyncio
import websockets
import json


class WebSocketAdapter:
    def __init__(self, uri):
        self.uri = uri
        self.websocket = None

    async def connect(self):
        try:
            self.websocket = await websockets.connect(self.uri)
            await self.on_connect()
            await self.receive_messages()
            await self.receive_messages()
        except Exception as e:
            await self.on_error(e)

    async def disconnect(self):
        await self.websocket.close()

    async def send_message(self, message):
        if self.websocket is not None:
            await self.websocket.send(json.dumps(message))
            print("Message Sent")
        else:
            print("WebSocket connecion is not established")

    async def receive_messages(self):
        while True:
            try:
                message = await self.websocket.recv()
                await self.on_message((message))
                # asyncio.create_task(self.on_message(await self.websocket.recv()))
            except websockets.exceptions.ConnectionClosed:
                await self.on_close()
                break
            except Exception as e:
                await self.on_error(e)

    async def on_connect(self):
        print("Connected to WebSocket server.")

    async def on_message(self, message):
        print(f"Received message from server: {message}")

    async def on_error(self, error):
        print("WebSocket error:", error)

    async def on_close(self):
        print("WebSocket connection closed.")


async def main(url):
    websocket_adapter = WebSocketAdapter(url)
    await websocket_adapter.connect()
    # NOTE: following line of code is never executed :(
    await websocket_adapter.send_message("Message from client.")



async def test():

    urls = [
        "ws://localhost:8765/conversations/1",
        "ws://localhost:8765/conversations/2",
        "ws://localhost:8765/conversations/3",
    ]
    tasks = [main(url) for url in urls]
    await asyncio.gather(*tasks)


# Run the event loop
asyncio.run(test())
# asyncio.get_event_loop().run_until_complete(test())
# pdb.set_trace()
        
  • It seems to allow me to run multiple connections, but I’m not able to send/receive messages after the intial handshake, the code gets blocked at the receive_message method inside the adapter

  • Plus I can not send messages outside the adapater after the connection is established, which also means the control is never returned to the caller, i.e. django server

I have tried a bunch of stuff, I had a simple websocket adapter which was synchronous and worked really well with callbacks, but with asyncio I have to build co-routines? So I’m like how do I send the control back, how do I entertain new connection requests, and how do I even test this thing on my local?

  • I would basically like the ability to receive connection request through an HTTP endpoint, establish a websocket connection in background, and continue receiving new requests, i.e I should be able to initialise many websocket client objects, with configurable callbacks, and just use them as I please xD

Would really appreciate any pointers or suggestions or code snippets ;)….thanks in advance

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật