I have a chain of celery tasks as shown below. I want to run the on_failure function whenever any task in this chain fails. Currently if any intermediate task fails the main task gets stuck and keeps waiting.
I am using group task because I want to process these tasks in parallel. If there is any other better approach please let me know.
class DEBUGTaskBase(BaseTaskWithRetry):
max_retries = 1 # Since each subtask has its own retry set
def on_failure(self, exc, task_id, args, kwargs, einfo):
print(f'*********** Failed **********')
print('############', einfo)
def on_success(self, retval, task_id, args, kwargs):
print('************* Success ***********')
@app.task(name='infer.debug.run_metadata', bind=True, base=BaseTaskWithRetry)
@onfailure_reject
def run_metadata(self, video_id):
print('running metadata task')
@app.task(name='infer.debug.check_credits', bind=True, base=BaseTaskWithRetry)
@onfailure_reject
def check_credits(self, video_id):
# raise Exception('Failed to check credits')
if True:
self.request.chain = None
raise Exception('failed to check credit')
print('running check_credits task')
@app.task(name='infer.debug.download_video', bind=True, base=BaseTaskWithRetry)
@onfailure_reject
def download_video(self, video_id):
print('running download_video task')
@app.task(name='infer.debug.extract_video', bind=True, base=BaseTaskWithRetry)
@onfailure_reject
def extract_video(self, video_id):
print('running extract_video task')
@app.task(name='infer.debug.infer_video_segment', bind=True, base=BaseTaskWithRetry)
@onfailure_reject
def infer_video_segment(self, video_id):
print('running infer_video_segment task')
@app.task(name='infer.debug.process_video', bind=True, base=BaseTaskWithRetry)
@onfailure_reject
def process_video(self):
result = chain(
group(
(infer_video_segment.si('video_id'))
for i in range(2)
)
).apply_async()
while not result.ready():
if result.failed():
print("*********** Task Chain Failed **********")
raise Exception('Task chain failed') # Raise an exception to indicate failure
time.sleep(1)
@app.task(name='infer.debug.merge_video_segments', bind=True, base=BaseTaskWithRetry)
@onfailure_reject
def merge_video_segments(self, video_id):
print('running merge_video_segments task')
@app.task(name='infer.debug.upload_video', bind=True, base=BaseTaskWithRetry)
@onfailure_reject
def upload_video(self):
print('running upload_video task')
@app.task(name='infer.debug.handle_failure', bind=True)
@onfailure_reject
def handle_failure(self, exc ):
print(f'***********************Handling failure for task exception**************: {exc}')
raise Exception('Subtask failed')
@app.task(name="infer.debug.run_inference", bind=True, base=DEBUGTaskBase)
@onfailure_reject
def infer(self, t):
# with allow_join_result():
result = chain (
run_metadata.si('video_id'),
check_credits.si('video_id'),
download_video.si('video_id'),
extract_video.si('video_id'),
process_video.si(),
merge_video_segments.s(),
group(
upload_video.si()
)
).apply_async()
# # Wait for the result to be ready
while not result.ready():
if result.failed():
print("*********** Task Chain Failed **********")
raise Exception('Task chain failed') # Raise an exception to indicate failure
time.sleep(1)
if result.failed():
print("*********** Task Chain Failed **********")
raise Exception('Task chain failed') # Raise an exception to indicate failure
print("************* Task Chain Successful ***********")
I tried everything that I could but kind of stuck now.
Ankit Pankaj is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.