I have used Celery for a while now (no Django involved), but all of my tasks have always been defined in one main tasks.py file like this:
from celery import Celery

app = Celery(
    'tasks',
    broker=["amqp://..."],
    backend="redis://..."
)
app.config_from_object('celeryconfig')

@app.task(
    bind=True,
    max_retries=5,
    autoretry_for=(
        ...
    )
)
def my_task(self, ...):
    <task code here>
...
Over time the project has grown quite large, and I am trying to break these tasks out into separate modules depending on the systems they interact with (databases, APIs, etc.). I want to load all of these module tasks using the app.autodiscover_tasks() function, but I keep running into ModuleNotFoundError.
Below is an example of my project structure and some sample code:
my_project/
| app.py
| celery_tasks/
| | moduleA/
| | | __init__.py
| | | tasks.py
| | moduleB/
| | | __init.py
| | | tasks.py
app.py now contains:
from celery import Celery

app = Celery(
    'tasks',
    broker=["amqp://..."],
    backend="redis://..."
)
app.config_from_object('celeryconfig')
app.autodiscover_tasks(['.celery_tasks.moduleA', '.celery_tasks.moduleB'])
celery_tasks/moduleA/__init__.py:
from .tasks import *
celery_tasks/moduleA/tasks.py:
from celery import Task
from app import app

class ModuleA(Task):
    bind = True
    max_retries = 5
    autoretry_for = ( ... )

@app.task(base=ModuleA)
def my_task(self, ...):
    <task code here>
I have tried this multiple ways with different structures, such as removing the celery_tasks/ parent folder, having each module live at the root of the project next to app.py, and referencing them like app.autodiscover_tasks(['moduleA', 'moduleB']), but it gives me the same errors about No module named 'moduleA'.
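For anyone trying to reproduce this, the sketch below checks whether the package names are importable at all, independent of Celery. It uses only the standard library and rebuilds a throwaway copy of the layout above in a temporary directory. The helpers build_layout, try_import and check are hypothetical names introduced for illustration, the module files are empty stand-ins, and both packages are given a correctly named __init__.py. It does not claim to mirror what autodiscover_tasks does internally; it only shows how plain importlib treats each style of name depending on whether the project root is on sys.path.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def build_layout(root: Path) -> None:
    """Create a stand-in for the project layout (hypothetical, near-empty modules)."""
    for name in ("moduleA", "moduleB"):
        pkg = root / "celery_tasks" / name
        pkg.mkdir(parents=True)
        (pkg / "__init__.py").write_text("")
        (pkg / "tasks.py").write_text("LOADED = True\n")


def try_import(name: str) -> str:
    """Return 'ok', or the class name of the exception raised while importing `name`."""
    importlib.invalidate_caches()
    try:
        importlib.import_module(name)
        return "ok"
    except Exception as exc:
        return type(exc).__name__


def check(names):
    """Try each name before and after the project root is placed on sys.path."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        build_layout(root)
        before = {n: try_import(n) for n in names}  # project root not on sys.path
        sys.path.insert(0, str(root))
        try:
            after = {n: try_import(n) for n in names}  # project root on sys.path
        finally:
            # Undo the path change and forget the throwaway modules.
            sys.path.remove(str(root))
            for mod in [m for m in sys.modules
                        if m == "celery_tasks" or m.startswith("celery_tasks.")]:
                del sys.modules[mod]
    return before, after


if __name__ == "__main__":
    candidates = ["celery_tasks.moduleA.tasks", "moduleA", ".celery_tasks.moduleA"]
    before, after = check(candidates)
    for name in candidates:
        print(f"{name!r}: before={before[name]}, after={after[name]}")
```

Running the same kind of check inside the container, from the directory the worker is started in, may help narrow down whether the problem is the working directory and sys.path or the names passed to autodiscover_tasks.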
This seems to be just what the documentation describes, but the documentation can be a bit terse at times, and I'm sure I am missing something simple.
Can someone please point me in the right direction (without making me feel like a complete idiot)? =)
Environment
- Python 3.12 (Alpine Linux container)
- Celery 5.4.0