Please help me find the best (or at least a better) approach to using multiprocessing with FastAPI.
I have a project with a single endpoint. A heavy OCR model (EasyOCR) is imported and initialized in a separate process.
When you send an image to the endpoint, it forwards the image to that process via multiprocessing.Manager().Queue().
And here is the real problem: I don't know a good way to wait (or await) until the other process has recognized the text in the image, and then resume to send the result back to the user.
I don't want to create a new process every time someone makes a request, because importing the easyocr module and initializing its model takes a very long time.
Here is the structure of my project:
verbumapi/
├── __init__.py
├── easyocr_api
│ ├── __init__.py
│ ├── recognition.py
│ └── router.py
├── main.py
├── multiproc_manager
│ ├── __init__.py
│ └── manager.py
Here is the code:
main.py
import multiprocessing

from fastapi import FastAPI

from verbumapi.easyocr_api.router import router as router_easyocr
from verbumapi.multiproc_manager.manager import (
    easyocr_img_queue,
    easyocr_text_shared_dict
)
from verbumapi.easyocr_api.recognition import easyocr_process

app = FastAPI(title='VerbumAPI')
app.include_router(router_easyocr)

easyocr_process = multiprocessing.Process(
    target=easyocr_process, args=(easyocr_img_queue, easyocr_text_shared_dict)
)
easyocr_process.start()
multiproc_manager/manager.py
from multiprocessing import Manager
manager = Manager()
easyocr_img_queue = manager.Queue()
easyocr_text_shared_dict = manager.dict()
easyocr_api/recognition.py
from multiprocessing.managers import DictProxy  # noqa
from multiprocessing.queues import Queue

list_of_langs = ['en', 'ru']


def easyocr_process(img_queue: Queue, text_shared_dict: DictProxy):
    import easyocr
    model = easyocr.Reader(list_of_langs)
    while True:
        if img_queue.empty():
            continue
        image, img_hash = img_queue.get()
        result = model.readtext(
            image,
            paragraph=True,
            detail=0,
            decoder='wordbeamsearch',
            beamWidth=15,
        )
        text_shared_dict[img_hash] = result
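A side note on the loop above: `Queue.get()` blocks by default until an item is available, so the `empty()` check only keeps a CPU core spinning while the queue is idle. A minimal sketch of a blocking loop follows. It uses a thread and `queue.Queue` in place of the real process and manager queue so that it runs on its own; `STOP`, `fake_readtext`, and `worker` are hypothetical names, and `fake_readtext` merely stands in for `model.readtext(...)`.

```python
import queue
import threading

STOP = None  # hypothetical sentinel that tells the worker to exit


def fake_readtext(image: bytes) -> str:
    # Hypothetical stand-in for model.readtext(...)
    return image.decode().upper()


def worker(img_queue: queue.Queue, results: dict) -> None:
    while True:
        item = img_queue.get()  # blocks without spinning until an item arrives
        if item is STOP:
            break
        image, img_hash = item
        results[img_hash] = fake_readtext(image)


if __name__ == "__main__":
    q, results = queue.Queue(), {}
    t = threading.Thread(target=worker, args=(q, results))
    t.start()
    q.put((b"hello", "h1"))
    q.put(STOP)
    t.join()
    print(results)
```

The same blocking `get()` call should apply to the manager queue proxy, since it forwards to an ordinary `queue.Queue`.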
easyocr_api/router.py
import asyncio
import hashlib

from fastapi import APIRouter, Depends, UploadFile, File

from verbumapi.easyocr_api.recognition import get_loaded_languages
from verbumapi.multiproc_manager.manager import (
    easyocr_img_queue,
    easyocr_text_shared_dict,
)
from verbumapi.utils import validate_and_read_img

router = APIRouter(
    prefix='/easyocr',
    tags=['EasyOCR'],
)


# HERE is question about this function
@router.post('/recognize')
async def recognize(image: bytes = Depends(validate_and_read_img)):
    image_hash = hashlib.md5(image).hexdigest()
    easyocr_img_queue.put((image, image_hash))
    await asyncio.sleep(0.020)
    while image_hash not in easyocr_text_shared_dict:
        await asyncio.sleep(0.020)
    text = easyocr_text_shared_dict.pop(image_hash)
    return {
        'data': text,
        'detail': 'text from image was successfully recognized',
    }
What is the best way to "wait" for the result from the other process in recognize(image: bytes = Depends(validate_and_read_img))?
The way I already have,
OR
this way:
@router.post('/recognize')
# removed async
def recognize(image: bytes = Depends(validate_and_read_img)):
    image_hash = hashlib.md5(image).hexdigest()
    easyocr_img_queue.put((image, image_hash))
    # removed await statements and sleep
    while image_hash not in easyocr_text_shared_dict:
        pass
    text = easyocr_text_shared_dict.pop(image_hash)
    return {
        'data': text,
        'detail': 'text from image was successfully recognized',
    }
This way FastAPI will run the endpoint in a separate thread instead of on the event loop.
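To illustrate the thread idea without FastAPI: a plain `def` endpoint is run in a worker thread, so a blocking wait there does not stall the event loop, although a bare `while ...: pass` loop would still keep a CPU core busy in that thread. The sketch below uses `asyncio.to_thread` as a rough analogue of that offloading and a `threading.Event` instead of spinning. `blocking_wait` and `fake_ocr` are hypothetical names, and `fake_ocr` is only a stand-in for the OCR process.

```python
import asyncio
import threading
import time


def blocking_wait(done: threading.Event, results: dict, key: str) -> str:
    # Blocks this worker thread (not the event loop) until the result is ready.
    done.wait()
    return results.pop(key)


def fake_ocr(done: threading.Event, results: dict, key: str) -> None:
    # Hypothetical stand-in for the OCR process: takes a while, then publishes.
    time.sleep(0.2)
    results[key] = "recognized text"
    done.set()


async def main() -> tuple[str, int]:
    done, results, ticks = threading.Event(), {}, 0
    threading.Thread(target=fake_ocr, args=(done, results, "img-1")).start()

    # Offload the blocking wait to a thread, similar in spirit to what
    # FastAPI does for a plain `def` endpoint.
    waiter = asyncio.create_task(
        asyncio.to_thread(blocking_wait, done, results, "img-1")
    )

    # The event loop stays free to do other work while the thread waits.
    while not waiter.done():
        ticks += 1
        await asyncio.sleep(0.01)
    return await waiter, ticks


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The tick counter is only there to show that the loop keeps running while the thread is blocked; it has no counterpart in the real endpoint.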
OR is there any other way?
Which way would be the fastest and most optimized?