Although the Celery documentation does not recommend it, the CELERY_TASK_ALWAYS_EAGER = True option allows E2E testing of a function that calls a Celery task with the delay() method.
def wanna_test_function_e2e():
    # .... logic
    dummy_task.delay()  # <- modifies some db value
The problem arises when I call an async function inside the Celery task:
@celery_app.task  # type: ignore[misc]
def dummy_task() -> None:
    async_to_sync(ext_client.get_user)(user_id)
    ...
The task runs fine locally; the issue is with testing.
With the EAGER option, I am unable to run the test:
WARNING kombu.connection:connection.py:669 No hostname was supplied. Reverting to default 'localhost'
ERROR celery.app.trace:trace.py:267 Task
line 237, in ...
admin = async_to_sync(ext_client.get_user)(user_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
raise RuntimeError(
RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
This happens because the EAGER option forces Celery to run the task in the main process, in the calling thread. async_to_sync then tries to start a new asyncio event loop and fails, because an event loop is already running in that thread.
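A minimal, stdlib-only sketch of that mechanism. It does not use asgiref or Celery: asyncio.run stands in for async_to_sync (an assumption, since asgiref's internals differ), and get_user and dummy_task_body are hypothetical stand-ins for ext_client.get_user and the task body. It shows that a sync wrapper fails when called in the thread that is running the event loop, but works when called from a separate thread.

```python
import asyncio


async def get_user(user_id: int) -> dict:
    # Hypothetical stand-in for ext_client.get_user.
    await asyncio.sleep(0)
    return {"id": user_id}


def dummy_task_body(user_id: int) -> dict:
    # Sync code that drives a coroutine on its own event loop.
    # asyncio.run is a stand-in for async_to_sync (assumption).
    coro = get_user(user_id)
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def call_in_loop_thread() -> str:
    # Same thread as the running loop, like an eager task inside an async test.
    try:
        dummy_task_body(1)
    except RuntimeError as exc:
        return f"failed: {exc}"
    return "ok"


async def call_in_worker_thread() -> dict:
    # A separate thread has no running loop, so the wrapper can start one.
    return await asyncio.to_thread(dummy_task_body, 1)


if __name__ == "__main__":
    print(asyncio.run(call_in_loop_thread()))
    print(asyncio.run(call_in_worker_thread()))
```

Whether pushing the eager call onto another thread is acceptable for a real test suite (DB connections, transactions) is a separate question that this sketch does not settle.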
If I follow Celery’s guide and use pytest-celery without the EAGER option:
@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': "memory://",
        'result_backend': 'rpc',
    }

@pytest.fixture(scope="session", autouse=True)
def celery_register_tasks(
    celery_session_app, celery_session_worker, socket_enabled
) -> None:
    celery_session_app.register_task(dummy_task)
Now there is a huge problem:
async def test_dummy_task(
    self,
    celery_session_worker,
    celery_register_tasks,
) -> None:
    ...
    import asyncio
    await asyncio.sleep(10)  # <- without this, the rest of the test can run before the task runs
    user = await User.objects.aget(id=1)
    assert user.is_modified  # <- without the sleep this fails!
It gets even worse if I run a single test: the DB may no longer exist when the Celery worker tries to run its task, because the test DB has already been destroyed!
{"process": 11092, "name": "celery.app.trace", "funcName": "_log_error", "lineno": 267, "correlation_id": "4b5953eb-df63-49dc-aa89-f110eb1e36d5", "message": "Task apps.dummy.tasks.dummy_task[6cef93cb-28bb-4a3c-87f5-a626e37535a6] raised unexpected: OperationalError(1049, "Unknown database 'test_dummy'")
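One way to avoid the fixed asyncio.sleep(10) would be to poll for the expected DB change with a timeout. The sketch below is stdlib-only; wait_until is a hypothetical helper, not part of Celery or pytest-celery. It assumes the worker's writes are visible to the test's DB connection, and it only narrows the race rather than removing it.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def wait_until(
    predicate: Callable[[], Awaitable[bool]],
    timeout: float = 10.0,
    interval: float = 0.1,
) -> bool:
    """Poll an async predicate until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if await predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        await asyncio.sleep(interval)


# Hypothetical use inside the test, replacing the fixed sleep:
#
#     async def user_is_modified() -> bool:
#         user = await User.objects.aget(id=1)
#         return user.is_modified
#
#     assert await wait_until(user_is_modified, timeout=10)
```

Because the test keeps running until the predicate passes or the timeout expires, it would probably also keep the test DB alive while the worker finishes, though that depends on how the DB fixtures are torn down.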
So how does one write an E2E test for a function that calls a Celery task which uses an async function?
I can test (1) wanna_test_function_e2e by mocking the Celery task and (2) the Celery task separately, but a real E2E test would be desirable.