I’ve decided to try to make a custom scheduler using Celery/Celery Beat to perform some simple queued tasks. This is my first time working with celery, so it’s possible part of my config could be wrong. But I think I simply need to somehow pass in my flask app to get past the error “RuntimeError: Working outside of application context.” that I get in my custom scheduler on start when it tries to perform a query with SQLAlchemy. I’m just having issues getting it to be used correctly in my scheduler so I can use ‘with app.app_context():’ while avoiding any circular dependencies. I’ve tried a few different approaches but I just cant seem to get it working appropir
Here is my celery_config file where I set up celery & where I believe I’ll need to somehow pass in my flask app to my custom scheduler.
from celery import Celery
from flask import Flask
from SchedulerModule.CustomScheduler import CustomScheduler
def make_celery(app: Flask) -> Celery:
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['CELERY_BEAT_SCHEDULER'] = 'SchedulerModule.CustomScheduler.CustomScheduler'
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
celery.conf.beat_scheduler = CustomScheduler(app=app)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
flask_app = Flask(__name__)
celery_app = make_celery(flask_app)
Here is my application.py where I fire up the main Flask Application
from flask import Flask, redirect, url_for
from extensions import ma, jwt, db, socketio
import config.const as CONSTANTS
from web.authentication import auth
from web.tasks import tasks
from flask_cors import CORS
from celery_config import celery_app
def create_app():
print("create_app. Preparing to setup Flask")
application = Flask(__name__)
CORS(application)
configure_app(application)
register_extensions(application)
configure_oauth(application)
celery_app.conf.update(application.config)
return application
def register_extensions(application):
print("Registering extensions")
socketio.init_app(application, cors_allowed_origins="*")
db.init_app(application)
ma.init_app(application)
jwt.init_app(application)
def configure_app(application):
application.config['SQLALCHEMY_DATABASE_URI'] = CONSTANTS.DB_CONNECTION_STRING
application.config['SECRET_KEY'] = CONSTANTS.SECRET
application.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
application.config['PREFERRED_URL_SCHEME'] = 'https'
def register_blueprints(application):
print("Registering blueprints")
application.register_blueprint(auth, url_prefix='/auth')
application.register_blueprint(tasks,url_prefix='/task')
print("Preparing to call create_app")
application = create_app()
register_blueprints(application)
# Default route
@application.route("/")
def default_route():
return redirect(url_for('auth.login'))
# Start the flask server
if __name__ == '__main__':
socketio.run(application, debug=True, use_reloader=False, port=5009, allow_unsafe_werkzeug=True)
And lastly my Custom Scheduler module I’m using with celery. This is where the context error gets thrown when it initially tries the TaskSchedule.query.all().
from celery.beat import Scheduler, ScheduleEntry
from models.TaskSchedule import TaskSchedule
from celery.schedules import crontab, timedelta
from services.helpers import get_cron_day_of_week
class CustomScheduler(Scheduler):
def sync(self):
self.sync_from_db()
def setup_schedule(self):
self.sync_from_db()
def sync_from_db(self):
schedules = TaskSchedule.query.all()
self.merge_from_db(schedules)
def merge_from_db(self, schedules):
all_schedule_entries = {}
for schedule in schedules:
all_schedule_entries[schedule.name] = ScheduleEntry(
name=schedule.name,
task=schedule.task_name,
schedule=self.deserialize_schedule(schedule),
args=schedule.args,
kwargs=schedule.kwargs,
options=schedule.options if schedule.options else {},
last_run_at=schedule.last_run_at,
total_run_count=schedule.total_run_count,
)
self.schedule.update(all_schedule_entries)
def deserialize_schedule(self, schedule):
if schedule.interval:
return timedelta(minutes=int(schedule.minute))
else:
day_of_week = get_cron_day_of_week(schedule.day_of_week)
return crontab(
minute=schedule.minute,
hour=schedule.hour,
day_of_week=day_of_week
)
def update_task(self, task_id):
task = TaskSchedule.query.get(task_id)
if task:
self.remove_task(task.name)
self.add_task(task)
def remove_task(self, task_name):
if task_name in self.schedule:
del self.schedule[task_name]
self.sync()
def add_task(self, schedule):
if schedule.name in self.schedule:
print(f"Task {schedule.name} is already in the scheduler.")
return
entry = ScheduleEntry(
name=schedule.name,
task=schedule.task_name,
schedule=self.deserialize_schedule(schedule),
args=schedule.args or [],
kwargs=schedule.kwargs or {},
options=schedule.options or {},
last_run_at=schedule.last_run_at,
total_run_count=schedule.total_run_count or 0,
)
self.schedule[schedule.name] = entry
self.sync()
print(f"Task {schedule.name} added to the scheduler.")
Any help is greatly appreciated. Thank you!