I have some async code and I need to run it inside the Celery task.
I tried the approach with using asgiref.sync.async_to_sync()
, but it turned out that it creates new event loop every time. And it brakes my code since I use SQLAlchemy session pool and there are restrictions about using sessions in different threads and event loops.
It is important to note that I don’t care about performance issues. I understand the overheads of using this approach. I just don’t want to rewrite my async code to sync.
Trying to run all my async functions in one event loop I created a simple helper module:
import asyncio
import time
from collections.abc import Awaitable
from threading import Thread
from typing import TypeVar
class _SingleEventLoop:
_loop: asyncio.AbstractEventLoop | None = None
def _enshure_loop_is_running(self):
if (self._loop is None) or (self._loop.is_closed()):
try:
self._loop = asyncio.get_event_loop() # Try to get running event loop
except RuntimeError:
self._loop = asyncio.new_event_loop() # Or create new event loop
if not self._loop.is_running():
self._loop_thread = Thread(target=self._eventloop_thread_run, daemon=True)
self._loop_thread.start()
def _eventloop_thread_run(self):
asyncio.set_event_loop(self._loop)
self._loop.run_forever()
def execute_async_task(self, coroutine):
self._enshure_loop_is_running()
feature = asyncio.run_coroutine_threadsafe(coroutine, self._loop)
while not feature.done():
time.sleep(0.01)
return feature.result()
_single_event_loop = _SingleEventLoop()
R = TypeVar("R")
def execute_async_task(coroutine: Awaitable[R]) -> R:
"""Executes async tasks in single event loop"""
return _single_event_loop.execute_async_task(coroutine=coroutine)
I can execute my async code like it’s shown below:
from single_event_loop_runner.run_async import execute_async_task
@celery_app.task()
def sync_celery_task(param):
res = execute_async_task(async_func(param))
return res
It seems to work fine for my use case, but I want to know what are the possible negative consequences of using this approach?
Can I use any easier way to solve my task?
I’m quite new to Celery and need your help!)