I have built my app on Flask and connected Celery, with Redis as the broker, to handle the queue. I am going to create more than 3 million jobs, each taking about 2 to 20 seconds, so the run will take a long time. I want to monitor the worker's status: how many tasks have completed and how many are left. Here are the relevant parts of my project:
/app/__init__.py
from flask import Flask
from app.extensions import db, migrate
from app.config import Config
import click
from flask.cli import with_appcontext
from app.services.jobs.html_indexing import html_indexing
def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(config_class)
    app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024

    db.init_app(app)
    migrate.init_app(app, db)

    with app.app_context():
        from app.routes.auth import auth_bp
        from app.routes.upload import upload_bp
        from app.routes.search import search_bp
        from app.routes.indexing import indexing_bp
        from app.routes.test_bp import test_bp

        app.register_blueprint(auth_bp, url_prefix='/auth')
        app.register_blueprint(upload_bp, url_prefix='/upload')
        app.register_blueprint(search_bp, url_prefix='/search')
        app.register_blueprint(indexing_bp, url_prefix='/indexing')
        app.register_blueprint(test_bp, url_prefix='/test')

        # Ensure models are imported
        from app.models import User, Word, Instance, Params, InstanceParamsLink, InstanceWordLink

    # add command function to cli commands
    app.cli.add_command(paragraph_indexing)

    return app


@click.command(name='paragraph_indexing')
@with_appcontext
@click.argument('limit', type=int)  # cast so the task and the SQL LIMIT receive an int, not a string
def paragraph_indexing(limit):
    html_indexing.delay(limit)
/app/services/jobs/html_indexing.py
import re
import psycopg2
from bs4 import BeautifulSoup
from psycopg2 import sql
from app.extensions import db
from app.models import Instance, Params, InstanceParamsLink, Word, InstanceWordLink
from app.services.jobs.celery_app import app
from dotenv import load_dotenv
load_dotenv()
@app.task
def html_indexing(limit, user_id=1):
    from app import create_app
    app = create_app()
    with app.app_context():
        db_params = {
            'dbname': 'dbname',
            'user': 'db_user',
            'password': 'db_password',
            'host': 'db_host',
        }
        try:
            conn = psycopg2.connect(**db_params)
            print('Connected to the database')
            cursor = conn.cursor()
            query = sql.SQL("SELECT p.id, p.content, p.document_id "
                            "FROM discussing.paragraph p "
                            "LEFT JOIN utility.text_index ti ON ti.file_type_id = 1 AND ti.row_id = p.id "
                            "WHERE p.docx = false AND ti.id IS NULL "
                            "ORDER BY p.id "
                            "LIMIT %s "
                            )
            cursor.execute(query, (limit,))
            rows = cursor.fetchall()
            for row in rows:
                # create instance
                instance = Instance(project_id=user_id)
                db.session.add(instance)
                db.session.commit()
                # create Params
                document_id_param = create_params('document_id')
                paragraph_id_param = create_params('paragraph_id')
                # create InstanceParamsLink
                create_instance_params_link(document_id_param.id, row[2], instance.id)
                create_instance_params_link(paragraph_id_param.id, row[0], instance.id)
                # create words
                words_set = get_text(row[1])
                # create InstanceWordLink
                create_instance_word_links(words_set, instance.id)
                try:
                    # update text_index in resource server
                    update_query = sql.SQL("INSERT INTO utility.text_index (file_type_id, row_id) VALUES (1, %s)")
                    cursor.execute(update_query, (row[0],))
                    conn.commit()
                    print(f"Paragraph {row[0]} indexed")
                except psycopg2.Error as e:
                    conn.rollback()
                    print(f"Failed to insert into text_index for paragraph {row[0]}: {e}")
            # Close the cursor and the connection
            cursor.close()
            conn.close()
        except psycopg2.Error as e:
            print(f"Error: {e}")
def create_params(key):
    params = db.session.execute(db.select(Params).filter_by(name=key)).scalar_one_or_none()
    if not params:
        params = Params(name=key)
        db.session.add(params)
        db.session.commit()
        params = db.session.execute(db.select(Params).filter_by(name=key)).scalar_one()
    return params


def create_instance_params_link(param_id, value, instance_id):
    instance_params_link = InstanceParamsLink(
        param_id=param_id,
        value=value,
        instance_id=instance_id
    )
    db.session.add(instance_params_link)
    db.session.commit()
def get_text(html_content):
    html_content = html_content.replace('&nbsp;', ' ')  # drop non-breaking-space entities before parsing
    soup = BeautifulSoup(html_content, 'html.parser')
    # Remove script and style elements
    for script_or_style in soup(['script', 'style']):
        script_or_style.decompose()
    # Get text
    text = soup.get_text(separator=' ')
    text = text.lower()
    words = set()
    # keep only digits, Latin and Cyrillic letters, spaces and hyphens
    text = re.sub('[^0-9a-zA-Z\u0400-\u04FF -]+', '', text)
    text = text.replace("-", " ")
    text = text.split(" ")
    words.update(text)
    return words
def create_instance_word_links(words, instance_id):
    for word in words:
        if len(word) > 50:
            continue
        word_obj = db.session.query(Word).filter_by(word=word).first()
        if not word_obj:
            word_obj = Word(word=word)
            db.session.add(word_obj)
            db.session.commit()
        instance_word = InstanceWordLink(word_id=word_obj.id, instance_id=instance_id)
        db.session.merge(instance_word)
        db.session.commit()
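For reference, here is what get_text produces for a small snippet (set ordering varies):

# quick illustration of get_text: HTML in, lowercase word set out
print(get_text('<p>Test-case, Привет</p>'))  # {'test', 'case', 'привет'}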
/app/services/jobs/celery_app.py
# celery_app.py
from celery import Celery
import os
app = Celery('text_search', broker=os.getenv('CELERY_BROKER_URL'))
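Note that the Celery app is configured with a broker only and no result backend, so task states are not persisted anywhere queryable. A minimal sketch of what adding one would look like (the CELERY_RESULT_BACKEND variable is hypothetical and not part of my current setup):

# celery_app.py sketch with a result backend added; CELERY_RESULT_BACKEND is hypothetical
from celery import Celery
import os

app = Celery(
    'text_search',
    broker=os.getenv('CELERY_BROKER_URL'),
    backend=os.getenv('CELERY_RESULT_BACKEND'),  # e.g. the same redis:// URL as the broker
)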
I run the worker using this command:
celery -A app.services.jobs.celery_app worker --pool=solo --loglevel=info -n worker_main
I push a task to the worker using the command below (note that this enqueues a single html_indexing task, which then loops over up to 200 rows itself):
flask paragraph_indexing 200
Below is the script that checks the worker's current status:
/app/services/jobs/check_celery_tasks.py
# check_celery_tasks.py
from celery_app import app
# Get the inspector
inspector = app.control.inspect()
# Fetch task information
reserved = inspector.reserved()
scheduled = inspector.scheduled()
active = inspector.active()
stats = inspector.stats()
def count_tasks(tasks_dict):
    count = 0
    if tasks_dict:
        for worker, tasks in tasks_dict.items():
            count += len(tasks)
    return count
# Count tasks
reserved_count = count_tasks(reserved)
scheduled_count = count_tasks(scheduled)
active_count = count_tasks(active)
# Fetch and sum up completed tasks
total_completed = 0
if stats:
for worker, stat in stats.items():
total_completed += sum(stat.get('total', {}).values())
print(f"Reserved tasks: {reserved_count}")
print(f"Scheduled tasks: {scheduled_count}")
print(f"Active tasks: {active_count}")
print(f"Completed tasks: {total_completed}")
I check it using the command below:
python app/services/jobs/check_celery_tasks.py
While the worker is doing jobs, I run the above command to check its status, and the output is always:
Reserved tasks: 0
Scheduled tasks: 0
Active tasks: 0
Completed tasks: 0
The problem is that I need a way to monitor how many tasks remain and how many have finished. I have tried Flower, but I need to get this information from a CLI command.
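For example, this rough sketch is the kind of quick CLI check I am after (assuming the default 'celery' queue name and that the broker URL points at Redis; it only approximates the remaining backlog and says nothing about finished tasks):

import os
import redis
from dotenv import load_dotenv

load_dotenv()  # same .env as the rest of the project

# Pending (not yet reserved) messages sit in a Redis list named after the queue,
# so the list length approximates how many tasks are still waiting.
r = redis.Redis.from_url(os.getenv('CELERY_BROKER_URL'))
print(f"Tasks waiting in queue: {r.llen('celery')}")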