I have a very simple FastAPI application: it is written with FastAPI, asyncio, aiohttp and boto3, which it uses to call SageMaker.
The main idea is simple: it exposes one endpoint that executes 5 or 6 HTTP requests and then calls SageMaker.
While running a few tests, I noticed some strange behaviour: the async HTTP requests were taking a long time to finish, and the metrics from the other application made it clear that the problem wasn't on its side.
So I commented out the boto3/SageMaker code and ran the same test: all requests were smooth once the SageMaker call was gone.
It's now clear that there is a problem in my code design, but I haven't figured out what is wrong. The only idea I have is that it is related to mixing async and sync code: request A arrives, runs its async code and calls SageMaker; request B arrives, starts its async processing and gets stuck because request A is still waiting for the SageMaker/boto3 response. Does that make sense?
What is the proper way to write code that executes async HTTP requests and also uses a sync library for other actions?
Here is what I have written:
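To illustrate the scenario I suspect, here is a minimal sketch (not my real code). It assumes that `time.sleep` is a fair stand-in for the synchronous boto3 call; `blocking_call`, `handler_blocking`, `handler_async`, `timed` and `compare` are hypothetical names made up for this illustration.

```python
import asyncio
import time
from typing import Callable, Tuple


def blocking_call(delay: float) -> str:
    # Hypothetical stand-in for a sync library call such as
    # client.invoke_endpoint(...): it blocks the calling thread.
    time.sleep(delay)
    return "done"


async def handler_blocking(delay: float) -> str:
    # Sync code called directly inside a coroutine: the event loop
    # cannot switch to any other coroutine until this returns.
    return blocking_call(delay)


async def handler_async(delay: float) -> str:
    # Awaiting yields control, so other coroutines can make progress.
    await asyncio.sleep(delay)
    return "done"


async def timed(make_handler: Callable, n: int, delay: float) -> float:
    # Run n "requests" concurrently and return the total wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(*(make_handler(delay) for _ in range(n)))
    return time.perf_counter() - start


def compare(n: int = 3, delay: float = 0.1) -> Tuple[float, float]:
    blocking = asyncio.run(timed(handler_blocking, n, delay))
    non_blocking = asyncio.run(timed(handler_async, n, delay))
    return blocking, non_blocking


if __name__ == "__main__":
    blocking, non_blocking = compare()
    print(f"blocking: {blocking:.2f}s, non-blocking: {non_blocking:.2f}s")
```

If the hypothesis holds, the blocking variant should take about `n * delay` in total because the "requests" run one after another, while the awaiting variant should finish in roughly one `delay`.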
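import asyncio
import logging
import os
from typing import Any, Dict, List

import aiohttp
import boto3
import ujson
from fastapi import FastAPI
from fastapi.responses import JSONResponse

# Payload, headers, transform_payload, get_features and _create_feature_payload
# are defined elsewhere in my project (not shown here).
logger = logging.getLogger(__name__)
# Assumption: client is a SageMaker runtime client; its creation was missing from the snippet.
client = boto3.client("sagemaker-runtime")
server = FastAPI()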

async def get_info(url: str, features: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Start one request per feature and wait for all of them concurrently.
    async_calls = list()
    for feature in features:
        async_calls.append(Aiohttp.query_url(url, feature))
    infos = await asyncio.gather(*async_calls)
    return list(infos)

def extract_insights(endpoint, payload):
    payload = transform_payload(payload)  # simple JSON transformation
    response = client.invoke_endpoint(
        EndpointName=endpoint,
        Body=payload,
        ContentType='application/json'
    )
    response_body = response["Body"].read().decode("utf-8")
    response_body = ujson.loads(response_body)
    return response_body

@server.post('/insights')
async def insights(payload: Payload):
    transaction_id = transaction['id_trns']
    general_model = 'mkt-model'
    api_url = os.getenv('API_URL')
    infos = get_features(payload)  # a simple JSON read that creates a list of strings
    extra_info = await get_info(api_url, infos)
    payload.update(extra_info)
    insights = extract_insights(general_model, payload)  # synchronous boto3 call
    return JSONResponse(content={
        'response': 200,
        'insight': insights
    }, status_code=200)

class Aiohttp:
    aiohttp_client = None  # shared session, created lazily on first use

    @classmethod
    def get_aiohttp_client(cls) -> aiohttp.ClientSession:
        if cls.aiohttp_client is None:
            connector = aiohttp.TCPConnector(limit=0)  # limit=0 means no connection limit
            cls.aiohttp_client = aiohttp.ClientSession(connector=connector)
        return cls.aiohttp_client

    @classmethod
    async def close_aiohttp_client(cls) -> None:
        if cls.aiohttp_client:
            await cls.aiohttp_client.close()
            cls.aiohttp_client = None

    @classmethod
    async def query_url(cls, url: str, feature: Dict[str, Any]) -> Any:
        client = cls.get_aiohttp_client()
        payload = _create_feature_payload(feature)
        try:
            async with client.post(url=url, json=payload, headers=headers) as response:
                feature_response = await response.json()
                return feature_response
        except Exception as e:
            logger.error(f"An error occurred: {e}")
            return {}

Any idea why extract_insights is making my code so slow? It's really smooth when I drop that line.
I'm stuck on how to refactor this to make it work.
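One refactoring I am considering, sketched here under assumptions rather than taken from my real code: hand the synchronous call to a worker thread with `loop.run_in_executor`, so the event loop stays free while the call is in flight. `extract_insights_sync` is a hypothetical stand-in that sleeps instead of calling SageMaker, and `extract_insights_async`, `ticker` and `main` are names invented for the sketch.

```python
import asyncio
import contextlib
import time
from typing import Any, Dict, Tuple


def extract_insights_sync(endpoint: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical stand-in for the boto3 call: blocks, then returns a result.
    time.sleep(0.1)
    return {"endpoint": endpoint, "n_fields": len(payload)}


async def extract_insights_async(endpoint: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    # Run the blocking function in the default thread pool executor
    # and await its result without blocking the event loop.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, extract_insights_sync, endpoint, payload)


async def main() -> Tuple[Dict[str, Any], int]:
    ticks = 0

    async def ticker() -> None:
        # Counts how often the event loop gets to run while we wait.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(ticker())
    result = await extract_insights_async("mkt-model", {"a": 1, "b": 2})
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return result, ticks


if __name__ == "__main__":
    result, ticks = asyncio.run(main())
    print(result, f"event loop ticks while waiting: {ticks}")
```

In the endpoint this would presumably become `insights = await extract_insights_async(general_model, payload)`. On Python 3.9+ `asyncio.to_thread` looks like a shorter way to do the same thing, but I am not sure whether either option is the recommended approach here.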