I am developing a back-end FastAPI server that is a game.
All players connect and interact with the game via a Websocket in the main thread.
The game has multiples categories that are all running in a separate thread with the websocket as a common parameter. Input messages are received in the main thread then dealt within the category thread that will then send output message via the websocket.
It seemed to be working fine at first but I keep having those logs in my server logs:
Fatal error on SSL transport
protocol: <asyncio.sslproto.SSLProtocol object at 0x7f79804ba140>
transport: None
Traceback (most recent call last):
File "/usr/local/lib/python3.10/asyncio/sslproto.py", line 703, in _process_write_backlog
del self._write_backlog[0]
IndexError: deque index out of range
This seems to happen randomly, even after 10 minutes on laucnhing the server and no one has played/connected to any websocket.
They don’t break anything but I suspect them to point to a problem in my code flow. I can see my RAM is slowly going up until the app crashes after few days.
Here is a basic implementation of the code:
api.py
Spawns all the games at start then listens to new websocket connection/messages to redirect them to the corresponding threaded game.
def create_gameflows(categories: list, websocket_manager) -> Dict[str, Gameflow]:
return {cat_name: Gameflow(id=i+1, websocket_manager=websocket_manager, category=cat_name)
for i, cat_name in enumerate(categories)}
app = FastAPI()
manager = ConnectionManager()
lock = asyncio.Lock()
gameflows = create_gameflows(constants.CATEGORIES, manager)
for gameflow in gameflows.values():
gameflow.start()
async def handle_websocket(websocket: WebSocket, client_id: int, category: str) -> None:
""" Handle client incoming messages via websocket
Parameters:
websocket (WebSocket):
"""
async with lock:
await manager.connect(websocket, client_id, category)
gf = gameflows[category]
try:
while True:
data = await websocket.receive_json()
if data['type'] == 'NEW_PLAYER':
while gf.game is None:
time.sleep(0.1)
user = User(**data['data'])
user.websocket_id = str(get_cookie(websocket))
try:
await gf.game.new_player(user, websocket, user.websocket_id, add_to_chat)
except :
await manager.send_error_message(websocket, user.websocket_id, data['data']['category'] )
raise
elif data['type'] == 'ANSWER':
websocket_id = get_cookie(websocket)
await gf.game.user_answer(data, websocket_id, websocket)
except WebSocketDisconnect:
try:
websocket_id = get_cookie(websocket)
except (AttributeError, IndexError):
websocket_id = str(client_id)
await manager.close_connection(websocket_id, websocket_id, category)
@app.websocket("/api/ws/{client_id}/{category}")
async def websocket_endpoint(websocket: WebSocket, client_id:int, category: str):
""" API route that redirect to the websocket endpoint.
Args:
websocket (WebSocket):
"""
await handle_websocket(websocket, client_id, category)
Gameflow.py
Gameflow is an object that, given a category, will create games infinitely. Each created game is given the websocket manager and will send messages to the connected users.
class Gameflow:
def __init__(self, websocket_manager, category: str):
self.websocket_manager = websocket_manager
self.category = category
self.game = None
self.waiting_time = None
self.loop = asyncio.new_event_loop()
self.executor = ThreadPoolExecutor(max_workers=1)
async def play_game(self):
while True:
try:
self.game = Game(id=0, websocket_manager=self.websocket_manager, category=self.category)
await self.game.play_game()
while not self.game.is_over:
await asyncio.sleep(0.1)
self.waiting_time = time.time()
await asyncio.sleep(TIME_BETWEEN_GAME)
except Exception as e:
print(f"Error in {self.category} game: {e}")
await asyncio.sleep(10) # Wait before retrying
def start(self):
asyncio.set_event_loop(self.loop)
self.loop.run_in_executor(self.executor, self.loop.run_forever)
asyncio.run_coroutine_threadsafe(self.play_game(), self.loop)
ConnectionManager is a manager that handles websockets connections for every current game. I can provide the code if needed.
I tried using async with lock
to prevent thread concurrency as I thought it could be a reason for the error logs but I still have them.
I am clearly missing something and would like to have your ideas on this implementation, is it safe? Is there anything that could explain the logged SSL Transport errors? Is there anything that could explain my RAM constantly increasing?
3