I am trying to create a queue system for an llm helper I am developing for a job. I settled for redis and celery since I have no prior experience with any queue system and it seemed simple enough.
I tried implementing it into my app directly but while everything seemed to be working, I could never get results back with the task ids. So i made a simple app with just one function to understand what is happening and it still doesn’t work.
Here is my code:
app.py
import time
from flask import Flask, request
from celery import Celery, Task
from logging import Logger
logger = Logger("testcelery", level="INFO")
app = Flask(__name__)
app.config.from_mapping(
CELERY=dict(
broker_url="redis://localhost:6379/0",
result_backend="redis://localhost:6379/0"
),
)
celery = Celery(app.name)
celery.config_from_object(app.config["CELERY"])
celery.set_default()
app.extensions["celery"] = celery
@celery.task
def useless_trash(name: str) -> str:
time.sleep(10)
return f"Hello {name}"
@app.route('/', methods=["POST"])
def post_task() -> tuple:
task = useless_trash.delay(request.args.get("name"))
logger.info(task)
return {"task_id": task.id}, 200
@app.route('/<task_id>', methods=["GET"])
def get_task(task_id: str) -> tuple:
task = celery.AsyncResult(task_id)
logger.info(task)
if not task.ready():
return {"status": "Task not found"}, 404
if task.state == "PENDING":
return {"status": "Processing"}, 200
elif task.state == "SUCCESS":
return {"status": "Complete", "result": task.get()}, 200
else:
return {"status": "Error", "message": str(task.info)}, 500
@app.route("/", methods=["GET"])
def index():
return "Hello, I am working."
if __name__ == '__main__':
app.run()
I have redis running in a docker container.
I have celery in a cmd prompt running from: celery -A app.celery worker -c 5 -l DEBUG
I have celery beat running in a cmd prompt from: celery -A app.celery beat -l DEBUG
I have flask running in a cmd prompt from: flask run
None of the cmd prompt show any error messages. When I send a task through it shows up in the celery worker cmd prompt:
[2024-05-07 11:57:05,370: INFO/MainProcess] Task app.useless_trash[390da1bb-edda-48c2-a5ed-958aff37e710] received
[2024-05-07 11:57:05,370: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x000001E685F4D8A0> (args:('app.useless_trash', '390da1bb-edda-48c2-a5ed-958aff37e710', {'lang': 'py', 'task': 'app.useless_trash', 'id': '390da1bb-edda-48c2-a5ed-958aff37e710', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '390da1bb-edda-48c2-a5ed-958aff37e710', 'parent_id': None, 'argsrepr': "('Maxime',)", 'kwargsrepr': '{}', 'origin': 'gen47124@DESKTOP-SFVC8EF', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '390da1bb-edda-48c2-a5ed-958aff37e710', 'reply_to': '3ef32eb9-c48a-3414-a9f6-aaee43732d55', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '73d6523b-d7b6-49dd-9288-ae43f183b388'}, 'reply_to': '3ef32eb9-c48a-3414-a9f6-aaee43732d55', 'correlation_id': '390da1bb-edda-48c2-a5ed-958aff37e710', 'hostname': 'celery@DESKTOP-SFVC8EF', 'delivery_info': {'exchange': '', 'routing_key':... kwargs:{})
If I submit multiple requests they show up in redis under the celery key as (not the same task_id as the previous one, that’s normal):
1) "{"body": "W1siTWF4aW1lIl0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "app.useless_trash", "id": "d629054f-7958-45d1-9778-9fd9425bb477", "shadow": null, "eta": null, "expires": null, "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "d629054f-7958-45d1-9778-9fd9425bb477", "parent_id": null, "argsrepr": "('Maxime',)", "kwargsrepr": "{}", "origin": "gen84044@DESKTOP-SFVC8EF", "ignore_result": false, "replaced_task_nesting": 0, "stamped_headers": null, "stamps": {}}, "properties": {"correlation_id": "d629054f-7958-45d1-9778-9fd9425bb477", "reply_to": "f523c065-e666-35dd-9bb2-85d2f17ba84e", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "763b8805-7149-4018-8e26-89b8399750e7"}}"
Nothing appears in the celery beat cmd prompt.
When I try to get info on a task with it’s task id, it always gives me a task not found message and nothing ever seems to be processed and stays exactly the same in redis.
I am unsure what steps I need to take to correct the problem, but reading the documentation and trying to get an llm to help me has led me in circles and I need some pointers to lead me out of it.
Thank you very much.