I’m having a lot of trouble using SQLAlchemy to store the results of distributed jobs. Previously, I had an abstraction layer between the SQLAlchemy objects and the main code: every database object was converted into a pure Python object before being used in the main logic, and back again afterwards. I am now trying to use the database objects directly to cut that overhead. However, I get unexpected clashes between instances of the same database object.
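For context, that old layer looked roughly like this (the names are illustrative, not my real code; `Parent` refers to the class in the example further down):

```python
from dataclasses import dataclass
from typing import Optional

# Illustrative sketch of the old abstraction layer (not my actual code):
# every ORM row was copied into a plain object before entering the main logic.
@dataclass
class PlainParent:
    name: str
    child2_name: Optional[str] = None

def orm_to_plain(orm_parent) -> PlainParent:
    # ORM -> plain: copy scalar columns, dropping all session state.
    return PlainParent(name=orm_parent.name, child2_name=orm_parent.child2_name)

def plain_to_orm(plain, sess):
    # Plain -> ORM: write the fields back onto the session-managed row.
    orm_parent = sess.get(Parent, plain.name)
    orm_parent.child2_name = plain.child2_name
    return orm_parent
```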
Here is a minimal example which puzzles me:
```python
import pytest
from typing import List, Optional
from sqlalchemy import (
    String,
    ForeignKey,
)
from sqlalchemy.orm import (
    Mapped,
    mapped_column,
    relationship,
    declarative_base,
    Session,
)
from sqlalchemy.engine import create_engine
from dask.distributed import (
    Client,
    LocalCluster,
)

Base = declarative_base()


# Helpers to copy a subset of attributes between arbitrary objects.
def get_fields(obj, *fields):
    ret = {}
    for f in fields:
        if hasattr(obj, f):
            ret[f] = getattr(obj, f)
    return ret


def set_fields(obj, **fields):
    for k, v in fields.items():
        setattr(obj, k, v)


class Parent(Base):
    __tablename__ = "parent"
    name: Mapped[str] = mapped_column(String(100), primary_key=True)
    child1_name: Mapped[Optional[str]] = mapped_column(ForeignKey("child.name"))
    child1: Mapped["Child"] = relationship(
        back_populates="parents", foreign_keys=child1_name, lazy="select"
    )
    child2_name: Mapped[Optional[str]] = mapped_column(
        ForeignKey("child.name", ondelete="SET NULL")
    )
    child2: Mapped[Optional["Child"]] = relationship(
        back_populates="child2_of",
        foreign_keys=child2_name,
        lazy="select",
        passive_deletes=True,
    )


class Child(Base):
    __tablename__ = "child"
    name: Mapped[str] = mapped_column(String(100), primary_key=True)
    parents: Mapped[Optional[List["Parent"]]] = relationship(
        back_populates="child1",
        primaryjoin="Child.name==foreign(Parent.child1_name)",
    )
    # Removing this backlink (and the reference in Parent.child2) makes the test pass
    child2_of: Mapped[Optional[List["Parent"]]] = relationship(
        back_populates="child2",
        primaryjoin="Child.name==foreign(Parent.child2_name)",
    )


class Job:
    def __init__(self, target):
        self.target = target
        self.results = {}

    def set(self, **kwargs):
        self.results.update(**kwargs)

    def run(self):
        # Executed on the Dask worker: modify the target and record the result.
        self.target.child2 = Child(name="prod")
        self.set(
            **get_fields(
                self.target,
                "child2",
            )
        )
        """
        # Works fine
        self.results["child2"] = Child(name="child2")
        """
        return self

    def process_results(self, sess):
        # Executed back in the main thread: re-fetch the target and apply results.
        self.target = sess.get(Parent, self.target.name)
        set_fields(self.target, **self.results)
        sess.commit()


@pytest.fixture()
def client():
    cluster = LocalCluster(n_workers=1, processes=False)
    client = Client(cluster)
    yield client
    client.close()
    cluster.close()


def test_job(client):
    eng = create_engine(
        "sqlite://",  # in-memory SQLite
    )
    Base.metadata.create_all(eng)
    sess = Session(eng)
    parent = Parent(name="parent")
    sess.add(parent)
    sess.commit()
    parent = sess.get(Parent, parent.name)
    job = Job(parent)
    job = client.gather(
        client.submit(
            job.run,
        )
    )
    """
    # Works fine
    job = job.run()
    """
    assert "child2" in job.results
    assert job.results["child2"] is not None
    job.process_results(sess)
```
The test (`pytest sample.py`) fails with this error:

```
sqlalchemy.exc.InvalidRequestError: Can't attach instance <Parent at 0x14cedd3985e0>; another instance with key (<class 'min.Parent'>, ('parent',), None) is already present in this session.
```
This only occurs when using Dask (i.e., another thread). It also only occurs if `Job.target` is modified during the job’s execution, which I find odd since it stays the same `Parent` object. Moreover, removing the backreference `Child.child2_of` also makes this error go away, which doesn’t make any sense to me.
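My working assumption (not verified against Dask internals) is that Dask pickles the `Job` for transport even on an in-process cluster, so the gathered `job.target` is a detached copy of the `Parent` that is still sitting in the session. The identity-map clash can then be reproduced without Dask by doing the round trip by hand:

```python
import pickle
from sqlalchemy.exc import InvalidRequestError

# Assumption: this round trip mimics what Dask does to the Job.
job_copy = pickle.loads(pickle.dumps(Job(parent)))
assert job_copy.target is not parent        # a second, detached instance...
assert job_copy.target.name == parent.name  # ...with the same primary key

# Attaching the copy (directly, or via a relationship cascade) clashes with
# the instance already tracked by the session's identity map:
with pytest.raises(InvalidRequestError):
    sess.add(job_copy.target)
```

If that assumption is right, it would explain the Dask-only part, but not why the `Child.child2_of` backreference matters.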
- What explains this behavior?
- More generally, what would be a better way to run `Job` objects and save their results with SQLAlchemy while keeping these same classes?
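One direction I have considered, but not validated, is to treat everything coming back from the worker as detached and reconcile it with `Session.merge()` instead of assigning it directly, roughly:

```python
def process_results(self, sess):
    self.target = sess.get(Parent, self.target.name)
    for k, v in self.results.items():
        if isinstance(v, Base):
            # Reconcile the detached copy (and anything reachable from it,
            # such as the stale Parent in child2_of) with the session.
            v = sess.merge(v)
        setattr(self.target, k, v)
    sess.commit()
```

But I don’t know whether that is the idiomatic pattern, or whether `Job` should avoid holding ORM instances across the cluster boundary altogether.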