I have been scratching my head about this for days now. I have a background task that takes around 1 hour to process 5000 images. I create a group of Celery tasks, and each task processes 1000 images.
I have an on_success method where I create a ‘Finished’ database record and remove the ‘inProgress’ record.
The problem, however, is that even with plenty of checks in the on_success method, it still somehow generates two ‘Finished’ database records. That may sound confusing, so see the code below:
    def on_success(self, retval, task_id, *args, **kwargs):
        self.image_job.refresh_from_db()
        total_nr_of_images = self.image_job.total_nr_of_images
        stored_results = ImageJobResults.objects.filter(job=self.image_job).count()
        if stored_results == total_nr_of_images:
            # It means all the workers have completed processing images
            # for this job, and we need to create a csv file, and a Finished BC Job
            with transaction.atomic(using='my_db'):
                image_job = ImageJob.objects.select_for_update().get(id=self.image_job.id)
                if not ImageJobFinished.objects.filter(job=image_job).exists():
                    job_in_progress = ImageJobInProgress.objects.get(job=image_job)
                    started_at = job_in_progress.started_at
                    image_job.generate_results(started_at=started_at, completed_at=now(), csv_rds_handle_job_id=self.image_job.pk)
                    # Now that a ImageJobFinished is created.
                    # We delete the ImageJobInProgress instance
                    job_in_progress.delete()
        return super().on_success(retval, task_id, *args, **kwargs)
Now theoretically, stored_results should be equal to total_nr_of_images only in the last worker. But both workers somehow finish at the same time, and they get the same values here. Both of them then create the ImageJobFinished object; even though I check for the existence of another object, both pass that check. I just don’t understand how this can be. It has to do with how the DB is accessed by both processes, I think.
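For reference, a check followed by a create is only safe against concurrent workers if the database itself refuses the second record. Below is a minimal sketch of that idea, not the code from this project: it uses the standard-library sqlite3 module and threads as stand-ins for the real database and the Celery workers, and the table name `finished` and the functions `try_finish` and `run_demo` are hypothetical names chosen for illustration. The assumption is that a unique constraint on the job column could play the same role for the ImageJobFinished table.

```python
import os
import sqlite3
import tempfile
import threading


def try_finish(db_path, job_id, winners):
    # Each worker opens its own connection, as separate processes would.
    # isolation_level=None puts the connection in autocommit mode, so the
    # single INSERT below is its own atomic transaction.
    conn = sqlite3.connect(db_path, timeout=30, isolation_level=None)
    try:
        cur = conn.execute(
            "INSERT OR IGNORE INTO finished (job_id) VALUES (?)", (job_id,)
        )
        # rowcount is 1 only for the worker whose insert actually happened;
        # every later attempt is rejected by the UNIQUE constraint.
        if cur.rowcount == 1:
            winners.append(threading.current_thread().name)
    finally:
        conn.close()


def run_demo(n_workers=5, job_id=1):
    fd, db_path = tempfile.mkstemp(suffix=".sqlite3")
    os.close(fd)
    try:
        conn = sqlite3.connect(db_path, isolation_level=None)
        conn.execute("CREATE TABLE finished (job_id INTEGER NOT NULL UNIQUE)")
        conn.close()

        winners = []
        threads = [
            threading.Thread(target=try_finish, args=(db_path, job_id, winners))
            for _ in range(n_workers)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        conn = sqlite3.connect(db_path)
        rows = conn.execute("SELECT COUNT(*) FROM finished").fetchone()[0]
        conn.close()
        # (number of workers that "won", number of Finished rows stored)
        return len(winners), rows
    finally:
        os.remove(db_path)
```

With this arrangement only the worker whose insert succeeded would go on to generate the results file; the others can simply return.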
I looked into Celery chords, but I think that only helps inside a single view for small tasks.
To make Celery run the finishing step only once, after the last worker in a group is done, group the tasks and specify a callback task (this is what Celery calls a chord). When all tasks in the group have completed, the callback task is executed, and only after the last task, so the finishing logic can live in the callback instead of in each task's on_success.