I’m developing a Flask application that uses Celery with RabbitMQ as the broker and a PostgreSQL database as the result backend.
I have a Celery task that, in some scenarios, may take several hours to complete.
At a high level, this task is supposed to validate the contents of a file uploaded by a user. If the validation fails, the task raises an exception and stores the validation errors in the result backend; otherwise, it returns None.
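To make the task's shape concrete, here is a stripped-down sketch of the validation logic (the function name, the blank-line rule, and the error format are placeholders, not my real code; in the actual app this function is registered as a Celery task, e.g. with a @celery_app.task decorator, which I've omitted so the logic stands alone):

```python
import json


def validate_file(path):
    """Validate an uploaded file; raise on failure, return None on success."""
    errors = []
    with open(path) as f:
        for lineno, line in enumerate(f, start=1):
            if not line.strip():  # placeholder rule: blank lines are invalid
                errors.append(f"line {lineno}: blank line not allowed")
    if errors:
        # The raised exception is what Celery serializes into the result backend.
        raise Exception(json.dumps({"error": errors}))
    return None
```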
For long-running file validations (>30 minutes), the worker logs indicate that the connections to both the database and the Celery broker have been lost:
[2024-05-28 15:56:11,486: ERROR/MainProcess] Task app.tasks.file_validation[cf7a31b0-ee65-4758-83bf-d4be1080cd3f] raised unexpected: OperationalError('(psycopg2.OperationalError) SSL connection has been closed unexpectedly\n')
Traceback (most recent call last):
.
.
Exception: {"error": "Custom Validation Errors Here..."}
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
.
.
psycopg2.OperationalError: SSL connection has been closed unexpectedly
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
.
.
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL connection has been closed unexpectedly
(Background on this error at: https://sqlalche.me/e/20/e3q8)
[2024-05-28 15:56:11,497: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
.
.
OSError: Server unexpectedly closed connection
My RabbitMQ instance (which is deployed via Kubernetes) is managed by my company’s infrastructure team, so I don’t have direct access to the configuration; however, I should be able to request configuration changes if needed. I do have access to the Management UI. With that being said, below are the version details of the tools I’m using:
RabbitMQ=3.12.13
amqp=5.2.0
celery=5.4.0
Flask=3.0.3
Flask-SQLAlchemy=3.1.1
SQLAlchemy=2.0.30
kombu=5.3.7
psycopg2=2.9.9
My Flask application has the following configurations for SQLAlchemy:
SQLALCHEMY_DATABASE_URI = "postgresql://{PG_USER}:{PG_PASS}@{PG_HOST}:{PG_PORT}/project"
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_ENGINE_OPTIONS = { "pool_pre_ping": True }
I have the following configurations for Celery:
CELERY = {
"broker_url": "amqps://{RMQ_USER}:{RMQ_PASS}@{RMQ_HOST}:{RMQ_PORT}/project",
"result_backend": "db+postgresql://{PG_USER}:{PG_PASS}@{PG_HOST}:{PG_PORT}/project"
}
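For context, this is roughly how the CELERY dict is applied (a sketch; "app" and celery_app are placeholders for my actual names):

```python
from celery import Celery

# Sketch of the wiring; the CELERY dict above is applied to the app's config.
celery_app = Celery("app")
celery_app.conf.update(CELERY)
```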
From the Management UI, I was able to raise the consumer_timeout for the Celery queue from the default 30 minutes to 10 hours; however, the two additional queues that are created when a worker starts do not reflect that change — their consumer_timeout is still 30 minutes. I’ve requested that the server-wide default consumer_timeout be changed to 10 hours, but I’m not sure this is what’s causing the issue.
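One thing I’ve been considering (an untested sketch on my part): RabbitMQ 3.12 supports a per-queue x-consumer-timeout argument (in milliseconds), and Celery can declare the queue with that argument via task_queues, which would avoid relying on the Management UI or a server-wide default:

```python
from kombu import Queue

# Sketch, not verified end-to-end: declare the Celery queue with a
# per-queue consumer timeout of 10 hours (value is in milliseconds).
task_queues = (
    Queue("celery", queue_arguments={"x-consumer-timeout": 10 * 60 * 60 * 1000}),
)
```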
I added pool_pre_ping to SQLALCHEMY_ENGINE_OPTIONS in an attempt to reconnect lost database connections, but that did not help. I assume this issue is timeout-related, but I’m not yet familiar enough with Celery/RabbitMQ/[Flask-]SQLAlchemy to know which configurations I may be missing.
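My (possibly wrong) understanding is that pool_pre_ping only tests a connection at checkout time, so it would not cover a connection that sits idle for hours while a single task runs. A variant I’ve considered trying (the 1800-second value is an arbitrary assumption, not a verified fix) adds pool_recycle so connections older than a threshold are replaced before use:

```python
SQLALCHEMY_ENGINE_OPTIONS = {
    "pool_pre_ping": True,   # test connections at checkout
    "pool_recycle": 1800,    # replace connections older than 30 minutes
}
```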
Any help would be greatly appreciated!
Thank you.