I developed some wrappers around the SQLAlchemy session for use in the repositories of a FastAPI application, to ensure that there is exactly one session per request. The final goal is to encapsulate working with the session in autocommit mode (as, for example, in Django). The whole code is in my GitHub repository.
All work with the database is encapsulated in the Database class:
import functools
from contextvars import ContextVar
from functools import wraps
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, AsyncEngine, AsyncTransaction, AsyncConnection
# Holds the AsyncSession bound to the current execution context; it has no
# default, so reading it before a session is set raises LookupError.
session_context: "ContextVar[AsyncSession]" = ContextVar("session")
class SessionNotInitializedError(Exception):
    """Signals that no session has been bound to the current context."""

    def __init__(self):
        hint = "Init session with init_session method"
        super().__init__(hint)
class Session:
    """Async context manager and decorator that binds an ``AsyncSession`` to the context.

    Entering creates an ``AsyncSession`` on ``bind`` (with
    ``expire_on_commit=False``) and stores it in ``session_context``; leaving
    closes the session and restores the previous value of ``session_context``.

    Used as a decorator, every call of the wrapped coroutine runs inside its
    own, freshly created ``Session`` scope.
    """

    def __init__(self, bind: AsyncEngine | AsyncConnection):
        """Remember the engine or connection that new sessions will be bound to."""
        self.bind = bind
        self.session = None
        self.token = None

    async def __aenter__(self):
        """Create the session, publish it through ``session_context`` and return ``self``."""
        self.session = AsyncSession(bind=self.bind, expire_on_commit=False)
        self.token = session_context.set(self.session)
        return self

    async def __aexit__(self, exc_type=None, exc_value=None, traceback=None):
        """Close the session and restore the previous ``session_context`` value."""
        try:
            await self.session.close()
        finally:
            # Reset even when close() fails, so a broken session is never left
            # visible to later code running in this context.
            session_context.reset(self.token)

    def __call__(self, func):
        """Wrap coroutine function ``func`` so that each call runs inside a session scope."""

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # The decorator object is created once but the wrapped function may
            # run concurrently; a new scope per call keeps each call's session
            # and reset token separate instead of sharing them on ``self``.
            async with Session(self.bind):
                return await func(*args, **kwargs)

        return wrapper
class Transaction:
    """Async context manager and decorator that runs a block in one transaction.

    Entering opens a connection on the engine, begins a transaction on it and
    enters a ``Session`` bound to that connection. Leaving commits when the
    block finished without an exception and rolls back otherwise, then exits
    the session and closes the connection.

    Used as a decorator, every call of the wrapped coroutine runs inside its
    own, freshly created ``Transaction`` scope.
    """

    def __init__(self, engine: AsyncEngine):
        """Remember the engine that connections are taken from."""
        self.engine = engine
        self.connection = None
        self.transaction = None
        self.session = None

    async def __aenter__(self):
        """Open the connection, begin the transaction, enter the session; return ``self``."""
        connection = await self.engine.connect()
        try:
            self.transaction = await connection.begin()
            self.session = Session(bind=connection)
            await self.session.__aenter__()
        except BaseException:
            # Setup failed half-way: do not leak the connection we just opened.
            await connection.close()
            raise
        self.connection = connection
        return self

    async def __aexit__(self, exc_type=None, exc_value=None, traceback=None) -> None:
        """Commit or roll back, then always release the session and the connection."""
        try:
            if exc_type is None:
                await self.commit()
            else:
                await self.rollback()
        finally:
            # Clean up even if commit/rollback raised.
            try:
                await self.session.__aexit__(exc_type, exc_value, traceback)
            finally:
                await self.connection.close()

    def __call__(self, func):
        """Wrap coroutine function ``func`` so that each call runs inside a transaction."""

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # A new scope per call: the decorator object is shared by all
            # (possibly concurrent) calls and must not carry per-call state.
            async with Transaction(self.engine):
                return await func(*args, **kwargs)

        return wrapper  # type: ignore

    async def commit(self):
        """Commit the transaction begun in ``__aenter__``."""
        await self.transaction.commit()

    async def rollback(self):
        """Roll back the transaction begun in ``__aenter__``."""
        await self.transaction.rollback()
class Database:
    """Entry point for queries: owns the engine and delegates to the context-bound session.

    Query methods use the session currently stored in ``session_context``;
    they raise ``SessionNotInitializedError`` when none has been set.
    """

    def __init__(self, url, echo=True, **engine_kwargs):
        """Create the async engine.

        Args:
            url: Database URL passed to ``create_async_engine``.
            echo: Passed to ``create_async_engine`` as ``echo``. Defaults to
                ``True``, which matches the previously hard-coded value.
            **engine_kwargs: Extra keyword arguments forwarded unchanged to
                ``create_async_engine``.
        """
        self.engine = create_async_engine(url, echo=echo, **engine_kwargs)

    def init_session(self):
        """Return a ``Session`` bound to the engine with ``isolation_level="AUTOCOMMIT"``."""
        autocommit_engine = self.engine.execution_options(isolation_level="AUTOCOMMIT")
        return Session(autocommit_engine)

    @property
    def __session(self) -> AsyncSession:
        """Session stored in ``session_context``.

        Raises:
            SessionNotInitializedError: If no session is set in the current context.
        """
        try:
            return session_context.get()
        except LookupError:
            # The LookupError is an implementation detail; the dedicated error
            # already tells the caller what to do.
            raise SessionNotInitializedError() from None

    def transaction(self):
        """Return a ``Transaction`` for this database's engine."""
        return Transaction(self.engine)

    async def scalar(self, statement):
        """Run ``statement`` through the current session's ``scalar`` and return its result."""
        return await self.__session.scalar(statement)

    async def scalars(self, statement):
        """Run ``statement`` through the current session's ``scalars`` and return its result."""
        return await self.__session.scalars(statement)

    async def execute(self, statement):
        """Run ``statement`` through the current session's ``execute`` and return its result."""
        return await self.__session.execute(statement)

    async def get_by_pk(self, entity, ident, **kwargs):
        """Call the current session's ``get(entity, ident, **kwargs)`` and return its result."""
        return await self.__session.get(entity, ident, **kwargs)
Create an instance of Database:
# Module-level Database instance that the examples below decorate with.
default_db = Database(
    url="postgresql+asyncpg://postgres:postgres@localhost:5433/postgres",
)
Examples of use:
app = FastAPI()


@router.get("/user/{user_id}", response_model=UserSchema)
@default_db.init_session()
async def get_user(user_id: int, repo: UsersRepository = Depends()):
    """Return the result of ``repo.get_by_pk`` for ``user_id``."""
    user = await repo.get_by_pk(user_id)
    return user
As you can see, there is an explicit @default_db.init_session() call. This is the explicit way.
But how could I initialize the session implicitly? I guess initializing it is not too difficult – the session could be created on the first call of a query method. But how could I also close it implicitly? I mean, how do I detect that the API view has finished? In the Session wrapper this is done in the __aexit__ method.
Is there some way to do this besides middleware?
The case above is in the context of a web framework. There are also other cases, for example FastStream tasks or background tasks. For example:
tasks.py
@default_db.init_session()
async def update_users_backgound_task():
    """Example task outside a web request, wrapped by ``default_db.init_session()``."""
    repo = UsersRepository()
    # doing something...
If I understood your question correctly, I suggest you review the following section of the FastAPI documentation.
https://fastapi.tiangolo.com/tutorial/middleware/#before-and-after-the-response
Semih Çağdavul is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
2