I’m developing an api in django, for this api I need to use celery to do delayed actions. eventually, the aim would be to remove data from the database after a certain period of inactivity. I’ve set up celery and Rabbitmq, but when I try to execute something with a print to try nothing happens.
here’s how I set up celery in settings.py:
#celery settings
import ssl
CELERY_BROKER_URL = 'amqps://guest:guest@localhost:5671/'
CELERY_BROKER_USE_SSL = {
'ca_certs': '/etc/rabbitmq/cacert.crt',
'keyfile': '/etc/rabbitmq/key.pem',
'certfile': '/etc/rabbitmq/cert.crt',
'cert_reqs': ssl.CERT_REQUIRED
}
CELERY_RESULT_BACKEND = "rpc://"
here’s the code I’m trying to run:
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from django.utils import timezone
from datetime import timedelta
#DELAY_FOR_DATA_DELETION = timedelta(days=30) # 30 jours par exemple
@shared_task
def delete_user():
print("test")
return
that’s how I call the function:
delete_user.apply_async(countdown=5)
and here’s how I set up the celery.py file:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
app = Celery('base', broker="amqps://guest:guest@localhost:5671/")
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
I’ve tried to do “celery status” to see the worker status but I get this error
# /home/transcendence/venv/bin/celery status
Traceback (most recent call last):
File "/home/transcendence/venv/lib/python3.11/site-packages/amqp/connection.py", line 515, in channel
return self.channels[channel_id]
~~~~~~~~~~~~~^^^^^^^^^^^^
KeyError: None
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/transcendence/venv/bin/celery", line 8, in <module>
sys.exit(main())
^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/celery/__main__.py", line 15, in main
sys.exit(_main())
^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/celery/bin/celery.py", line 236, in main
return celery(auto_envvar_prefix="CELERY")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/click/core.py", line 1157, in __call__
return self.main(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/click/core.py", line 1078, in main
rv = self.invoke(ctx)
^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/click/core.py", line 1688, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/click/core.py", line 1434, in invoke
return ctx.invoke(self.callback, **ctx.params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/click/core.py", line 783, in invoke
return __callback(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/click/decorators.py", line 33, in new_func
return f(get_current_context(), *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/celery/bin/base.py", line 135, in caller
return f(ctx, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/celery/bin/control.py", line 133, in status
callback=callback).ping()
^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/celery/app/control.py", line 294, in ping
return self._request('ping')
^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/celery/app/control.py", line 106, in _request
return self._prepare(self.app.control.broadcast(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/celery/app/control.py", line 776, in broadcast
return self.mailbox(conn)._broadcast(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/kombu/pidbox.py", line 330, in _broadcast
chan = channel or self.connection.default_channel
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/kombu/connection.py", line 956, in default_channel
self._default_channel = self.channel()
^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/kombu/connection.py", line 303, in channel
chan = self.transport.create_channel(self.connection)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/kombu/transport/pyamqp.py", line 168, in create_channel
return connection.channel()
^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/amqp/connection.py", line 518, in channel
channel.open()
File "/home/transcendence/venv/lib/python3.11/site-packages/amqp/channel.py", line 448, in open
return self.send_method(
^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/amqp/abstract_channel.py", line 79, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/amqp/abstract_channel.py", line 99, in wait
self.connection.drain_events(timeout=timeout)
File "/home/transcendence/venv/lib/python3.11/site-packages/amqp/connection.py", line 526, in drain_events
while not self.blocking_read(timeout):
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/amqp/connection.py", line 531, in blocking_read
frame = self.transport.read_frame()
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/amqp/transport.py", line 294, in read_frame
frame_header = read(7, True)
^^^^^^^^^^^^^
File "/home/transcendence/venv/lib/python3.11/site-packages/amqp/transport.py", line 629, in _read
s = recv(n - len(rbuf))
^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [Errno 104] Connection reset by peer
and I tried to look at the rabbitmq logs where you can see that there’s a connection when I do delete_user.apply_async(countdown=5)
:
2024-04-30 14:30:55.853808+00:00 [info] <0.716.0> accepting AMQP connection <0.716.0> (127.0.0.1:34092 -> 127.0.0.1:5671)
2024-04-30 14:30:55.856889+00:00 [info] <0.716.0> connection <0.716.0> (127.0.0.1:34092 -> 127.0.0.1:5671): user 'guest' authenticated and granted access to vhost '/'