I’m running Celery attached to a Flask server in a custom container (daemonised using supervisord; see below for the config), with Redis as the broker. The container is running in Azure Web Apps for Containers, and the Redis instance I’m using is hosted in Azure Cache for Redis.
The app seems to start OK, and if I make a single web request which calls a task, it runs properly. The weirdness begins when I call several tasks in reasonably quick succession (say, multiple tasks within 10 seconds): the first task runs fine, but no further tasks run unless I wait a while before calling again, nor do they appear to be queued, because nothing further happens after the first task completes. I’m wondering whether that’s because tasks get passed straight to workers when the queue is empty, though this doesn’t explain why subsequent tasks aren’t queued (unless there’s some weirdness where Celery can read from Redis but not write).
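For context, I’m calling the tasks from Flask request handlers with .delay(), along these lines (a simplified sketch; the endpoint and task names are placeholders rather than my real code):

from flask import Flask, jsonify
from module.task import name0  # placeholder: one of the three tasks listed in the banner below

app = Flask(__name__)

@app.route('/trigger')
def trigger():
    # .delay() serialises the call and pushes a message onto the 'celery'
    # list in Redis; it returns an AsyncResult immediately rather than blocking.
    result = name0.delay()
    return jsonify({'task_id': result.id})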
I’ve tried to have a look in Redis at the queues themselves; there are three keys relevant to Celery (the values of which all appear to be of type SET): _kombu.binding.celeryev, _kombu.binding.celery, and _kombu.binding.celery.pidbox. As I understand it only the second of these is really relevant here, and it has the value {b'celery\x06\x16\x06\x16celery'}.
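As far as I can tell, the _kombu.binding.* SETs are just Kombu’s simulated exchange-to-queue bindings; with the Redis transport, pending messages should sit in a LIST named after the queue. So the inspection I’ve been doing amounts to this (a redis-py sketch, with the connection details elided):

import redis

r = redis.from_url('rediss://:<password>@<host>:6380/0')  # same URL the app uses

print(r.type('celery'))           # b'list' if messages are waiting, b'none' if the key is absent
print(r.llen('celery'))           # number of queued messages
print(r.lrange('celery', 0, -1))  # raw message payloads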
From this it doesn’t look like anything is successfully being stored in the Redis queue. The app appears to connect to Redis successfully when instantiated; there were even some tasks in the queue (though I think these were legacy, and they’ve disappeared now that I’ve flushed the cache). [Note: I’ve obfuscated some names.]
-------------- celery@[app_name]__9a8a_eec99cfd v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Linux-5.15.153.1-1.cm2-x86_64-with-glibc2.31 2024-06-10 15:50:53
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: web.app:0x70b55460fb10
- ** ---------- .> transport: rediss://:**@[host]:6380/0
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
                .> celery           exchange=celery(direct) key=celery
[tasks]
. module.task.name0
. module.task.name1
. module.task.name2
[2024-06-10 15:50:53,524: INFO/MainProcess] Connected to rediss://:**@[host]:6380/0
What’s also weird about this is that the queues in Redis looked empty, whilst on startup the app found three tasks.
I’ve attached below the other relevant components of my setup: how I’m instantiating my app and its config, and the supervisord config file.
My current best theory about what’s going wrong is that it’s something to do with the networking setup between the Azure Redis cache and my app, because when I ssh into my container and try to run celery commands they fail with ‘connection refused’. However, I’m not 100% sure, as my app appears to be able to connect to Redis fine (my Flask app has an endpoint which writes to and reads from Redis, and this seems to work fine).
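For what it’s worth, the in-container check I’ve used to convince myself that Redis itself is reachable is roughly this (a sketch using redis-py and the same helper as the app config below):

import redis

from utils.env_var import get_redis_connection_string

r = redis.from_url(get_redis_connection_string())  # rediss:// URL on port 6380
print(r.ping())  # True means the container can reach Azure Cache for Redis

First, how I’m instantiating my app and its config: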
from celery import Celery, Task
from flask import Flask

from utils.env_var import get_redis_connection_string


def init_celery(app: Flask) -> Celery:
    class FlaskTask(Task):
        def __call__(self, *args, **kwargs):
            # Run every task inside the Flask application context
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config['CELERY'])
    celery_app.set_default()
    app.extensions['celery'] = celery_app
    return celery_app


def create_app() -> Flask:
    app = Flask(__name__)
    app.config.from_mapping(
        CELERY={
            'broker_url': get_redis_connection_string(),  # does what you expect
            'task_acks_late': True,            # ack after the task finishes, not on receipt
            'worker_prefetch_multiplier': 1,   # reserve one message at a time per process
            'task_reject_on_worker_lost': True,  # requeue if the worker dies mid-task
            'broker_connection_retry_on_startup': True,
        }
    )
    init_celery(app)
    return app
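The worker command below points at web.make_celery.celery_app. That module isn’t shown above, but it’s essentially the usual stub that consumes the factory (sketched here on the assumption that create_app is importable from web.app):

# web/make_celery.py
from web.app import create_app

flask_app = create_app()
celery_app = flask_app.extensions['celery']  # the Celery instance registered in init_celery

And the supervisord config: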
[supervisord]
nodaemon=true
[program:celery]
command=celery -A web.make_celery.celery_app worker -c 1 -l info
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:gunicorn]
command=gunicorn -w 4 --bind=0.0.0.0:50505 --timeout 600 web.app:create_app()
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0