I’m encountering an issue where RabbitMQ messages are not being consumed after the successful execution of Celery tasks.
[Screenshot: RabbitMQ queue chart]
Here’s the setup:
celery.py:
from celery import Celery

app = Celery('testqueue',
             backend='rpc://',
             broker='pyamqp://127.0.0.1:5672',
             include=['testqueue.tasks'])

if __name__ == '__main__':
    app.start()
tasks.py:
from time import sleep

from .celery import app

@app.task()  # also tried: @app.task(acks_late=True)
def sendNotifications(arg1, arg2, arg3):
    sleep(5)
    return arg1 + arg3
Calling the task asynchronously from the FastAPI backend:
async def backendFunction(arg1, arg2, arg3):
    notif = sendNotifications.delay("arg1", arg2, str(arg3))
    # also tried: notif = sendNotifications.apply_async(args=("arg1", arg2, str(arg3)), queue='celery')
    print(notif.task_id)
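For completeness, here is roughly how that function is wired into a route; the path, parameter types, and response shape below are illustrative, not my exact code:

from fastapi import FastAPI

from testqueue.tasks import sendNotifications

api = FastAPI()  # named 'api' to avoid clashing with the Celery 'app'

@api.post("/notify")  # illustrative route
async def backendFunction(arg1: str, arg2: str, arg3: int):
    # .delay() only publishes the message and returns an AsyncResult;
    # it does not wait for the worker to finish.
    notif = sendNotifications.delay("arg1", arg2, str(arg3))
    print(notif.task_id)
    return {"task_id": notif.task_id}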
Command used to run Celery:
celery -A testqueue worker -Q celery -l INFO
In the Celery terminal, I observe the following logs:
[2024-05-23 12:18:03,281: INFO/MainProcess] Task testqueue.tasks.sendNotifications[81cb414d-d0be-4990-a2e4-70c1e4c721b3] received
[2024-05-23 12:18:08,283: INFO/ForkPoolWorker-16] Task testqueue.tasks.sendNotifications[81cb414d-d0be-4990-a2e4-70c1e4c721b3] succeeded in 5.001480647999415s: 'TEST_STRING'
The task executes successfully, but the RabbitMQ message remains unconsumed.
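One way to observe this, assuming local access to the broker, is rabbitmqctl; with the rpc:// result backend, per-client reply queues (which get generated UUID names) show up alongside the 'celery' task queue:

rabbitmqctl list_queues name messages messages_ready messages_unacknowledged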
I understand that calling get() on the result would consume the message and release resources, as mentioned in the Celery documentation. However, calling notif.get() from backendFunction(arg1, arg2, arg3) would block the event loop and defeat the purpose of dispatching the task asynchronously.
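To make the constraint concrete, this is the blocking pattern I'm trying to avoid (a minimal sketch; notif is the AsyncResult returned by .delay() above):

# Blocks until the worker finishes (or the timeout expires); this
# does drain the result message from the reply queue, but it stalls
# the async endpoint while waiting.
result = notif.get(timeout=10)
print(result)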
How can I ensure that RabbitMQ messages are consumed after successful task execution without blocking the code? Any insights would be appreciated.