I have a Celery worker with concurrency set to 1 that takes tasks from RabbitMQ. I want to build a system with a single queue and a single-concurrency worker, so all tasks are added to one main queue.
About the task: it's just a loop in which we update the state with `task.update_state()`.
@c_app.task(bind=True)
def task(self):
    n = 20
    for i in range(0, n):
        self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
        print('working')
        time.sleep(1)
    return n
In parallel I have two services:
- Celery beat service, which creates 1000 tasks (the number is just an example).
- FastAPI service, which provides two endpoints:
  - create a task with TOP priority and add it to the main queue
  - get up-to-date info about the active task and the scheduled tasks (using `inspect()`)
So the FastAPI service can be asked about:
- the progress of the currently active task: `inspect().active()`
- how many tasks remain: `inspect().scheduled()`
There is nothing interesting in this part, so I have omitted the code.
Question: How can I add a higher-priority task to a queue whose tasks have already been scheduled to the worker?
Here is what I've tried:
Celery config:
from celery.schedules import crontab
from kombu import Queue, Exchange
broker_url = 'amqp://guest:guest@localhost//'
result_backend = 'db+postgresql://admin:root@localhost/celery_test_db'
worker_concurrency = 1
timezone = 'Europe/Moscow'
enable_utc = False
result_extended = True
beat_schedule = {
    'add-5-tasks-every-month': {
        'task': 'celery_app.tasks.add_5_tasks',
        'options': {'queue': 'celery_q'},
        'schedule': 20.0
    },
}
broker_transport_options = {'queue_order_strategy': 'priority'}
task_queues = (
    Queue("celery_q", Exchange("celery_q"), routing_key="celery_q", queue_arguments={'x-max-priority': 9}),
)
Here is my Celery beat task for adding a large number of low-priority tasks:
@c_app.task
def add_5_tasks():
    for _ in range(800):
        task.apply_async(countdown=1, queue='celery_q', priority=1)
Here is my FastAPI endpoint for adding a high-priority task, which I expect to be executed right after the current task completes:
@f_app.post("/add-task/")
def add_task():
    task_ = task.apply_async(priority=9, countdown=1, queue='celery_q')
    print('Task added with high priority:', task_.id)
    return {'task_id': task_.id,
            'message': 'Task added with high priority'}
And here is the core of the "current_progress" endpoint, which returns the current progress and the scheduled tasks:
i = c_app.control.inspect()
scheduled = i.scheduled()
reserved = i.reserved()
active = i.active()
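To make the shape of these replies concrete, here is a minimal sketch of how they could be reduced to counts. Each `inspect()` call returns a dict that maps a worker name to a list of task entries, or `None` when no worker replies. The helper name `summarize_inspect` and the sample worker name are hypothetical, not part of my real code.

```python
def summarize_inspect(active, reserved, scheduled):
    """Count the tasks reported by inspect().active/reserved/scheduled.

    Each argument is a dict {worker_name: [task_entry, ...]} or None
    (None is what inspect() returns when no worker replied in time).
    """
    def count(reply):
        # Treat a missing reply as "no tasks" instead of raising.
        return sum(len(tasks) for tasks in (reply or {}).values())

    return {
        "active": count(active),
        "reserved": count(reserved),
        "scheduled": count(scheduled),
    }


# Hypothetical replies from a single worker.
sample_active = {"celery@host": [{"id": "a1", "name": "task"}]}
sample_scheduled = {"celery@host": [{"eta": "...", "request": {"id": "s1"}}]}
print(summarize_inspect(sample_active, None, sample_scheduled))
```

Note that these numbers only describe what the worker currently holds. Messages still waiting in RabbitMQ are not included, because `inspect()` asks the workers, not the broker.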
Problem: the prioritisation doesn't work as I expected.
It works only if I add these settings to the config:
worker_prefetch_multiplier = 1
task_acks_late = True
But this makes `inspect().scheduled()` practically useless: the worker fetches only one task at a time, so it thinks there is only one task in the schedule. Instead of a list of tasks, `inspect().scheduled()` shows a single task.
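My understanding of why the prefetch setting matters can be sketched with a small, broker-free simulation. It assumes a simplified model: the broker orders by priority only the messages that are still in the queue, and anything already delivered to the worker keeps its place in the worker's local buffer. The function `run_order` and the task names are hypothetical, and the model ignores how a real worker handles `countdown`/ETA tasks.

```python
import heapq
from collections import deque


def run_order(initial, late_arrival, prefetch):
    """Return task names in execution order for a simplified broker/worker model.

    initial      -- list of (priority, name) already queued when the worker starts
    late_arrival -- (priority, name) published while the first task is running
    prefetch     -- maximum number of messages the worker holds at once
    """
    broker = []  # heap: highest priority first, FIFO within a priority
    seq = 0

    def publish(priority, name):
        nonlocal seq
        heapq.heappush(broker, (-priority, seq, name))
        seq += 1

    for priority, name in initial:
        publish(priority, name)

    local = deque()  # messages already delivered to the worker

    def fill():
        # The worker pulls messages until its prefetch window is full.
        while broker and len(local) < prefetch:
            local.append(heapq.heappop(broker)[2])

    order = []
    fill()
    first = True
    while local:
        if first:
            publish(*late_arrival)  # arrives while the first task is running
            first = False
        order.append(local.popleft())  # task finishes and is acknowledged
        fill()
    return order


low = [(1, f"low-{i}") for i in range(5)]
print(run_order(low, (9, "high"), prefetch=1))
print(run_order(low, (9, "high"), prefetch=3))
```

In this model, with `prefetch=1` the high-priority task runs right after the current one, while with `prefetch=3` it has to wait behind the low-priority tasks the worker has already fetched. That matches what I see, and it is also why the worker only knows about a single task when the window is that small.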
MAIN QUESTION: How can I enable prioritisation and still get info about all scheduled tasks from `inspect().scheduled()`?