I have a bundle that contains Celery and RabbitMQ for background tasks and a FastAPI app for web requests.
The Celery app is started from the command prompt with `celery -A celery_app worker -l info -P gevent`.
Rabbit is deployed in Docker Container.
FastAPI is started from a Python script.
Here is the code. The question is below.
fastapi_app/main.py
from __future__ import absolute_import
from fastapi import FastAPI
from celery.result import AsyncResult
from celery_app.tasks import task, get_current_tasks
from celery_app.celery import c_app
from fastapi_app.model import Worker, Task
# FastAPI application instance exposing the task-control HTTP endpoints below.
f_app = FastAPI()
@f_app.post("/task/")
def run_task():
    """Enqueue one background Celery task and return its task id."""
    queued = task.apply_async()
    return {"task_id": queued.id}
@f_app.get("/task_info/{task_id}")
def get_progress(task_id):
    """Look up one task's current state and progress metadata by id."""
    async_result = AsyncResult(task_id, app=c_app)
    return Task(id=task_id, state=async_result.state, meta=async_result.info)
@f_app.get("/curr_progress/")
def get_current_progress():
    """Report state/progress for every task currently active on any worker.

    Returns a dict with a ``workers`` list; each worker entry carries the
    tasks it is executing right now. If no worker answers the broadcast
    inspect (``get_current_tasks()`` returns ``None``), an empty workers
    list is returned instead of raising ``AttributeError`` on ``.items()``.
    """
    response = {'workers': []}
    # control.inspect().active() yields None when no worker replies in time.
    active = get_current_tasks() or {}
    for worker_name, tasks_list in active.items():
        worker_ = Worker(name=worker_name)
        for task_info in tasks_list:
            task_id = task_info.get('id')
            result = AsyncResult(task_id, app=c_app)
            worker_.tasks.append(Task(id=task_id, state=result.state, meta=result.info))
        response['workers'].append(worker_)
    return response
# Run the API with uvicorn when this module is executed directly
# (the Celery worker is started separately from the command line).
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(f_app, host="localhost", port=8000)
fastapi_app/model.py
from pydantic import BaseModel
from typing import List, Any, Optional
class Task(BaseModel):
    """Serializable snapshot of a Celery task's state and progress."""

    id: Optional[str] = None     # Celery task id (UUID string)
    state: Optional[str] = None  # e.g. 'PENDING', 'PROGRESS', 'SUCCESS'
    # AsyncResult.info payload: a progress dict such as {'done': i, 'total': n}
    # while in PROGRESS, or the task's return value once finished.
    # 'dict | Any | None' collapses to Any, so declare it directly.
    meta: Optional[Any] = None
class Worker(BaseModel):
    """A Celery worker together with the tasks it is currently executing."""

    name: Optional[str] = None  # worker hostname, e.g. 'celery@wsmsk1n3075'
    tasks: List[Task] = []      # pydantic copies this default per instance
celery_app/tasks.py
from __future__ import absolute_import
import threading
import time
from celery_app.celery import c_app
def get_current_tasks() -> dict:
    """Return ``{worker_name: [active task dicts]}`` from a broadcast inspect.

    ``control.inspect().active()`` returns ``None`` when no worker replies,
    which would violate the declared ``dict`` return type; normalize to ``{}``.
    """
    inspector = c_app.control.inspect()
    return inspector.active() or {}
def get_registered_tasks() -> dict:
    """Return ``{worker_name: [registered task names]}`` from a broadcast inspect.

    ``control.inspect().registered()`` returns ``None`` when no worker
    replies; normalize to ``{}`` so the declared return type holds.
    """
    inspector = c_app.control.inspect()
    return inspector.registered() or {}
@c_app.task(bind=True)
def task(self):
    """Demo task: count to 60, reporting progress once per second.

    Publishes custom PROGRESS states via ``update_state`` with meta
    ``{'done': i, 'total': n}`` so HTTP clients can poll the progress.
    Returns ``n`` on completion.
    """
    print(f'task started in {threading.current_thread()}. Thread alive:')
    for thread in threading.enumerate():
        print(thread)
    n = 60
    for i in range(n):
        # Each call overwrites this task id's entry in the result backend.
        self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
        time.sleep(1)
    # Original had a stray literal 'n' after the brace — clearly a mangled '\n'.
    print(f'task finished in {threading.current_thread()}\n')
    return n
celery_app/celery.py
from __future__ import absolute_import
from celery import Celery
# Celery application shared by the worker process and the FastAPI process.
# NOTE(review): the 'rpc://' result backend streams result messages to a
# single consumer rather than acting as a shared store; polling AsyncResult
# for several concurrent tasks from another process may therefore miss
# PROGRESS meta (the null-meta symptom below) — confirm against Celery docs
# and consider a persistent backend (redis/database) instead.
c_app = Celery('celery_app',
               broker='amqp://guest:guest@localhost',  # RabbitMQ in Docker
               backend='rpc://',
               include=['celery_app.tasks'])
There, in fastapi_app/main.py, I have the function that starts a task, run_task(), and the function that gets the current progress of all running tasks, get_current_progress().
The last one depends on celery.result.AsyncResult()
which depends on update_state()
method in task(self)
function in celery_app/tasks.py
.
Here is the problem. When I start only one task by requesting FastApi server, task’s progress displays correctly.
{
"workers": [
{
"name": "celery@wsmsk1n3075",
"tasks": [
{
"id": "271531c2-48e6-4c71-a9ef-31bce434c649",
"state": "PROGRESS",
"meta": {
"done": 3,
"total": 60
}
}
]
}
]
}
But when I start multiple tasks (send a couple of task requests to the FastAPI server), the displayed progress becomes incorrect — especially the progress meta info.
{
"workers": [
{
"name": "celery@wsmsk1n3075",
"tasks": [
{
"id": "4d05d0f0-f058-4372-8eec-c84853188655",
"state": "PROGRESS",
"meta": {
"done": 9,
"total": 60
}
},
{
"id": "0ca82db4-7e04-4bfd-9d73-6a190decd4c6",
"state": "PROGRESS",
"meta": null
},
{
"id": "ba8aa34b-e185-47cf-bed3-f3ca07257afc",
"state": "PROGRESS",
"meta": null
},
{
"id": "3e2941f5-1285-4062-aea0-31a3c9b1cc21",
"state": "PROGRESS",
"meta": null
}
]
}
]
}
It’s important to note that in Celery all 4 tasks are executed in separate threads:
[2024-07-23 13:02:06,427: WARNING/MainProcess] <_DummyThread(Dummy-6, started daemon 1901360139936)>
[2024-07-23 13:02:06,427: WARNING/MainProcess] <_DummyThread(Dummy-7, started daemon 1901360142016)>
[2024-07-23 13:02:06,428: WARNING/MainProcess] <_DummyThread(Dummy-8, started daemon 1901360137056)>
[2024-07-23 13:02:06,429: WARNING/MainProcess] <_DummyThread(Dummy-9, started daemon 1901360139776)>
So each of them should have its own progress (which, as you can see in the last JSON block, does not seem to be the case).
How can I obtain every task state in multiple-task case, like it was in the solo-task example?