I have a query with subquery. It partitions based on ID and selects the newest of each one in the partitions. It works fine:
subquery = db.query(
func.rank()
.over(
order_by=Table.CreatedAt.desc(),
partition_by=[Table.ID],
)
.label("rank"),
Table,
).subquery()
rows: list[Table] = db.query(subquery).filter(subquery.c.rank == 1).all()
The problems is the type of rows
isn’t list[Table]
but rather list[Rows]
and it loses all of the ORM information.
How do I get it to return list[Table]
?
The solution is to query for the model class, passing the subquery to the query’s from_statement method:
stmt = db.query(subquery).filter(subquery.c.rank == 1)
rows: List[Table] = db.query(Table).from_statement(stmt).all()
or using from_statement in SQLAlchemy 2.0’s style:
import sqlalchemy as sa
...
stmt = sa.select(subquery).where(subquery.c.rank == 1)
rows: List[Table] = s.scalars(sa.select(Table).from_statement(stmt)).all()