I’m working on a project that requires both Celery and WebSocket functionality. I’m using the Flask framework along with Flask-SocketIO for the WebSocket implementation. However, I’m having trouble running both Celery and WebSocket together. I’m using Eventlet for the WebSocket, but whenever I run the app with Gunicorn and Eventlet, Celery stops accepting connections after one or two successful attempts. If I remove Eventlet, Celery works perfectly, but the WebSocket fails to connect.
Below is the Dockerfile for the API (`Dockerfile.api`):
# Image for the Flask / Flask-SocketIO API, served by gunicorn on port 7000.
FROM python:3.8-alpine3.15
ENV PYTHONUNBUFFERED=1
ENV FLASK_APP=runserver.py

# Install dependencies needed to build psycopg2.
# Every continued line of a multi-line RUN must end with a backslash;
# without it each package name is parsed as a separate (invalid) instruction.
RUN apk update && apk add --no-cache \
        gcc \
        musl-dev \
        python3-dev \
        postgresql-dev \
        libc-dev \
        libffi-dev \
        openssl-dev \
        build-base \
        tzdata \
    && apk add --virtual build-deps gcc musl-dev python3-dev libffi-dev \
    && pip install --upgrade pip

WORKDIR /app
COPY requirements.txt /app
RUN pip install --no-cache-dir -r requirements.txt
COPY . /app
EXPOSE 7000

# Dockerfile comments must start their own line: a '#' after an instruction is
# passed through as part of that instruction, which breaks exec-form CMD.

# Variant without eventlet (Celery publishing works, WebSocket does not):
#CMD ["gunicorn", "-b", "0.0.0.0:7000", "-w", "4", "runserver:app"]

# Variant with the eventlet worker (needed for WebSocket).
# Flask-SocketIO's deployment guide requires a single gunicorn worker ("-w 1")
# because gunicorn's load balancing cannot keep a client pinned to one worker.
CMD ["gunicorn", "-b", "0.0.0.0:7000", "-w", "1", "-k", "eventlet", "runserver:app"]
The Celery Dockerfile (`Dockerfile.celery`):
# Image for the Celery worker (with the beat scheduler embedded via -B).
FROM python:3.8-alpine3.15
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

# Install dependencies needed to build psycopg2.
# Every continued line of a multi-line RUN must end with a backslash;
# without it each package name is parsed as a separate (invalid) instruction.
RUN apk update && apk add --no-cache \
        gcc \
        musl-dev \
        python3-dev \
        postgresql-dev \
        libc-dev \
        libffi-dev \
        openssl-dev \
        build-base \
        tzdata \
    && apk add --virtual build-deps gcc musl-dev python3-dev libffi-dev \
    && pip install --upgrade pip

WORKDIR /app
COPY requirements.txt /app
RUN pip install --no-cache-dir -r requirements.txt
COPY . /app

# Start a worker for the Celery app object exposed by
# celery_config/utils/cel_workers.py; -E turns on task events and -B runs the
# periodic-task (beat) scheduler inside the same process.
CMD ["celery", "-A", "celery_config.utils.cel_workers.celery", "worker", "--loglevel=info", "--concurrency=4", "--autoscale=4,2", "-E", "-B"]
Below is the error I get in the API container when the call to the Celery task fails:
teamflow-api | Traceback (most recent call last):
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/utils/functional.py", line 30, in __call__
teamflow-api | return self.__value__
teamflow-api | AttributeError: 'ChannelPromise' object has no attribute '__value__'
teamflow-api |
teamflow-api | During handling of the above exception, another exception occurred:
teamflow-api |
teamflow-api | Traceback (most recent call last):
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 446, in _reraise_as_library_errors
teamflow-api | yield
teamflow-nginx | 192.168.65.1 - - [11/Sep/2024:07:32:15 +0000] "POST /api/v1/auth/resend-otp HTTP/1.1" 500 55 "-" "PostmanRuntime/7.41.2"
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 433, in _ensure_connection
teamflow-api | return retry_over_time(
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/utils/functional.py", line 312, in retry_over_time
teamflow-api | return fun(*args, **kwargs)
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 877, in _connection_factory
teamflow-api | self._connection = self._establish_connection()
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 812, in _establish_connection
teamflow-api | conn = self.transport.establish_connection()
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/transport/pyamqp.py", line 201, in establish_connection
teamflow-api | conn.connect()
teamflow-api | File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 323, in connect
teamflow-api | self.transport.connect()
teamflow-api | File "/usr/local/lib/python3.8/site-packages/amqp/transport.py", line 129, in connect
teamflow-api | self._connect(self.host, self.port, self.connect_timeout)
teamflow-api | File "/usr/local/lib/python3.8/site-packages/amqp/transport.py", line 184, in _connect
teamflow-api | self.sock.connect(sa)
teamflow-api | File "/usr/local/lib/python3.8/site-packages/eventlet/greenio/base.py", line 265, in connect
teamflow-api | socket_checkerr(fd)
teamflow-api | File "/usr/local/lib/python3.8/site-packages/eventlet/greenio/base.py", line 50, in socket_checkerr
teamflow-api | raise OSError(err, errno.errorcode[err])
teamflow-api | ConnectionRefusedError: [Errno 111] ECONNREFUSED
teamflow-api |
teamflow-api | The above exception was the direct cause of the following exception:
teamflow-api |
teamflow-api | Traceback (most recent call last):
teamflow-api | File "/app/endpoints/authentication.py", line 318, in resend_otp
teamflow-api | send_mail.delay(payload)
teamflow-api | File "/usr/local/lib/python3.8/site-packages/celery/app/task.py", line 425, in delay
teamflow-api | return self.apply_async(args, kwargs)
teamflow-api | File "/usr/local/lib/python3.8/site-packages/celery/app/task.py", line 575, in apply_async
teamflow-api | return app.send_task(
teamflow-api | File "/usr/local/lib/python3.8/site-packages/celery/app/base.py", line 788, in send_task
teamflow-api | amqp.send_task_message(P, name, message, **options)
teamflow-api | File "/usr/local/lib/python3.8/site-packages/celery/app/amqp.py", line 510, in send_task_message
teamflow-api | ret = producer.publish(
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/messaging.py", line 177, in publish
teamflow-api | return _publish(
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 523, in _ensured
teamflow-api | return fun(*args, **kwargs)
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/messaging.py", line 186, in _publish
teamflow-api | channel = self.channel
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/messaging.py", line 209, in _get_channel
teamflow-api | channel = self._channel = channel()
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/utils/functional.py", line 32, in __call__
teamflow-api | value = self.__value__ = self.__contract__()
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/messaging.py", line 225, in <lambda>
teamflow-api | channel = ChannelPromise(lambda: connection.default_channel)
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 895, in default_channel
teamflow-api | self._ensure_connection(**conn_opts)
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 433, in _ensure_connection
teamflow-api | return retry_over_time(
teamflow-api | File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
teamflow-api | self.gen.throw(type, value, traceback)
teamflow-api | File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 450, in _reraise_as_library_errors
teamflow-api | raise ConnectionError(str(exc)) from exc
teamflow-api | kombu.exceptions.OperationalError: [Errno 111] ECONNREFUSED
teamflow-api | error@auth/resend-otp TRACEBACK
teamflow-api | [Errno 111] ECONNREFUSED error@auth/resend-otp
My `docker-compose.yml`:
# Compose stack: Celery worker, Flask API, Redis (broker/backend) and nginx.
# Nesting is significant in YAML; every service must be indented under
# "services" and every option under its service.
services:
  celery:
    build:
      context: .
      dockerfile: Dockerfile.celery
    container_name: teamflow-celery
    restart: unless-stopped
    depends_on:
      - redis
    networks:
      - teamflow_network
    volumes:
      - .:/app
    env_file:
      - .env

  api:
    build:
      context: .
      dockerfile: Dockerfile.api
    ports:
      - "7000:7000"
    volumes:
      - .:/app
    depends_on:
      - redis
      - celery
    container_name: teamflow-api
    networks:
      - teamflow_network
    env_file:
      - .env

  redis:
    image: redis:7.0-alpine
    restart: unless-stopped
    container_name: teamflow-redis
    # Redis is started on port 6380, so REDIS_PORT in .env has to match it.
    command: --port 6380
    expose:
      - 6380
    networks:
      - teamflow_network
    ports:
      # Quoted like the other port mappings in this file.
      - "6380:6380"

  nginx:
    image: nginx:latest
    container_name: teamflow-nginx
    ports:
      - "80:80"
    depends_on:
      - api
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    networks:
      - teamflow_network

networks:
  teamflow_network:
    # The network is not created by this file; it must already exist.
    external: true
The Celery configuration module (`celery_config/utils/cel_workers.py`):
from app_config import create_app
from celery import Celery, shared_task
from flask import render_template_string
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import os
from datetime import datetime
import celery_config.schedule_config as celeryConfig
import traceback
# Flask application whose configuration and app context the Celery tasks use.
app = create_app()


def make_celery(app=app):
    """Build the Celery application bound to the Flask ``app``.

    Broker and result backend both point at the Redis instance named by the
    ``REDIS_HOST`` / ``REDIS_PORT`` environment variables.  The Flask config
    is copied into the Celery config, then ``celery_config.schedule_config``
    is loaded on top of it.

    Args:
        app: Flask application; defaults to the module-level ``app``.

    Returns:
        The configured ``Celery`` instance, registered as the process-wide
        default app.
    """
    redis_url = (
        f"redis://{os.environ.get('REDIS_HOST')}:{os.environ.get('REDIS_PORT')}"
    )
    celery = Celery(app.import_name, backend=redis_url, broker=redis_url)
    celery.conf.update(app.config)
    celery.config_from_object(celeryConfig)

    class ContextTask(celery.Task):
        """Task base class that runs the task body inside the Flask app context."""

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask

    # Creating a Celery app only marks it "current" for the creating thread
    # (greenlet, once eventlet has patched threading).  ``@shared_task``
    # resolves the current app at call time, so a ``.delay()`` issued from
    # another greenlet would otherwise fall back to Celery's unconfigured
    # default app and try the default AMQP broker -- consistent with the
    # ``pyamqp`` ECONNREFUSED seen in the API logs.  Making this app the
    # default for all threads prevents that fallback.
    celery.set_default()
    return celery


# Celery application object referenced by the worker's ``-A`` option.
celery = make_celery()
@shared_task
def send_mail(context):
    """Render an HTML template and e-mail it through Gmail's SMTP server.

    Args:
        context: Mapping that must provide ``email`` (recipient), ``subject``
            and ``template_name`` (a file name inside ``<cwd>/templates``).
            The whole mapping is also passed to the template as variables.

    Returns:
        ``"Mail sent successfully"`` on success, or ``"Failed to send mail"``
        if any step raised; the error is printed rather than re-raised.
    """
    try:
        print("Sending Mail")
        smtp_host = 'smtp.gmail.com'
        smtp_port = 587
        smtp_user = os.environ.get('EMAIL_USER')
        smtp_password = os.environ.get('EMAIL_PASSWORD')

        # NOTE(review): this literal looks like an e-mail-obfuscation
        # placeholder rather than a real sender address -- confirm the
        # intended value.
        from_email = "[email protected]"
        to_email = context['email']
        subject = context['subject']

        # Build the complete message before opening the SMTP connection so a
        # bad context or missing template never leaves a connection behind.
        # Construct the path to the HTML file within the templates folder
        template_path = os.path.join(os.getcwd(), 'templates', context['template_name'])
        # Read the HTML file content
        with open(template_path, 'r') as html_file:
            body = html_file.read()
        # send context inside the html file
        body = render_template_string(body, **context)

        msg = MIMEMultipart()
        msg['From'] = from_email
        msg['To'] = to_email
        msg['Subject'] = subject
        # Change the MIME type to 'html'
        msg.attach(MIMEText(body, 'html'))

        # The context manager sends QUIT and closes the socket even when
        # starttls/login/sendmail raises, so the connection cannot leak.
        with smtplib.SMTP(smtp_host, smtp_port) as server:
            server.starttls()
            server.login(smtp_user, smtp_password)
            server.sendmail(from_email, to_email, msg.as_string())
        return "Mail sent successfully"
    except Exception as e:
        # Task boundary: report the failure and return a status string
        # instead of letting the exception propagate.
        print(traceback.format_exc(), "error@celery/send_mail")
        print(e, "error@celery/send_mail")
        return "Failed to send mail"
The `extensions.py` module:
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import JWTManager
from flask_cors import CORS
from sqlalchemy import MetaData
from flask_socketio import SocketIO
# Name templates for indexes (ix), unique (uq), check (ck), foreign-key (fk)
# and primary-key (pk) constraints, handed to SQLAlchemy's MetaData below.
naming_convention = dict(
    ix='ix_%(column_0_label)s',
    uq="uq_%(table_name)s_%(column_0_name)s",
    ck="ck_%(table_name)s_%(column_0_name)s",
    fk="fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    pk="pk_%(table_name)s",
)

# Unbound extension singletons; each is attached to the Flask app later
# through its init_app() call.
cors = CORS()
jwt = JWTManager()
migrate = Migrate()
socketio = SocketIO(
    async_mode='eventlet',
    cors_allowed_origins="*",
)
db = SQLAlchemy(
    metadata=MetaData(naming_convention=naming_convention),
)
The `create_app` function:
def create_app(config_name='development'):
    """Application factory: build the Flask app and attach every extension.

    Args:
        config_name: Key into ``config_obj`` selecting the configuration
            object to load.

    Returns:
        The configured Flask application.
    """
    app = Flask(__name__)
    app.config.from_object(config_obj[config_name])

    db.init_app(app)
    migrate.init_app(app, db)
    socketio.init_app(app)
    jwt.init_app(app)
    # CORS is opened only for the Socket.IO endpoint.
    cors.init_app(app, resources={r"/socket.io/*": {"origins": "*"}})

    # Callers (``app = create_app()``) need the instance back; without this
    # return they would receive None.
    return app
The `runserver.py` entry point:
from app_config import create_app
from dotenv import load_dotenv
from message_socket import socketio
import redis
# Load environment variables via python-dotenv before the app is built.
load_dotenv()

# Module-level application object (the target of "runserver:app").
app = create_app()

# Direct execution only: serve through Flask-SocketIO's own runner.
if __name__ == "__main__":
    socketio.run(
        app,
        host="0.0.0.0",
        port=7000,
        debug=True,
    )
And the Socket.IO event handlers (`message_socket.py`):
from flask_socketio import emit, join_room
from models import create_message, is_project_valid
from http_status import HttpStatus
from status_res import StatusRes
from extensions import socketio
from flask_jwt_extended import current_user, jwt_required
@socketio.on("connect")
def test_connect_handler():
    """Log each new Socket.IO client connection."""
    print("Client connected")
@socketio.on("join-room")
@jwt_required()
def on_join(data):
    """Put the caller into the room named after ``data['project_id']``.

    Wrapped in ``jwt_required``.  When ``is_project_valid`` rejects the
    project id, an ``error-message`` event is emitted and no room is joined.
    """
    room_id = data.get("project_id")
    print(current_user.id, "current_user.id")

    if is_project_valid(room_id):
        join_room(room_id)
        return

    print("Invalid project ID")
    rejection = {
        "status": HttpStatus.BAD_REQUEST,
        "status_res": StatusRes.FAILED,
        "message": "Invalid project ID",
    }
    emit("error-message", rejection)
# Fallback handler for exceptions raised inside Socket.IO event handlers.
@socketio.on_error()
def error_handler(e):
    """Log the exception and report it to the client as ``error-message``.

    Args:
        e: The exception raised by the failing event handler.
    """
    # Log tag restored from an e-mail-obfuscation artifact ("[email protected]")
    # so it follows the "error@<location>" pattern of the other log lines.
    # NOTE(review): exact original wording is a best guess -- confirm.
    print(e, "error@socketio.on_error")
    emit('error-message', {
        'status': HttpStatus.INTERNAL_SERVER_ERROR,
        'status_res': StatusRes.FAILED,
        # NOTE(review): str(e) exposes internal error text to the client.
        'message': str(e)
    })
@socketio.on('send-message')
@jwt_required()
def send_message(data):
    """Store a chat message and broadcast it to the project's room.

    Args:
        data: Event payload; ``project_id`` selects the room and ``content``
            is the message text.

    Emits:
        ``receive-message`` to the room ``project_id`` on success;
        ``error-message`` to the sender for an invalid project id or empty
        content; ``error`` to the sender when saving or broadcasting fails.
    """
    project_id = data.get('project_id')
    print(project_id, "project_id")

    if not is_project_valid(project_id):
        print("Invalid project ID")
        emit('error-message', {
            'status': HttpStatus.BAD_REQUEST,
            'status_res': StatusRes.FAILED,
            'message': "Invalid project ID"
        })  # Send error to the requesting client only
        return

    try:
        content = data.get('content')
        author_id = current_user.id

        if not content:
            emit('error-message', {
                'status': HttpStatus.BAD_REQUEST,
                'status_res': StatusRes.FAILED,
                'message': "Content is required"
            })
            return

        # Assuming create_message is a function that saves the message to the database
        msg = create_message(content, author_id, project_id)
        if not msg:
            raise Exception("Network Error")

        emit('receive-message', msg.to_dict(), room=project_id)
    except Exception as e:
        # General error handling
        print(e, "error@message_socket/send-message")
        # Report the failure to the sender only: one user's failed send must
        # not be broadcast to everyone else in the room.
        # NOTE(review): the event name 'error' differs from the
        # 'error-message' used by the other handlers; kept unchanged so
        # existing clients keep working.
        emit('error', {
            'status': HttpStatus.INTERNAL_SERVER_ERROR,
            'status_res': StatusRes.FAILED,
            'message': "Network Error"
        })