We need a way to prevent tasks that were being executed by a Celery worker from being automatically requeued into Celery after they are rejected due to a lost worker. We also need to save those unacknowledged tasks (e.g. in a variable) so that we can execute them later.

Acknowledged task – the task was executed completely and acknowledged by the Celery worker.

Unacknowledged task – the task was not executed completely and was rejected due to some exception, such as a worker-lost exception.
```python
import time

from celery import Celery
from flask import Flask, jsonify, request

app = Flask(__name__)

# Celery configuration
app.config['CELERY_BROKER_URL'] = 'redis://:dbrun_redis_dev@localhost:6334/0'
app.config['result_backend'] = 'redis://:dbrun_redis_dev@localhost:6334/0'

# Create Celery instance
celery = Celery(
    app.name,
    broker=app.config['CELERY_BROKER_URL'],
    backend=app.config['result_backend'],
    task_reject_on_worker_lost=True,
    task_acks_late=True,
)

# Fetch one task at a time per worker process
celery.conf.worker_prefetch_multiplier = 1

# Disable rate limits for the Celery worker
celery.conf.worker_disable_rate_limits = True


@celery.task(bind=True)
def queued_action(self, **kwargs):
    print('Try {0}/{1} of task ---> {2}'.format(
        self.request.retries, self.max_retries, str(kwargs.get('id'))))
    print("Process started " + str(kwargs.get('id')))
    print(kwargs)
    print("TASK ID = " + str(kwargs.get('id')))
    time.sleep(60)
    print("Process completed " + str(kwargs.get('id')))
    return "Testing " + str(kwargs.get('id'))


# Flask route to trigger the Celery task
@app.route('/executor/executions', methods=['POST'])
def executions():
    print(request.json)
    result = queued_action.apply_async(kwargs=request.json)
    return jsonify(
        {"task_id": result.id, "status": "Action Queued Successfully"}
    )


# Flask route to check the result of the Celery task
@app.route('/result/<task_id>')
def get_celery_result(task_id):
    result = queued_action.AsyncResult(task_id)
    if result.ready():
        return jsonify({"status": "Task completed", "result": result.result})
    return jsonify({"status": "Task is still in progress"})


if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)
```
I have tried this approach, and I want the task not to be automatically requeued to Celery when it is rejected because the worker was lost.
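One direction I am considering (a sketch only, not working code from my app): keep a record of tasks that failed with a worker-lost error so they can be re-submitted manually later. The `failed_tasks` dict, `record_failure`, and the signal-handler name below are all hypothetical; a real setup would need persistent storage such as Redis, since an in-process variable dies with the worker. In Celery this could be wired to the `task_failure` signal, assuming that signal fires for `WorkerLostError` in the Celery version in use.

```python
# Hypothetical in-memory store of tasks that failed before completing.
# A real deployment would need something persistent (e.g. Redis),
# because this dict is lost when the worker process dies.
failed_tasks = {}


def record_failure(task_id, task_name, kwargs, exception):
    """Store enough information to re-submit the task later."""
    failed_tasks[task_id] = {
        "name": task_name,
        "kwargs": kwargs,
        "error": repr(exception),
    }


# Sketch of how this might be attached in Celery (untested assumption):
#
# from celery.signals import task_failure
#
# @task_failure.connect
# def on_task_failure(sender=None, task_id=None, kwargs=None,
#                     exception=None, **extra):
#     record_failure(task_id, sender.name, kwargs, exception)
#
# Later, each entry in failed_tasks could be re-queued with
# celery.send_task(entry["name"], kwargs=entry["kwargs"]).
```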