I have patch method for update
it’s looks like
@router.patch("/{id}", response_model=ResponseBlockSchema, status_code=status.HTTP_200_OK)
async def update_block_router(
update_data: UpdateBlockSchema,
block_id: str = Query(..., alias="id"),
block_service: BlockService = Depends(get_block_service),
):
block = await block_service.update_block(
block_id=uuid.UUID(block_id),
update_data=update_data.dict(),
)
if not block:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Block not found",
)
return block
here are pydantic schemes for requests and responses
class BaseBlockSchema(BaseModel):
model_config = ConfigDict(from_attributes=True)
type: Optional[str]
props: dict
order: int
class ChildBlockSchema(BaseBlockSchema):
pass
class ParentBlockSchema(BaseBlockSchema):
pageId: Optional[uuid.UUID]
parentId: Optional[uuid.UUID]
class CreateBlockSchema(ParentBlockSchema):
children: list[ChildBlockSchema]
class UpdateBlockSchema(ParentBlockSchema):
pass
class ResponseBlockSchema(ParentBlockSchema):
id: uuid.UUID
class ResponseBlockSchemaAfterCreate(ResponseBlockSchema):
children: list[ChildBlockSchema]
don’t pay attention to the update scheme, as it doesn’t fit into the concept of the patch method, this is fixable ))
you can also notice that I pull up the block_service dependency
async def get_block_service(session: AsyncSession = Depends(get_async_session)) -> BlockService:
return BlockService(
BlockRepository(session=session),
)
here is another part of the config file
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
DATABASE_URL = f"postgresql+asyncpg://{config.POSTGRES_USER}:{config.POSTGRES_PASSWORD}@{config.POSTGRES_HOST}:{config.POSTGRES_PORT}/{config.POSTGRES_DB}"
engine = create_async_engine(DATABASE_URL)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)
# генерирует сессии для БД
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
yield session
I seem to be using asynchronous session receiving correctly
Here is the method for updating
class BlockService:
def __init__(self, block_repo: BlockRepository):
self.block_repo = block_repo
async def update_block(self, block_id: uuid.UUID, update_data: dict) -> Optional[Block]:
block = await self.block_repo.get(block_id)
if not block:
return None
for key, value in update_data.items():
if hasattr(block, key):
setattr(block, key, value)
if block.parentId == block.id:
raise ValueError("Block cannot be its own parent.")
return await self.block_repo.save(block)
you can see some repository
class BlockRepository(BaseDBRepository[models.Block]):
def __init__(self,
session: AsyncSession,
*args, **kwargs):
super().__init__(models.Block,
session,
*args, **kwargs)
async def get_blocks_by_page(self, page_id: UUID, many: bool = True) -> Optional[Sequence[models.Block] | models.Block]:
return await self.get_where(many=many, whereclause=models.Block.page_id == page_id)
and there is a basic repository here too. For the most part it is only needed for trivial interactions with the DB
AbstractModel = TypeVar('AbstractModel')
def rollback_wrapper(func):
@wraps(func)
async def inner(self, *args, **kwargs):
try:
return await func(self, *args, **kwargs)
except SQLAlchemyError as e:
await self.session.rollback()
logger.error(f"Error during DB operation: {e}")
raise e
return inner
class BaseDBRepository(Generic[AbstractModel]):
type_model: type[AbstractModel]
def __init__(self,
type_model: type[AbstractModel],
session: AsyncSession,
*args, **kwargs):
self.type_model = type_model
self.session = session
@rollback_wrapper
async def get(self,
ident: Any,
options: list = None
) -> AbstractModel | None:
"""Get an ONE model from the database with PK.
:param options:
:param ident: Key which need to find entry in database
:return:
"""
async with self.session.begin_nested():
options = options or []
statement = select(self.type_model).options(*options).where(self.type_model.id == ident)
result = await self.session.execute(statement)
return result.unique().scalar_one_or_none()
@rollback_wrapper
async def get_where(self,
many: bool = False,
whereclause=None,
limit: int | None = None,
offset: int | None = None,
order_by=None) -> Sequence[AbstractModel] | AbstractModel | None:
"""Get an ONE model from the database with whereclause.
:param offset:
:param many:
:param whereclause: Clause by which entry will be found
:param limit: Number of elements per query
:param order_by: Name of field for ordering
:return: Model if only one model was found, else None.
"""
async with self.session.begin_nested():
statement = select(self.type_model)
if whereclause is not None:
statement = statement.where(whereclause)
if limit is not None:
statement = statement.limit(limit)
if offset is not None:
statement = statement.offset(offset)
if order_by is not None:
statement = statement.order_by(order_by)
result = await self.session.execute(statement)
return result.unique().scalars().all() if many else result.scalar_one_or_none()
@rollback_wrapper
async def delete(self,
obj: AbstractModel):
logger.debug(f"[DB] Saving: {obj}")
async with self.session.begin_nested():
await self.session.delete(obj)
await self.session.commit()
@rollback_wrapper
async def save(
self,
obj: AbstractModel | Sequence[AbstractModel],
many: bool = False
) -> AbstractModel | Sequence[AbstractModel]:
"""
:param obj: object or objects to save
:param many: flag for many saves
:return:
"""
async with self.session.begin_nested():
logger.debug(f"[DB] Saving: {obj}")
if many and isinstance(obj, Sequence):
self.session.add_all(obj)
else:
self.session.add(obj)
await self.session.commit()
if many and isinstance(obj, Sequence):
for obj_item in obj:
await self.session.refresh(obj_item)
return obj
when checking through the debugger, for some reason I get an error in the loop in the update method (in BlockService class, update method)
for key, value in update_data.items():
if hasattr(block, key):
setattr(block, key, value)
because I put a debugger point on the condition right after the loop, there was already an error
here is the block model
class Block(Base):
__tablename__ = "blocks"
id = Column(UUID, primary_key=True, default=uuid.uuid4)
type = Column(String, nullable=False)
props = Column(JSON, nullable=False)
parentId = Column(UUID, ForeignKey("blocks.id", ondelete="SET NULL"), nullable=True)
pageId = Column(UUID, ForeignKey("pages.id", ondelete="SET NULL"), nullable=True)
order = Column(Integer, nullable=False)
created_at = Column(DateTime, default=func.now(), nullable=False)
updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), nullable=False)
parent = relationship("Block", back_populates="children", remote_side="Block.id", lazy="subquery")
children = relationship("Block", back_populates="parent", cascade="all, delete-orphan", lazy="subquery")
page = relationship("Page", back_populates="blocks", lazy="subquery")
in general, the fields of the pydantic schema and this model match by name, so there should be no problems in the serialization and deserialization processes (and yes, I know that naming variables/class fields in camelCase is not accepted in Python, I will fix it later using alias)
I get this error
raise exc.MissingGreenlet(
sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_only() here. Was IO attempted in an unexpected place? (Background on this error at: https://sqlalche.me/e/20/xd2s)
4