Celery task not working with WebSocket (Flask)

I’m working on a project that needs both Celery and WebSocket functionality. I’m using Flask with Flask-SocketIO for the WebSocket implementation, and Eventlet as the async backend. The problem is running the two together: when I start the app with Gunicorn and the Eventlet worker, Celery tasks can no longer be queued after one or two successful attempts (the call fails with a connection error). If I remove Eventlet, Celery works perfectly, but the WebSocket fails to connect.

Below is my Dockerfile for the API:

FROM python:3.8-alpine3.15

ENV PYTHONUNBUFFERED=1
ENV FLASK_APP=runserver.py

# Install dependencies needed to build psycopg2
RUN apk update && apk add --no-cache \
    gcc \
    musl-dev \
    python3-dev \
    postgresql-dev \
    libc-dev \
    libffi-dev \
    openssl-dev \
    build-base \
    tzdata \
    && apk add --virtual build-deps gcc musl-dev python3-dev libffi-dev \
    && pip install --upgrade pip

WORKDIR /app
COPY requirements.txt /app
RUN pip install --no-cache-dir -r requirements.txt
COPY . /app
EXPOSE 7000
#CMD ["gunicorn", "-b", "0.0.0.0:7000", "-w", "4", "runserver:app"]     # for (celery)
CMD ["gunicorn", "-b", "0.0.0.0:7000", "-w", "4", "-k", "eventlet", "runserver:app"] # for websocket

The Celery Dockerfile:

FROM python:3.8-alpine3.15

ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

# Install dependencies needed to build psycopg2
RUN apk update && apk add --no-cache \
    gcc \
    musl-dev \
    python3-dev \
    postgresql-dev \
    libc-dev \
    libffi-dev \
    openssl-dev \
    build-base \
    tzdata \
    && apk add --virtual build-deps gcc musl-dev python3-dev libffi-dev \
    && pip install --upgrade pip

WORKDIR /app
COPY requirements.txt /app
RUN pip install --no-cache-dir -r requirements.txt
COPY . /app

CMD ["celery", "-A", "celery_config.utils.cel_workers.celery", "worker", "--loglevel=info", "--concurrency=4", "--autoscale=4,2", "-E", "-B"]

Below is the error I get when Celery fails:

teamflow-api     | Traceback (most recent call last):
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/utils/functional.py", line 30, in __call__
teamflow-api     |     return self.__value__
teamflow-api     | AttributeError: 'ChannelPromise' object has no attribute '__value__'
teamflow-api     |
teamflow-api     | During handling of the above exception, another exception occurred:
teamflow-api     |
teamflow-api     | Traceback (most recent call last):
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 446, in _reraise_as_library_errors
teamflow-api     |     yield
teamflow-nginx   | 192.168.65.1 - - [11/Sep/2024:07:32:15 +0000] "POST /api/v1/auth/resend-otp HTTP/1.1" 500 55 "-" "PostmanRuntime/7.41.2"
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 433, in _ensure_connection
teamflow-api     |     return retry_over_time(
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/utils/functional.py", line 312, in retry_over_time
teamflow-api     |     return fun(*args, **kwargs)
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 877, in _connection_factory
teamflow-api     |     self._connection = self._establish_connection()
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 812, in _establish_connection
teamflow-api     |     conn = self.transport.establish_connection()
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/transport/pyamqp.py", line 201, in establish_connection
teamflow-api     |     conn.connect()
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 323, in connect
teamflow-api     |     self.transport.connect()
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/amqp/transport.py", line 129, in connect
teamflow-api     |     self._connect(self.host, self.port, self.connect_timeout)
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/amqp/transport.py", line 184, in _connect
teamflow-api     |     self.sock.connect(sa)
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/eventlet/greenio/base.py", line 265, in connect
teamflow-api     |     socket_checkerr(fd)
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/eventlet/greenio/base.py", line 50, in socket_checkerr
teamflow-api     |     raise OSError(err, errno.errorcode[err])
teamflow-api     | ConnectionRefusedError: [Errno 111] ECONNREFUSED
teamflow-api     |
teamflow-api     | The above exception was the direct cause of the following exception:
teamflow-api     |
teamflow-api     | Traceback (most recent call last):
teamflow-api     |   File "/app/endpoints/authentication.py", line 318, in resend_otp
teamflow-api     |     send_mail.delay(payload)
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/celery/app/task.py", line 425, in delay
teamflow-api     |     return self.apply_async(args, kwargs)
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/celery/app/task.py", line 575, in apply_async
teamflow-api     |     return app.send_task(
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/celery/app/base.py", line 788, in send_task
teamflow-api     |     amqp.send_task_message(P, name, message, **options)
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/celery/app/amqp.py", line 510, in send_task_message
teamflow-api     |     ret = producer.publish(
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/messaging.py", line 177, in publish
teamflow-api     |     return _publish(
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 523, in _ensured
teamflow-api     |     return fun(*args, **kwargs)
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/messaging.py", line 186, in _publish
teamflow-api     |     channel = self.channel
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/messaging.py", line 209, in _get_channel
teamflow-api     |     channel = self._channel = channel()
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/utils/functional.py", line 32, in __call__
teamflow-api     |     value = self.__value__ = self.__contract__()
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/messaging.py", line 225, in <lambda>
teamflow-api     |     channel = ChannelPromise(lambda: connection.default_channel)
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 895, in default_channel
teamflow-api     |     self._ensure_connection(**conn_opts)
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 433, in _ensure_connection
teamflow-api     |     return retry_over_time(
teamflow-api     |   File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
teamflow-api     |     self.gen.throw(type, value, traceback)
teamflow-api     |   File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 450, in _reraise_as_library_errors
teamflow-api     |     raise ConnectionError(str(exc)) from exc
teamflow-api     | kombu.exceptions.OperationalError: [Errno 111] ECONNREFUSED
teamflow-api     |  error@auth/resend-otp TRACEBACK
teamflow-api     | [Errno 111] ECONNREFUSED error@auth/resend-otp
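
The frames in this traceback pass through kombu/transport/pyamqp.py and amqp/transport.py, which are the AMQP transport modules rather than the Redis one, so it may be worth confirming which broker URL the API process actually ends up using. Separately, since the final error is ECONNREFUSED, a quick way to narrow things down is to check from inside the API container whether a given host and port accept TCP connections at all. The following is only a minimal sketch using the standard library; `can_connect` and `broker_reachable_from_env` are hypothetical helper names, and the fallback values `localhost` and `6379` are assumptions, not values from my setup.

```python
import os
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds, else False."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers ConnectionRefusedError, timeouts, and DNS lookup failures.
        return False


def broker_reachable_from_env(env=os.environ):
    """Probe the host and port named by REDIS_HOST and REDIS_PORT.

    The defaults used when a variable is unset are assumptions for illustration.
    """
    host = env.get("REDIS_HOST", "localhost")
    port = int(env.get("REDIS_PORT", "6379"))
    return can_connect(host, port)
```

Running `broker_reachable_from_env()` in a Python shell inside the teamflow-api container would show whether the address taken from the environment is reachable from there. It does not say anything about which broker Celery itself is configured to use.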

My docker-compose file:

services:
  celery:
    build:
      context: .
      dockerfile: Dockerfile.celery
    container_name: teamflow-celery
    restart: unless-stopped
    depends_on:
      - redis
    networks:
      - teamflow_network
    volumes:
      - .:/app
    env_file:
      - .env
  api:
    build:
      context: .
      dockerfile: Dockerfile.api
    ports:
      - "7000:7000"
    volumes:
      - .:/app
    depends_on:
      - redis
      - celery
    container_name: teamflow-api
    networks:
      - teamflow_network
    env_file:
      - .env
  redis:
    image: redis:7.0-alpine
    restart: unless-stopped
    container_name: teamflow-redis
    command: --port 6380
    expose:
      - 6380
    networks:
      - teamflow_network
    ports:
      - 6380:6380

  nginx:
    image: nginx:latest
    container_name: teamflow-nginx
    ports:
      - "80:80"
    depends_on:
      - api
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    networks:
      - teamflow_network

networks:
  teamflow_network:
    external: true

The Celery config:

from app_config import create_app
from celery import Celery, shared_task
from flask import render_template_string
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import os
from datetime import datetime
import celery_config.schedule_config as celeryConfig
import traceback

app = create_app()


def make_celery(app=app):
    """
    As described in the doc
    """
    celery = Celery(
        app.import_name,
        backend=f"redis://{os.environ.get('REDIS_HOST')}:{os.environ.get('REDIS_PORT')}",
        broker=f"redis://{os.environ.get('REDIS_HOST')}:{os.environ.get('REDIS_PORT')}",
    )
    celery.conf.update(app.config)
    celery.config_from_object(celeryConfig)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery


celery = make_celery()


@shared_task
def send_mail(context):
    try:
        print("Sending Mail")
        smtp_host = 'smtp.gmail.com'
        smtp_port = 587
        smtp_user = os.environ.get('EMAIL_USER')
        smtp_password = os.environ.get('EMAIL_PASSWORD')

        server = smtplib.SMTP(smtp_host, smtp_port)
        server.starttls()
        server.login(smtp_user, smtp_password)

        from_email = "[email protected]"
        to_email = context['email']
        subject = context['subject']

        # Construct the path to the HTML file within the templates folder
        template_path = os.path.join(os.getcwd(), 'templates', context['template_name'])

        # Read the HTML file content
        with open(template_path, 'r') as html_file:
            body = html_file.read()

        # send context inside the html file
        body = render_template_string(body, **context)

        msg = MIMEMultipart()
        msg['From'] = from_email
        msg['To'] = to_email
        msg['Subject'] = subject

        # Change the MIME type to 'html'
        msg.attach(MIMEText(body, 'html'))

        server.sendmail(from_email, to_email, msg.as_string())

        server.quit()
        return "Mail sent successfully"

    except Exception as e:
        print(traceback.format_exc(), "error@celery/send_mail")
        print(e, "error@celery/send_mail")
        return "Failed to send mail"
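
One thing to note about `make_celery` above: the f-strings build the URL even when `REDIS_HOST` or `REDIS_PORT` is unset, which silently yields `redis://None:None`. A minimal sketch of a stricter variant is shown here; `redis_url_from_env` is a hypothetical helper, not part of my code, and it assumes the same two variable names.

```python
import os


def redis_url_from_env(env=os.environ):
    """Build the Redis URL from REDIS_HOST and REDIS_PORT, failing loudly if unset."""
    host = env.get("REDIS_HOST")
    port = env.get("REDIS_PORT")
    # Collect the names of any variables that are missing or empty.
    missing = [
        name
        for name, value in (("REDIS_HOST", host), ("REDIS_PORT", port))
        if not value
    ]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return f"redis://{host}:{port}"
```

With this, a process that starts without the expected environment would raise at import time instead of creating a Celery app with an unusable URL.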

The extensions.py:

from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import JWTManager
from flask_cors import CORS
from sqlalchemy import MetaData
from flask_socketio import SocketIO

naming_convention = {
    "ix": 'ix_%(column_0_label)s',
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(column_0_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s"
}

db = SQLAlchemy(metadata=MetaData(naming_convention=naming_convention))
migrate = Migrate()
jwt = JWTManager()
socketio = SocketIO(cors_allowed_origins="*", async_mode='eventlet')

cors = CORS()

The create_app function:

def create_app(config_name='development'):
    app = Flask(__name__)
    app.config.from_object(config_obj[config_name])
    db.init_app(app)
    migrate.init_app(app, db)
    socketio.init_app(app)
    jwt.init_app(app)
    cors.init_app(app, resources={r"/socket.io/*": {"origins": "*"}})
    return app

The runserver.py:

from app_config import create_app
from dotenv import load_dotenv
from message_socket import socketio
import redis


load_dotenv()


app = create_app()

if __name__ == '__main__':
    socketio.run(app, debug=True, host='0.0.0.0', port=7000)

And the Socket.IO code:

from flask_socketio import emit, join_room
from models import create_message, is_project_valid
from http_status import HttpStatus
from status_res import StatusRes
from extensions import socketio
from flask_jwt_extended import current_user, jwt_required


@socketio.on('connect')
def test_connect_handler():
    print('Client connected')


@socketio.on('join-room')
@jwt_required()
def on_join(data):
    project_id = data.get('project_id')
    print(current_user.id, "current_user.id")
    if not is_project_valid(project_id):
        print("Invalid project ID")
        emit('error-message', {
            'status': HttpStatus.BAD_REQUEST,
            'status_res': StatusRes.FAILED,
            'message': "Invalid project ID"
        })
        return
    join_room(project_id)


# error handler
@socketio.on_error()
def error_handler(e):
    print(e, "[email protected]_error")
    emit('error-message', {
        'status': HttpStatus.INTERNAL_SERVER_ERROR,
        'status_res': StatusRes.FAILED,
        'message': str(e)
    })


@socketio.on('send-message')
@jwt_required()
def send_message(data):
    project_id = data.get('project_id')
    print(project_id, "project_id")

    if not is_project_valid(project_id):
        print("Invalid project ID")
        emit('error-message', {
            'status': HttpStatus.BAD_REQUEST,
            'status_res': StatusRes.FAILED,
            'message': "Invalid project ID"
        })  # Send error to the requesting client only
        return

    try:
        content = data.get('content', None)
        author_id = current_user.id

        if not content:
            emit('error-message', {
                'status': HttpStatus.BAD_REQUEST,
                'status_res': StatusRes.FAILED,
                'message': "Content is required"
            })
            return

        # Assuming create_message is a function that saves the message to the database
        msg = create_message(content, author_id, project_id)
        if not msg:
            raise Exception("Network Error")

        emit('receive-message', msg.to_dict(), room=project_id)
    except Exception as e:
        # General error handling
        print(e, "error@message_socket/send-message")
        emit('error', {
            'status': HttpStatus.INTERNAL_SERVER_ERROR,
            'status_res': StatusRes.FAILED,
            'message': "Network Error"
        }, room=project_id)
