I have a problem: when Celery runs several tasks dispatched from a loop, the next task sometimes starts before the previous call with the same argument has finished.
I need tasks with the same argument to run strictly one after another.
Here are the statistics for one such overlap, copied from Flower because the dashboard itself is not publicly accessible:
Name | UUID | State | args | Received | Started | Runtime | Worker |
---|---|---|---|---|---|---|---|
app.celery_tasks.sync_put_to_samesubscriber | 25354a3b-65cc-408c-85c0-99981b1e4d7c | SUCCESS | [5] | 2024-05-29 11:10:41.153 | 2024-05-29 11:10:45.075 | 2.63 | celery@celery |
app.celery_tasks.sync_put_to_samesubscriber | de274f70-7c6c-4ba8-9c7a-1bb51549af97 | SUCCESS | [5] | 2024-05-29 11:10:30.593 | 2024-05-29 11:10:42.522 | 3.60 | celery@celery |
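
The overlap can be read off the two rows above. The following is a minimal sketch of that arithmetic, assuming Flower's Runtime is given in seconds and counted from the Started timestamp; the variable names are only illustrative.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S.%f"

# Values copied from the two table rows (both calls ran with args [5]).
first_received = datetime.strptime("2024-05-29 11:10:30.593", FMT)
first_started = datetime.strptime("2024-05-29 11:10:42.522", FMT)
first_runtime = timedelta(seconds=3.60)

second_received = datetime.strptime("2024-05-29 11:10:41.153", FMT)
second_started = datetime.strptime("2024-05-29 11:10:45.075", FMT)

# Assumption: Runtime is counted in seconds from the Started timestamp.
first_finished = first_started + first_runtime

# A positive value means the second call started while the first was running.
overlap = (first_finished - second_started).total_seconds()

print(f"first finished: {first_finished.time()}")
print(f"second started: {second_started.time()}")
print(f"overlap: {overlap:.3f} s")
print("second received before first started:", second_received < first_started)
```

Under that assumption the first call finishes at 11:10:46.122, roughly one second after the second call with the same argument started, and the second call was already received before the first one had started.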
Execution code:
import json
import os
import time

from celery import Celery
beat_schedule.update({
    'sync_put_to_subscribers': {
        'task': 'app.celery_tasks.sync_put_to_subscribers',
        'schedule': 20.0,
    }
})
# Sending to a specific subscriber by id.
@celery.task
def sync_put_to_samesubscriber(id: int):
    time.sleep(20)
    res = '{"test": "ok"}'
    return json.loads(res)
@celery.task(default_retry_delay=3)
def sync_put_to_subscribers():
    res = []
    cur_task = {}
    try:
        # Loop over the subscribers and dispatch a task for each one.
        for wssubscriber_id in [5, 16]:
            # Get the list of active tasks on this worker.
            active_task = celery.control.inspect().active().get(f'''celery@{os.environ.get('CELERY_HOST')}''', [])
            # Look through the active tasks.
            task_is_active = False
            for task in active_task:
                # If an active task with the same name and arguments is found, stop searching and flag that it is still running.
                if task.get('name', '') == 'app.celery_tasks.sync_put_to_samesubscriber' and task.get('args', []) == [wssubscriber_id]:
                    cur_task = {wssubscriber_id: task.get('id', ''), 'state': 'Still started'}
                    task_is_active = True
                    break
            # If no matching task is active, start a new instance.
            if not task_is_active:
                celery_task = sync_put_to_samesubscriber.apply_async(args=[wssubscriber_id])
                cur_task = {wssubscriber_id: celery_task.id, 'state': 'Applyed async'}
            res.append(cur_task)
    except Exception as e:
        pass
    return res
celery.conf.beat_schedule = beat_schedule
I searched the Celery documentation for a way to enforce sequential execution per task name plus argument, rather than per task name alone, but I did not find one.