mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-07 01:10:52 -04:00
add AsyncAttrs
Added a new helper mixin :class:`_asyncio.AsyncAttrs` that seeks to improve the use of lazy-loader and other expired or deferred ORM attributes with asyncio, providing a simple attribute accessor that provides an ``await`` interface to any ORM attribute, whether or not it needs to emit SQL. Change-Id: I1427b288dc28319c854372643066c491b9ee8dc0 References: #9731
This commit is contained in:
+12
@@ -0,0 +1,12 @@
|
||||
.. change::
|
||||
:tags: usecase, asyncio
|
||||
:tickets: 9731
|
||||
|
||||
Added a new helper mixin :class:`_asyncio.AsyncAttrs` that seeks to improve
|
||||
the use of lazy-loader and other expired or deferred ORM attributes with
|
||||
asyncio, providing a simple attribute accessor that provides an ``await``
|
||||
interface to any ORM attribute, whether or not it needs to emit SQL.
|
||||
|
||||
.. seealso::
|
||||
|
||||
:class:`_asyncio.AsyncAttrs`
|
||||
Vendored
+72
-6
@@ -155,6 +155,7 @@ illustrates a complete example including mapper and session configuration::
|
||||
from sqlalchemy import ForeignKey
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncAttrs
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
@@ -165,7 +166,7 @@ illustrates a complete example including mapper and session configuration::
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
|
||||
class Base(DeclarativeBase):
|
||||
class Base(AsyncAttrs, DeclarativeBase):
|
||||
pass
|
||||
|
||||
|
||||
@@ -175,7 +176,7 @@ illustrates a complete example including mapper and session configuration::
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
data: Mapped[str]
|
||||
create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
|
||||
bs: Mapped[List[B]] = relationship(lazy="raise")
|
||||
bs: Mapped[List[B]] = relationship()
|
||||
|
||||
|
||||
class B(Base):
|
||||
@@ -225,6 +226,11 @@ illustrates a complete example including mapper and session configuration::
|
||||
# expire_on_commit=False allows
|
||||
print(a1.data)
|
||||
|
||||
# alternatively, AsyncAttrs may be used to access any attribute
|
||||
# as an awaitable (new in 2.0.13)
|
||||
for b1 in await a1.awaitable_attrs.bs:
|
||||
print(b1)
|
||||
|
||||
|
||||
async def async_main() -> None:
|
||||
engine = create_async_engine(
|
||||
@@ -269,6 +275,64 @@ Using traditional asyncio, the application needs to avoid any points at which
|
||||
IO-on-attribute access may occur. Techniques that can be used to help
|
||||
this are below, many of which are illustrated in the preceding example.
|
||||
|
||||
* Attributes that are lazy-loading relationships, deferred columns or
|
||||
expressions, or are being accessed in expiration scenarios can take advantage
|
||||
of the :class:`_asyncio.AsyncAttrs` mixin. This mixin, when added to a
|
||||
specific class or more generally to the Declarative ``Base`` superclass,
|
||||
provides an accessor :attr:`_asyncio.AsyncAttrs.awaitable_attrs`
|
||||
which delivers any attribute as an awaitable::
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncAttrs
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
from sqlalchemy.orm import Mapped
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
|
||||
class Base(AsyncAttrs, DeclarativeBase):
|
||||
pass
|
||||
|
||||
|
||||
class A(Base):
|
||||
__tablename__ = "a"
|
||||
|
||||
# ... rest of mapping ...
|
||||
|
||||
bs: Mapped[List[B]] = relationship()
|
||||
|
||||
|
||||
class B(Base):
|
||||
__tablename__ = "b"
|
||||
|
||||
# ... rest of mapping ...
|
||||
|
||||
Accessing the ``A.bs`` collection on newly loaded instances of ``A`` when
|
||||
eager loading is not in use will normally use :term:`lazy loading`, which in
|
||||
order to succeed will usually emit IO to the database, which will fail under
|
||||
asyncio as no implicit IO is allowed. To access this attribute directly under
|
||||
asyncio without any prior loading operations, the attribute can be accessed
|
||||
as an awaitable by indicating the :attr:`_asyncio.AsyncAttrs.awaitable_attrs`
|
||||
prefix::
|
||||
|
||||
a1 = await (session.scalars(select(A))).one()
|
||||
for b1 in await a1.awaitable_attrs.bs:
|
||||
print(b1)
|
||||
|
||||
The :class:`_asyncio.AsyncAttrs` mixin provides a succinct facade over the
|
||||
internal approach that's also used by the
|
||||
:meth:`_asyncio.AsyncSession.run_sync` method.
|
||||
|
||||
|
||||
.. versionadded:: 2.0.13
|
||||
|
||||
.. seealso::
|
||||
|
||||
:class:`_asyncio.AsyncAttrs`
|
||||
|
||||
|
||||
* Collections can be replaced with **write only collections** that will never
|
||||
emit IO implicitly, by using the :ref:`write_only_relationship` feature in
|
||||
SQLAlchemy 2.0. Using this feature, collections are never read from, only
|
||||
@@ -283,10 +347,9 @@ this are below, many of which are illustrated in the preceding example.
|
||||
bullets below address specific techniques when using traditional lazy-loaded
|
||||
relationships with asyncio, which requires more care.
|
||||
|
||||
* If using traditional ORM relationships which are subject to lazy loading,
|
||||
relationships can be declared with ``lazy="raise"`` so that by
|
||||
default they will not attempt to emit SQL. In order to load collections,
|
||||
:term:`eager loading` must be used in all cases.
|
||||
* If not using :class:`_asyncio.AsyncAttrs`, relationships can be declared
|
||||
with ``lazy="raise"`` so that by default they will not attempt to emit SQL.
|
||||
In order to load collections, :term:`eager loading` would be used instead.
|
||||
|
||||
* The most useful eager loading strategy is the
|
||||
:func:`_orm.selectinload` eager loader, which is employed in the previous
|
||||
@@ -1019,6 +1082,9 @@ ORM Session API Documentation
|
||||
:members:
|
||||
:inherited-members:
|
||||
|
||||
.. autoclass:: AsyncAttrs
|
||||
:members:
|
||||
|
||||
.. autoclass:: AsyncSession
|
||||
:members:
|
||||
:exclude-members: sync_session_class
|
||||
|
||||
@@ -12,15 +12,18 @@ from typing import Optional
|
||||
from sqlalchemy import ForeignKey
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker
|
||||
from sqlalchemy.ext.asyncio import AsyncAttrs
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
from sqlalchemy.future import select
|
||||
from sqlalchemy.orm import declarative_base
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
from sqlalchemy.orm import Mapped
|
||||
from sqlalchemy.orm import mapped_column
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
class Base(AsyncAttrs, DeclarativeBase):
|
||||
pass
|
||||
|
||||
|
||||
class A(Base):
|
||||
@@ -31,7 +34,7 @@ class A(Base):
|
||||
create_date: Mapped[datetime.datetime] = mapped_column(
|
||||
server_default=func.now()
|
||||
)
|
||||
bs: Mapped[List[B]] = relationship(lazy="raise")
|
||||
bs: Mapped[List[B]] = relationship()
|
||||
|
||||
|
||||
class B(Base):
|
||||
@@ -93,11 +96,15 @@ async def async_main():
|
||||
|
||||
result = await session.scalars(select(A).order_by(A.id))
|
||||
|
||||
a1 = result.first()
|
||||
a1 = result.one()
|
||||
|
||||
a1.data = "new data"
|
||||
|
||||
await session.commit()
|
||||
|
||||
# use the AsyncAttrs interface to accommodate for a lazy load
|
||||
for b1 in await a1.awaitable_attrs.bs:
|
||||
print(b1)
|
||||
|
||||
|
||||
asyncio.run(async_main())
|
||||
|
||||
@@ -11,15 +11,18 @@ from typing import Optional
|
||||
from sqlalchemy import ForeignKey
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker
|
||||
from sqlalchemy.ext.asyncio import AsyncAttrs
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
from sqlalchemy.future import select
|
||||
from sqlalchemy.orm import declarative_base
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
from sqlalchemy.orm import Mapped
|
||||
from sqlalchemy.orm import mapped_column
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.orm import WriteOnlyMapped
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
class Base(AsyncAttrs, DeclarativeBase):
|
||||
pass
|
||||
|
||||
|
||||
class A(Base):
|
||||
|
||||
@@ -10,13 +10,16 @@ from sqlalchemy import Column
|
||||
from sqlalchemy import ForeignKey
|
||||
from sqlalchemy import Integer
|
||||
from sqlalchemy import String
|
||||
from sqlalchemy.ext.asyncio import AsyncAttrs
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.future import select
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
class Base(AsyncAttrs, DeclarativeBase):
|
||||
pass
|
||||
|
||||
|
||||
class A(Base):
|
||||
|
||||
@@ -19,5 +19,6 @@ from .scoping import async_scoped_session as async_scoped_session
|
||||
from .session import async_object_session as async_object_session
|
||||
from .session import async_session as async_session
|
||||
from .session import async_sessionmaker as async_sessionmaker
|
||||
from .session import AsyncAttrs as AsyncAttrs
|
||||
from .session import AsyncSession as AsyncSession
|
||||
from .session import AsyncSessionTransaction as AsyncSessionTransaction
|
||||
|
||||
@@ -8,6 +8,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Any
|
||||
from typing import Awaitable
|
||||
from typing import Callable
|
||||
from typing import Dict
|
||||
from typing import Generic
|
||||
@@ -73,6 +74,99 @@ _EXECUTE_OPTIONS = util.immutabledict({"prebuffer_rows": True})
|
||||
_STREAM_OPTIONS = util.immutabledict({"stream_results": True})
|
||||
|
||||
|
||||
class AsyncAttrs:
|
||||
"""Mixin class which provides an awaitable accessor for all attributes.
|
||||
|
||||
E.g.::
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List
|
||||
|
||||
from sqlalchemy import ForeignKey
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy.ext.asyncio import AsyncAttrs
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
from sqlalchemy.orm import Mapped
|
||||
from sqlalchemy.orm import mapped_column
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
|
||||
class Base(AsyncAttrs, DeclarativeBase):
|
||||
pass
|
||||
|
||||
|
||||
class A(Base):
|
||||
__tablename__ = "a"
|
||||
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
data: Mapped[str]
|
||||
bs: Mapped[List[B]] = relationship()
|
||||
|
||||
|
||||
class B(Base):
|
||||
__tablename__ = "b"
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
|
||||
data: Mapped[str]
|
||||
|
||||
In the above example, the :class:`_asyncio.AsyncAttrs` mixin is applied to
|
||||
the declarative ``Base`` class where it takes effect for all subclasses.
|
||||
This mixin adds a single new attribute
|
||||
:attr:`_asyncio.AsyncAttrs.awaitable_attrs` to all classes, which will
|
||||
yield the value of any attribute as an awaitable. This allows attributes
|
||||
which may be subject to lazy loading or deferred / unexpiry loading to be
|
||||
accessed such that IO can still be emitted::
|
||||
|
||||
a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()
|
||||
|
||||
# use the lazy loader on ``a1.bs`` via the ``.async_attrs``
|
||||
# interface, so that it may be awaited
|
||||
for b1 in await a1.async_attrs.bs:
|
||||
print(b1)
|
||||
|
||||
The :attr:`_asyncio.AsyncAttrs.awaitable_attrs` performs a call against the
|
||||
attribute that is approximately equivalent to using the
|
||||
:meth:`_asyncio.AsyncSession.run_sync` method, e.g.::
|
||||
|
||||
for b1 in await async_session.run_sync(lambda sess: a1.bs):
|
||||
print(b1)
|
||||
|
||||
.. versionadded:: 2.0.13
|
||||
|
||||
.. seealso::
|
||||
|
||||
:ref:`asyncio_orm_avoid_lazyloads`
|
||||
|
||||
"""
|
||||
|
||||
class _AsyncAttrGetitem:
|
||||
__slots__ = "_instance"
|
||||
|
||||
def __init__(self, _instance: Any):
|
||||
self._instance = _instance
|
||||
|
||||
def __getattr__(self, name: str) -> Awaitable[Any]:
|
||||
return greenlet_spawn(getattr, self._instance, name)
|
||||
|
||||
@property
|
||||
def awaitable_attrs(self) -> AsyncAttrs._AsyncAttrGetitem:
|
||||
"""provide a namespace of all attributes on this object wrapped
|
||||
as awaitables.
|
||||
|
||||
e.g.::
|
||||
|
||||
|
||||
a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()
|
||||
|
||||
some_attribute = await a1.async_attrs.some_deferred_attribute
|
||||
some_collection = await a1.async_attrs.some_collection
|
||||
|
||||
""" # noqa: E501
|
||||
|
||||
return AsyncAttrs._AsyncAttrGetitem(self)
|
||||
|
||||
|
||||
@util.create_proxy_methods(
|
||||
Session,
|
||||
":class:`_orm.Session`",
|
||||
@@ -268,7 +362,7 @@ class AsyncSession(ReversibleProxy[Session]):
|
||||
to the database connection by running the given callable in a
|
||||
specially instrumented greenlet.
|
||||
|
||||
.. note::
|
||||
.. tip::
|
||||
|
||||
The provided callable is invoked inline within the asyncio event
|
||||
loop, and will block on traditional IO calls. IO within this
|
||||
@@ -277,6 +371,9 @@ class AsyncSession(ReversibleProxy[Session]):
|
||||
|
||||
.. seealso::
|
||||
|
||||
:class:`.AsyncAttrs` - a mixin for ORM mapped classes that provides
|
||||
a similar feature more succinctly on a per-attribute basis
|
||||
|
||||
:meth:`.AsyncConnection.run_sync`
|
||||
|
||||
:ref:`session_run_sync`
|
||||
|
||||
@@ -1,20 +1,31 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy import Column
|
||||
from sqlalchemy import event
|
||||
from sqlalchemy import exc
|
||||
from sqlalchemy import ForeignKey
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy import Identity
|
||||
from sqlalchemy import inspect
|
||||
from sqlalchemy import Integer
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import Sequence
|
||||
from sqlalchemy import String
|
||||
from sqlalchemy import Table
|
||||
from sqlalchemy import testing
|
||||
from sqlalchemy import update
|
||||
from sqlalchemy.ext.asyncio import async_object_session
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker
|
||||
from sqlalchemy.ext.asyncio import AsyncAttrs
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.ext.asyncio import exc as async_exc
|
||||
from sqlalchemy.ext.asyncio.base import ReversibleProxy
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
from sqlalchemy.orm import Mapped
|
||||
from sqlalchemy.orm import mapped_column
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.orm import selectinload
|
||||
from sqlalchemy.orm import Session
|
||||
@@ -24,6 +35,7 @@ from sqlalchemy.testing import config
|
||||
from sqlalchemy.testing import engines
|
||||
from sqlalchemy.testing import eq_
|
||||
from sqlalchemy.testing import expect_raises_message
|
||||
from sqlalchemy.testing import fixtures
|
||||
from sqlalchemy.testing import is_
|
||||
from sqlalchemy.testing import is_true
|
||||
from sqlalchemy.testing import mock
|
||||
@@ -45,6 +57,12 @@ class AsyncFixture(_AsyncFixture, _fixtures.FixtureTest):
|
||||
def async_engine(self):
|
||||
return engines.testing_engine(asyncio=True, transfer_staticpool=True)
|
||||
|
||||
# TODO: this seems to cause deadlocks in
|
||||
# OverrideSyncSession for some reason
|
||||
# @testing.fixture
|
||||
# def async_engine(self, async_testing_engine):
|
||||
# return async_testing_engine(transfer_staticpool=True)
|
||||
|
||||
@testing.fixture
|
||||
def async_session(self, async_engine):
|
||||
return AsyncSession(async_engine)
|
||||
@@ -1005,3 +1023,103 @@ class OverrideSyncSession(AsyncFixture):
|
||||
|
||||
is_true(not isinstance(ass.sync_session, _MySession))
|
||||
is_(ass.sync_session_class, Session)
|
||||
|
||||
|
||||
class AsyncAttrsTest(
|
||||
testing.AssertsExecutionResults, _AsyncFixture, fixtures.TestBase
|
||||
):
|
||||
__requires__ = ("async_dialect",)
|
||||
|
||||
@config.fixture
|
||||
def decl_base(self, metadata):
|
||||
_md = metadata
|
||||
|
||||
class Base(fixtures.ComparableEntity, AsyncAttrs, DeclarativeBase):
|
||||
metadata = _md
|
||||
type_annotation_map = {
|
||||
str: String().with_variant(
|
||||
String(50), "mysql", "mariadb", "oracle"
|
||||
)
|
||||
}
|
||||
|
||||
yield Base
|
||||
Base.registry.dispose()
|
||||
|
||||
@testing.fixture
|
||||
def async_engine(self, async_testing_engine):
|
||||
yield async_testing_engine(transfer_staticpool=True)
|
||||
|
||||
@testing.fixture
|
||||
def ab_fixture(self, decl_base):
|
||||
class A(decl_base):
|
||||
__tablename__ = "a"
|
||||
|
||||
id: Mapped[int] = mapped_column(Identity(), primary_key=True)
|
||||
data: Mapped[Optional[str]]
|
||||
bs: Mapped[List[B]] = relationship(order_by=lambda: B.id)
|
||||
|
||||
class B(decl_base):
|
||||
__tablename__ = "b"
|
||||
id: Mapped[int] = mapped_column(Identity(), primary_key=True)
|
||||
a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
|
||||
data: Mapped[Optional[str]]
|
||||
|
||||
decl_base.metadata.create_all(testing.db)
|
||||
|
||||
return A, B
|
||||
|
||||
@async_test
|
||||
async def test_lazyloaders(self, async_engine, ab_fixture):
|
||||
A, B = ab_fixture
|
||||
|
||||
async with AsyncSession(async_engine) as session:
|
||||
b1, b2, b3 = B(data="b1"), B(data="b2"), B(data="b3")
|
||||
a1 = A(data="a1", bs=[b1, b2, b3])
|
||||
session.add(a1)
|
||||
|
||||
await session.commit()
|
||||
|
||||
assert inspect(a1).expired
|
||||
|
||||
with self.assert_statement_count(async_engine.sync_engine, 1):
|
||||
eq_(await a1.awaitable_attrs.data, "a1")
|
||||
|
||||
with self.assert_statement_count(async_engine.sync_engine, 1):
|
||||
eq_(await a1.awaitable_attrs.bs, [b1, b2, b3])
|
||||
|
||||
# now it's loaded, lazy loading not used anymore
|
||||
eq_(a1.bs, [b1, b2, b3])
|
||||
|
||||
@async_test
|
||||
async def test_it_didnt_load_but_is_ok(self, async_engine, ab_fixture):
|
||||
A, B = ab_fixture
|
||||
|
||||
async with AsyncSession(async_engine) as session:
|
||||
b1, b2, b3 = B(data="b1"), B(data="b2"), B(data="b3")
|
||||
a1 = A(data="a1", bs=[b1, b2, b3])
|
||||
session.add(a1)
|
||||
|
||||
await session.commit()
|
||||
|
||||
async with AsyncSession(async_engine) as session:
|
||||
a1 = (
|
||||
await session.scalars(select(A).options(selectinload(A.bs)))
|
||||
).one()
|
||||
|
||||
with self.assert_statement_count(async_engine.sync_engine, 0):
|
||||
eq_(await a1.awaitable_attrs.bs, [b1, b2, b3])
|
||||
|
||||
@async_test
|
||||
async def test_the_famous_lazyloader_gotcha(
|
||||
self, async_engine, ab_fixture
|
||||
):
|
||||
A, B = ab_fixture
|
||||
|
||||
async with AsyncSession(async_engine) as session:
|
||||
a1 = A(data="a1")
|
||||
session.add(a1)
|
||||
|
||||
await session.flush()
|
||||
|
||||
with self.assert_statement_count(async_engine.sync_engine, 1):
|
||||
eq_(await a1.awaitable_attrs.bs, [])
|
||||
|
||||
Reference in New Issue
Block a user