add AsyncAttrs

Added a new helper mixin :class:`_asyncio.AsyncAttrs` that seeks to improve
the use of lazy-loader and other expired or deferred ORM attributes with
asyncio, providing a simple attribute accessor that provides an ``await``
interface to any ORM attribute, whether or not it needs to emit SQL.

Change-Id: I1427b288dc28319c854372643066c491b9ee8dc0
References: #9731
This commit is contained in:
Mike Bayer
2023-05-05 11:50:29 -04:00
parent dc60e7a7d3
commit b3216486c4
8 changed files with 322 additions and 15 deletions
+12
View File
@@ -0,0 +1,12 @@
.. change::
:tags: usecase, asyncio
:tickets: 9731
Added a new helper mixin :class:`_asyncio.AsyncAttrs` that seeks to improve
the use of lazy-loader and other expired or deferred ORM attributes with
asyncio, providing a simple attribute accessor that provides an ``await``
interface to any ORM attribute, whether or not it needs to emit SQL.
.. seealso::
:class:`_asyncio.AsyncAttrs`
+72 -6
View File
@@ -155,6 +155,7 @@ illustrates a complete example including mapper and session configuration::
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
@@ -165,7 +166,7 @@ illustrates a complete example including mapper and session configuration::
from sqlalchemy.orm import selectinload
class Base(DeclarativeBase):
class Base(AsyncAttrs, DeclarativeBase):
pass
@@ -175,7 +176,7 @@ illustrates a complete example including mapper and session configuration::
id: Mapped[int] = mapped_column(primary_key=True)
data: Mapped[str]
create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
bs: Mapped[List[B]] = relationship(lazy="raise")
bs: Mapped[List[B]] = relationship()
class B(Base):
@@ -225,6 +226,11 @@ illustrates a complete example including mapper and session configuration::
# expire_on_commit=False allows
print(a1.data)
# alternatively, AsyncAttrs may be used to access any attribute
# as an awaitable (new in 2.0.13)
for b1 in await a1.awaitable_attrs.bs:
print(b1)
async def async_main() -> None:
engine = create_async_engine(
@@ -269,6 +275,64 @@ Using traditional asyncio, the application needs to avoid any points at which
IO-on-attribute access may occur. Techniques that can be used to help
this are below, many of which are illustrated in the preceding example.
* Attributes that are lazy-loading relationships, deferred columns or
expressions, or are being accessed in expiration scenarios can take advantage
of the :class:`_asyncio.AsyncAttrs` mixin. This mixin, when added to a
specific class or more generally to the Declarative ``Base`` superclass,
provides an accessor :attr:`_asyncio.AsyncAttrs.awaitable_attrs`
which delivers any attribute as an awaitable::
from __future__ import annotations
from typing import List
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import relationship
class Base(AsyncAttrs, DeclarativeBase):
pass
class A(Base):
__tablename__ = "a"
# ... rest of mapping ...
bs: Mapped[List[B]] = relationship()
class B(Base):
__tablename__ = "b"
# ... rest of mapping ...
Accessing the ``A.bs`` collection on newly loaded instances of ``A`` when
eager loading is not in use will normally use :term:`lazy loading`, which in
order to succeed will usually emit IO to the database, which will fail under
asyncio as no implicit IO is allowed. To access this attribute directly under
asyncio without any prior loading operations, the attribute can be accessed
as an awaitable by indicating the :attr:`_asyncio.AsyncAttrs.awaitable_attrs`
prefix::
a1 = await (session.scalars(select(A))).one()
for b1 in await a1.awaitable_attrs.bs:
print(b1)
The :class:`_asyncio.AsyncAttrs` mixin provides a succinct facade over the
internal approach that's also used by the
:meth:`_asyncio.AsyncSession.run_sync` method.
.. versionadded:: 2.0.13
.. seealso::
:class:`_asyncio.AsyncAttrs`
* Collections can be replaced with **write only collections** that will never
emit IO implicitly, by using the :ref:`write_only_relationship` feature in
SQLAlchemy 2.0. Using this feature, collections are never read from, only
@@ -283,10 +347,9 @@ this are below, many of which are illustrated in the preceding example.
bullets below address specific techniques when using traditional lazy-loaded
relationships with asyncio, which requires more care.
* If using traditional ORM relationships which are subject to lazy loading,
relationships can be declared with ``lazy="raise"`` so that by
default they will not attempt to emit SQL. In order to load collections,
:term:`eager loading` must be used in all cases.
* If not using :class:`_asyncio.AsyncAttrs`, relationships can be declared
with ``lazy="raise"`` so that by default they will not attempt to emit SQL.
In order to load collections, :term:`eager loading` would be used instead.
* The most useful eager loading strategy is the
:func:`_orm.selectinload` eager loader, which is employed in the previous
@@ -1019,6 +1082,9 @@ ORM Session API Documentation
:members:
:inherited-members:
.. autoclass:: AsyncAttrs
:members:
.. autoclass:: AsyncSession
:members:
:exclude-members: sync_session_class
+11 -4
View File
@@ -12,15 +12,18 @@ from typing import Optional
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
Base = declarative_base()
class Base(AsyncAttrs, DeclarativeBase):
pass
class A(Base):
@@ -31,7 +34,7 @@ class A(Base):
create_date: Mapped[datetime.datetime] = mapped_column(
server_default=func.now()
)
bs: Mapped[List[B]] = relationship(lazy="raise")
bs: Mapped[List[B]] = relationship()
class B(Base):
@@ -93,11 +96,15 @@ async def async_main():
result = await session.scalars(select(A).order_by(A.id))
a1 = result.first()
a1 = result.one()
a1.data = "new data"
await session.commit()
# use the AsyncAttrs interface to accommodate for a lazy load
for b1 in await a1.awaitable_attrs.bs:
print(b1)
asyncio.run(async_main())
+5 -2
View File
@@ -11,15 +11,18 @@ from typing import Optional
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import WriteOnlyMapped
Base = declarative_base()
class Base(AsyncAttrs, DeclarativeBase):
pass
class A(Base):
+5 -2
View File
@@ -10,13 +10,16 @@ from sqlalchemy import Column
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.future import select
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship
Base = declarative_base()
class Base(AsyncAttrs, DeclarativeBase):
pass
class A(Base):
+1
View File
@@ -19,5 +19,6 @@ from .scoping import async_scoped_session as async_scoped_session
from .session import async_object_session as async_object_session
from .session import async_session as async_session
from .session import async_sessionmaker as async_sessionmaker
from .session import AsyncAttrs as AsyncAttrs
from .session import AsyncSession as AsyncSession
from .session import AsyncSessionTransaction as AsyncSessionTransaction
+98 -1
View File
@@ -8,6 +8,7 @@ from __future__ import annotations
import asyncio
from typing import Any
from typing import Awaitable
from typing import Callable
from typing import Dict
from typing import Generic
@@ -73,6 +74,99 @@ _EXECUTE_OPTIONS = util.immutabledict({"prebuffer_rows": True})
_STREAM_OPTIONS = util.immutabledict({"stream_results": True})
class AsyncAttrs:
"""Mixin class which provides an awaitable accessor for all attributes.
E.g.::
from __future__ import annotations
from typing import List
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
class Base(AsyncAttrs, DeclarativeBase):
pass
class A(Base):
__tablename__ = "a"
id: Mapped[int] = mapped_column(primary_key=True)
data: Mapped[str]
bs: Mapped[List[B]] = relationship()
class B(Base):
__tablename__ = "b"
id: Mapped[int] = mapped_column(primary_key=True)
a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
data: Mapped[str]
In the above example, the :class:`_asyncio.AsyncAttrs` mixin is applied to
the declarative ``Base`` class where it takes effect for all subclasses.
This mixin adds a single new attribute
:attr:`_asyncio.AsyncAttrs.awaitable_attrs` to all classes, which will
yield the value of any attribute as an awaitable. This allows attributes
which may be subject to lazy loading or deferred / unexpiry loading to be
accessed such that IO can still be emitted::
a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()
# use the lazy loader on ``a1.bs`` via the ``.async_attrs``
# interface, so that it may be awaited
for b1 in await a1.async_attrs.bs:
print(b1)
The :attr:`_asyncio.AsyncAttrs.awaitable_attrs` performs a call against the
attribute that is approximately equivalent to using the
:meth:`_asyncio.AsyncSession.run_sync` method, e.g.::
for b1 in await async_session.run_sync(lambda sess: a1.bs):
print(b1)
.. versionadded:: 2.0.13
.. seealso::
:ref:`asyncio_orm_avoid_lazyloads`
"""
class _AsyncAttrGetitem:
__slots__ = "_instance"
def __init__(self, _instance: Any):
self._instance = _instance
def __getattr__(self, name: str) -> Awaitable[Any]:
return greenlet_spawn(getattr, self._instance, name)
@property
def awaitable_attrs(self) -> AsyncAttrs._AsyncAttrGetitem:
"""provide a namespace of all attributes on this object wrapped
as awaitables.
e.g.::
a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()
some_attribute = await a1.async_attrs.some_deferred_attribute
some_collection = await a1.async_attrs.some_collection
""" # noqa: E501
return AsyncAttrs._AsyncAttrGetitem(self)
@util.create_proxy_methods(
Session,
":class:`_orm.Session`",
@@ -268,7 +362,7 @@ class AsyncSession(ReversibleProxy[Session]):
to the database connection by running the given callable in a
specially instrumented greenlet.
.. note::
.. tip::
The provided callable is invoked inline within the asyncio event
loop, and will block on traditional IO calls. IO within this
@@ -277,6 +371,9 @@ class AsyncSession(ReversibleProxy[Session]):
.. seealso::
:class:`.AsyncAttrs` - a mixin for ORM mapped classes that provides
a similar feature more succinctly on a per-attribute basis
:meth:`.AsyncConnection.run_sync`
:ref:`session_run_sync`
+118
View File
@@ -1,20 +1,31 @@
from __future__ import annotations
from typing import List
from typing import Optional
from sqlalchemy import Column
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Identity
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import select
from sqlalchemy import Sequence
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy import update
from sqlalchemy.ext.asyncio import async_object_session
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import exc as async_exc
from sqlalchemy.ext.asyncio.base import ReversibleProxy
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import Session
@@ -24,6 +35,7 @@ from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
@@ -45,6 +57,12 @@ class AsyncFixture(_AsyncFixture, _fixtures.FixtureTest):
def async_engine(self):
return engines.testing_engine(asyncio=True, transfer_staticpool=True)
# TODO: this seems to cause deadlocks in
# OverrideSyncSession for some reason
# @testing.fixture
# def async_engine(self, async_testing_engine):
# return async_testing_engine(transfer_staticpool=True)
@testing.fixture
def async_session(self, async_engine):
return AsyncSession(async_engine)
@@ -1005,3 +1023,103 @@ class OverrideSyncSession(AsyncFixture):
is_true(not isinstance(ass.sync_session, _MySession))
is_(ass.sync_session_class, Session)
class AsyncAttrsTest(
testing.AssertsExecutionResults, _AsyncFixture, fixtures.TestBase
):
__requires__ = ("async_dialect",)
@config.fixture
def decl_base(self, metadata):
_md = metadata
class Base(fixtures.ComparableEntity, AsyncAttrs, DeclarativeBase):
metadata = _md
type_annotation_map = {
str: String().with_variant(
String(50), "mysql", "mariadb", "oracle"
)
}
yield Base
Base.registry.dispose()
@testing.fixture
def async_engine(self, async_testing_engine):
yield async_testing_engine(transfer_staticpool=True)
@testing.fixture
def ab_fixture(self, decl_base):
class A(decl_base):
__tablename__ = "a"
id: Mapped[int] = mapped_column(Identity(), primary_key=True)
data: Mapped[Optional[str]]
bs: Mapped[List[B]] = relationship(order_by=lambda: B.id)
class B(decl_base):
__tablename__ = "b"
id: Mapped[int] = mapped_column(Identity(), primary_key=True)
a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
data: Mapped[Optional[str]]
decl_base.metadata.create_all(testing.db)
return A, B
@async_test
async def test_lazyloaders(self, async_engine, ab_fixture):
A, B = ab_fixture
async with AsyncSession(async_engine) as session:
b1, b2, b3 = B(data="b1"), B(data="b2"), B(data="b3")
a1 = A(data="a1", bs=[b1, b2, b3])
session.add(a1)
await session.commit()
assert inspect(a1).expired
with self.assert_statement_count(async_engine.sync_engine, 1):
eq_(await a1.awaitable_attrs.data, "a1")
with self.assert_statement_count(async_engine.sync_engine, 1):
eq_(await a1.awaitable_attrs.bs, [b1, b2, b3])
# now it's loaded, lazy loading not used anymore
eq_(a1.bs, [b1, b2, b3])
@async_test
async def test_it_didnt_load_but_is_ok(self, async_engine, ab_fixture):
A, B = ab_fixture
async with AsyncSession(async_engine) as session:
b1, b2, b3 = B(data="b1"), B(data="b2"), B(data="b3")
a1 = A(data="a1", bs=[b1, b2, b3])
session.add(a1)
await session.commit()
async with AsyncSession(async_engine) as session:
a1 = (
await session.scalars(select(A).options(selectinload(A.bs)))
).one()
with self.assert_statement_count(async_engine.sync_engine, 0):
eq_(await a1.awaitable_attrs.bs, [b1, b2, b3])
@async_test
async def test_the_famous_lazyloader_gotcha(
self, async_engine, ab_fixture
):
A, B = ab_fixture
async with AsyncSession(async_engine) as session:
a1 = A(data="a1")
session.add(a1)
await session.flush()
with self.assert_statement_count(async_engine.sync_engine, 1):
eq_(await a1.awaitable_attrs.bs, [])