When I try to access an attribute of a model instance, I get a `greenlet_spawn` error. How can I fix that?
My code example:
import asyncio
from uuid import uuid4
from asyncpg import Connection
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy import Column, BigInteger, Pool, String, create_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker
class CConnection(Connection):
    """asyncpg connection that names its internal objects with random UUIDs."""

    def _get_unique_id(self, prefix: str) -> str:
        """Return an identifier of the form ``__asyncpg_<prefix>_<uuid4>``.

        NOTE(review): presumably overrides asyncpg's own id generation so
        that names do not collide across pooled/proxied connections --
        confirm against the deployment.
        """
        random_suffix = uuid4()
        return f"__asyncpg_{prefix}_{random_suffix}"
class Base(DeclarativeBase):
    """Declarative base class shared by the ORM models in this module."""
class Message(Base):
    """ORM model mapped to the ``message`` table."""

    __tablename__ = "message"

    # Primary key.
    id = Column(BigInteger, primary_key=True)
    # Message text; the column is declared NOT NULL.
    message = Column(String, nullable=False)
    # ... other cols
class User(Base):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = "user"

    # Primary key.
    id = Column(BigInteger, primary_key=True)
    # Nullable user name.
    username = Column(String)
    # ... other cols
# ... other models
def get_engine(
    database_url: str = None,
    session_name: str = "Unknown",
    pool_size: int = 5,
    max_overflow: int = 0,
    pool_class: Pool = None,
    is_async: bool = False
):
    """Build a SQLAlchemy engine for ``database_url``.

    Args:
        database_url: Connection URL handed to the engine factory.
        session_name: Value sent to the server as ``application_name``.
        pool_size: Forwarded as the engine's ``pool_size``.
        max_overflow: Forwarded as the engine's ``max_overflow``.
        pool_class: Forwarded as ``poolclass``.
        is_async: When true, ``create_async_engine`` is used (with
            asyncpg-style connect arguments); otherwise ``create_engine``.

    Returns:
        The engine produced by ``create_async_engine`` or ``create_engine``.
    """
    # Settings common to both engine flavours.
    shared_kwargs = {
        "url": database_url,
        "poolclass": pool_class,
        "pool_size": pool_size,
        "max_overflow": max_overflow,
        "pool_recycle": 3600,
    }
    server_options = "-c timezone=utc -c statement_timeout=1800000"

    if not is_async:
        # Synchronous driver: options are passed as flat connect arguments.
        sync_connect_args = {
            "options": server_options,
            "application_name": session_name,
        }
        return create_engine(connect_args=sync_connect_args, **shared_kwargs)

    # Async driver: statement caches are disabled, the custom connection
    # class is used, and server options go under ``server_settings``.
    async_connect_args = {
        "statement_cache_size": 0,
        "prepared_statement_cache_size": 0,
        "connection_class": CConnection,
        "server_settings": {
            "options": server_options,
            "application_name": session_name,
        },
    }
    return create_async_engine(
        future=True,
        connect_args=async_connect_args,
        **shared_kwargs,
    )
def get_session_maker(engine, is_async: bool = False):
    """Return a session factory bound to ``engine``.

    Both flavours are created with ``autoflush=False`` and
    ``expire_on_commit=False``. With ``is_async`` true the factory is an
    ``async_sessionmaker`` producing ``AsyncSession`` objects; otherwise it
    is a plain ``sessionmaker``.
    """
    factory_options = {
        "bind": engine,
        "autoflush": False,
        "expire_on_commit": False,
    }
    if not is_async:
        return sessionmaker(**factory_options)
    return async_sessionmaker(class_=AsyncSession, **factory_options)
def get_session(
    database_url: str = None,
    session_name: str = "Unknown",
    pool_size: int = 3,
    max_overflow: int = 0,
    pool_class: Pool = None,
    is_async: bool = False,
    with_engine: bool = False
):
    """Create an engine and open one session on it.

    The arguments up to ``is_async`` are forwarded to ``get_engine``;
    ``is_async`` also selects the session factory flavour.

    Returns:
        The new session, or a ``(session, engine)`` tuple when
        ``with_engine`` is true.
    """
    engine = get_engine(
        database_url=database_url,
        session_name=session_name,
        pool_size=pool_size,
        max_overflow=max_overflow,
        pool_class=pool_class,
        is_async=is_async,
    )
    make_session = get_session_maker(engine, is_async)
    new_session = make_session()
    return (new_session, engine) if with_engine else new_session
async def head_main(*args, **kwargs):
    """Run the message-processing coroutines against an async session.

    Opens an asyncio session on the asyncpg URL, runs ``prepare_message``
    (and the other inner coroutines) concurrently, and releases the session
    and engine when they finish.
    """
    # The URL uses the asyncpg driver, so the asyncio engine/session must be
    # requested explicitly. With the default ``is_async=False`` a synchronous
    # engine is built on top of asyncpg, and the first database round trip
    # (including an attribute lazy-load) raises ``MissingGreenlet``.
    session, engine = get_session(
        "postgresql+asyncpg://postgres:postgres@localhost/exp",
        is_async=True,
        with_engine=True,
    )
    session_lock = asyncio.Lock()
    queue = asyncio.Queue()

    # ... some inner funcs

    async def prepare_message():
        """Take queued messages one by one and clean their text."""
        while not queue.empty():
            async with session_lock:
                message: Message = await queue.get()
            async with session_lock:
                # Plain attribute access cannot await. If ``message`` may
                # carry expired/unloaded attributes or lazy relationships,
                # load them beforehand (e.g. ``await session.refresh(...)``
                # or an eager-loading option on the query) -- otherwise this
                # line would trigger implicit IO.
                message.message = _clear_message(message.message)

    try:
        # gather() needs coroutine objects (note the call parentheses) and
        # must itself be awaited, otherwise nothing is ever run.
        await asyncio.gather(
            prepare_message(),
            # ... some other funcs
        )
    finally:
        # Return the connection to the pool and shut the pool down.
        await session.close()
        await engine.dispose()
I receive textual data from other sources; it is processed in different inner functions and then written to the database.
There is also a problem with relationships: I tried using `joinedload`, but I still get the error.
Maybe the problem is in the `session_lock` context?
Exception:
sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_only() here. Was IO attempted in an unexpected place? (Background on this error at: https://sqlalche.me/e/20/xd2s)