We are using Celery (v5.4.0) for long-running, CPU-intensive tasks, with Redis as both the broker and the result backend. Our setup runs on Kubernetes with autoscaling based on CPU utilization.
The problem: when pods are scaled down due to low utilization, tasks that were being processed inside those pods are killed and are not re-queued on the remaining active pods.
To mitigate this, we have set terminationGracePeriodSeconds to a generous value. However, a few outlier tasks take longer than the grace period and still get killed.
We are also using acks_late=True and reject_on_worker_lost=True, so that a task is acknowledged only after processing completes and, in case the pod dies mid-task, the task is re-queued. Unfortunately, this is not working the way we intended.
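For context, the Celery app instance referenced by the task below is created roughly as follows; the app name and the Redis URL are placeholders, not our exact configuration:

from celery import Celery

# Sketch of the app setup described above (placeholder name and URLs).
celery_worker = Celery(
    "tasks",
    broker="redis://redis:6379/0",   # Redis as the broker
    backend="redis://redis:6379/0",  # Redis as the result backend
)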
To simulate a pod being killed, we ran the Celery workers as two Docker containers and killed one of them while it was executing tasks. The tasks that were terminated by the SIGKILL were not re-queued on the healthy container. Here is a sample of the task setup:
import os
import random
import time

# celery_worker is the Celery app instance shown above.


@celery_worker.task(
    name="primitive-task",
    bind=True,
    ignore_result=False,
    reject_on_worker_lost=True,
    acks_late=True,
)
def _add_task(self, x: int, y: int, task_id: int) -> int:
    # Simulate a long-running task with a random sleep.
    sleep_time = random.randint(5, 10)
    print(
        f"[PID: {os.getpid()}] add task sleeping for {sleep_time} seconds for"
        f" {x=} | {y=} | {task_id=}"
    )
    time.sleep(sleep_time)
    print(f"result for {x=} | {y=} is {x + y} | {task_id=}")
    return x + y
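During the kill test, the tasks were enqueued with something along the lines of the loop below; the argument values and the number of tasks are illustrative, not the exact ones we used:

# Enqueue a batch of tasks so that both worker containers pick up work.
for i in range(20):
    _add_task.delay(i, i + 1, i)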