mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-06 08:56:51 -04:00
add session-wide execution_options
Added support for per-session execution options that are merged into all queries executed within that session. The :class:`_orm.Session`, :class:`_orm.sessionmaker`, :class:`_orm.scoped_session`, :class:`_ext.asyncio.AsyncSession`, and :class:`_ext.asyncio.async_sessionmaker` constructors now accept an :paramref:`_orm.Session.execution_options` parameter that will be applied to all explicit query executions (e.g. using :meth:`_orm.Session.execute`, :meth:`_orm.Session.get`, :meth:`_orm.Session.scalars`) for that session instance. Fixes: #12659 Change-Id: I6e19e1567e0c04df32ba1d43baf420fb762f155c
This commit is contained in:
+13
@@ -0,0 +1,13 @@
|
||||
.. change::
|
||||
:tags: feature, orm
|
||||
:tickets: 12659
|
||||
|
||||
Added support for per-session execution options that are merged into all
|
||||
queries executed within that session. The :class:`_orm.Session`,
|
||||
:class:`_orm.sessionmaker`, :class:`_orm.scoped_session`,
|
||||
:class:`_ext.asyncio.AsyncSession`, and
|
||||
:class:`_ext.asyncio.async_sessionmaker` constructors now accept an
|
||||
:paramref:`_orm.Session.execution_options` parameter that will be applied
|
||||
to all explicit query executions (e.g. using :meth:`_orm.Session.execute`,
|
||||
:meth:`_orm.Session.get`, :meth:`_orm.Session.scalars`) for that session
|
||||
instance.
|
||||
@@ -114,6 +114,7 @@ _Ts = TypeVarTuple("_Ts")
|
||||
"autoflush",
|
||||
"no_autoflush",
|
||||
"info",
|
||||
"execution_options",
|
||||
],
|
||||
use_intermediate_variable=["get"],
|
||||
)
|
||||
@@ -1571,6 +1572,25 @@ class async_scoped_session(Generic[_AS]):
|
||||
|
||||
return self._proxied.info
|
||||
|
||||
@property
|
||||
def execution_options(self) -> Any:
|
||||
r"""Proxy for the :attr:`_orm.Session.execution_options` attribute
|
||||
on behalf of the :class:`_asyncio.AsyncSession` class.
|
||||
|
||||
.. container:: class_bases
|
||||
|
||||
Proxied for the :class:`_asyncio.AsyncSession` class
|
||||
on behalf of the :class:`_asyncio.scoping.async_scoped_session` class.
|
||||
|
||||
|
||||
""" # noqa: E501
|
||||
|
||||
return self._proxied.execution_options
|
||||
|
||||
@execution_options.setter
|
||||
def execution_options(self, attr: Any) -> None:
|
||||
self._proxied.execution_options = attr
|
||||
|
||||
@classmethod
|
||||
async def close_all(cls) -> None:
|
||||
r"""Close all :class:`_asyncio.AsyncSession` sessions.
|
||||
|
||||
@@ -55,6 +55,7 @@ if TYPE_CHECKING:
|
||||
from ...engine import RowMapping
|
||||
from ...engine import ScalarResult
|
||||
from ...engine.interfaces import _CoreAnyExecuteParams
|
||||
from ...engine.interfaces import _ExecuteOptions
|
||||
from ...engine.interfaces import CoreExecuteOptionsParameter
|
||||
from ...event import dispatcher
|
||||
from ...orm._typing import _IdentityKeyType
|
||||
@@ -203,6 +204,7 @@ class AsyncAttrs:
|
||||
"autoflush",
|
||||
"no_autoflush",
|
||||
"info",
|
||||
"execution_options",
|
||||
],
|
||||
)
|
||||
class AsyncSession(ReversibleProxy[Session]):
|
||||
@@ -1587,6 +1589,19 @@ class AsyncSession(ReversibleProxy[Session]):
|
||||
|
||||
return self._proxied.info
|
||||
|
||||
@property
|
||||
def execution_options(self) -> _ExecuteOptions:
|
||||
r"""Proxy for the :attr:`_orm.Session.execution_options` attribute
|
||||
on behalf of the :class:`_asyncio.AsyncSession` class.
|
||||
|
||||
""" # noqa: E501
|
||||
|
||||
return self._proxied.execution_options
|
||||
|
||||
@execution_options.setter
|
||||
def execution_options(self, attr: _ExecuteOptions) -> None:
|
||||
self._proxied.execution_options = attr
|
||||
|
||||
@classmethod
|
||||
def object_session(cls, instance: object) -> Optional[Session]:
|
||||
r"""Return the :class:`.Session` to which an object belongs.
|
||||
|
||||
@@ -58,6 +58,7 @@ if TYPE_CHECKING:
|
||||
from ..engine import RowMapping
|
||||
from ..engine.interfaces import _CoreAnyExecuteParams
|
||||
from ..engine.interfaces import _CoreSingleExecuteParams
|
||||
from ..engine.interfaces import _ExecuteOptions
|
||||
from ..engine.interfaces import CoreExecuteOptionsParameter
|
||||
from ..engine.result import ScalarResult
|
||||
from ..sql._typing import _ColumnsClauseArgument
|
||||
@@ -146,6 +147,7 @@ __all__ = ["scoped_session"]
|
||||
"autoflush",
|
||||
"no_autoflush",
|
||||
"info",
|
||||
"execution_options",
|
||||
],
|
||||
)
|
||||
class scoped_session(Generic[_S]):
|
||||
@@ -774,6 +776,13 @@ class scoped_session(Generic[_S]):
|
||||
by :meth:`_engine.Connection.execution_options`, and may also
|
||||
provide additional options understood only in an ORM context.
|
||||
|
||||
The execution_options are passed along to methods like
|
||||
:meth:`.Connection.execute` on :class:`.Connection` giving the
|
||||
highest priority to execution_options that are passed to this
|
||||
method explicitly, then the options that are present on the
|
||||
statement object if any, and finally those options present
|
||||
session-wide.
|
||||
|
||||
.. seealso::
|
||||
|
||||
:ref:`orm_queryguide_execution_options` - ORM-specific execution
|
||||
@@ -2145,6 +2154,19 @@ class scoped_session(Generic[_S]):
|
||||
|
||||
return self._proxied.info
|
||||
|
||||
@property
|
||||
def execution_options(self) -> _ExecuteOptions:
|
||||
r"""Proxy for the :attr:`_orm.Session.execution_options` attribute
|
||||
on behalf of the :class:`_orm.scoping.scoped_session` class.
|
||||
|
||||
""" # noqa: E501
|
||||
|
||||
return self._proxied.execution_options
|
||||
|
||||
@execution_options.setter
|
||||
def execution_options(self, attr: _ExecuteOptions) -> None:
|
||||
self._proxied.execution_options = attr
|
||||
|
||||
@classmethod
|
||||
def object_session(cls, instance: object) -> Optional[Session]:
|
||||
r"""Return the :class:`.Session` to which an object belongs.
|
||||
|
||||
@@ -1484,6 +1484,7 @@ class Session(_SessionClassMethods, EventTarget):
|
||||
enable_baked_queries: bool
|
||||
twophase: bool
|
||||
join_transaction_mode: JoinTransactionMode
|
||||
execution_options: _ExecuteOptions = util.EMPTY_DICT
|
||||
_query_cls: Type[Query[Any]]
|
||||
_close_state: _SessionCloseState
|
||||
|
||||
@@ -1503,6 +1504,7 @@ class Session(_SessionClassMethods, EventTarget):
|
||||
autocommit: Literal[False] = False,
|
||||
join_transaction_mode: JoinTransactionMode = "conditional_savepoint",
|
||||
close_resets_only: Union[bool, _NoArg] = _NoArg.NO_ARG,
|
||||
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
|
||||
):
|
||||
r"""Construct a new :class:`_orm.Session`.
|
||||
|
||||
@@ -1598,6 +1600,15 @@ class Session(_SessionClassMethods, EventTarget):
|
||||
flag therefore only affects applications that are making explicit
|
||||
use of this extension within their own code.
|
||||
|
||||
:param execution_options: optional dictionary of execution options
|
||||
that will be applied to all calls to :meth:`_orm.Session.execute`,
|
||||
:meth:`_orm.Session.scalars`, and similar. Execution options
|
||||
present in statements as well as options passed to methods like
|
||||
:meth:`_orm.Session.execute` explicitly take precedence over
|
||||
the session-wide options.
|
||||
|
||||
.. versionadded:: 2.1
|
||||
|
||||
:param expire_on_commit: Defaults to ``True``. When ``True``, all
|
||||
instances will be fully expired after each :meth:`~.commit`,
|
||||
so that all attribute/object access subsequent to a completed
|
||||
@@ -1759,6 +1770,10 @@ class Session(_SessionClassMethods, EventTarget):
|
||||
self.autoflush = autoflush
|
||||
self.expire_on_commit = expire_on_commit
|
||||
self.enable_baked_queries = enable_baked_queries
|
||||
if execution_options:
|
||||
self.execution_options = self.execution_options.union(
|
||||
execution_options
|
||||
)
|
||||
|
||||
# the idea is that at some point NO_ARG will warn that in the future
|
||||
# the default will switch to close_resets_only=False.
|
||||
@@ -2157,7 +2172,28 @@ class Session(_SessionClassMethods, EventTarget):
|
||||
compile_state_cls = None
|
||||
bind_arguments.setdefault("clause", statement)
|
||||
|
||||
execution_options = util.coerce_to_immutabledict(execution_options)
|
||||
combined_execution_options: util.immutabledict[str, Any] = (
|
||||
util.coerce_to_immutabledict(execution_options)
|
||||
)
|
||||
if self.execution_options:
|
||||
# merge given execution options with session-wide execution
|
||||
# options. if the statement also has execution_options,
|
||||
# maintain priority of session.execution_options ->
|
||||
# statement.execution_options -> method passed execution_options
|
||||
# by omitting from the base execution options those keys that
|
||||
# will come from the statement
|
||||
if statement._execution_options:
|
||||
combined_execution_options = util.immutabledict(
|
||||
{
|
||||
k: v
|
||||
for k, v in self.execution_options.items()
|
||||
if k not in statement._execution_options
|
||||
}
|
||||
).union(combined_execution_options)
|
||||
else:
|
||||
combined_execution_options = self.execution_options.union(
|
||||
combined_execution_options
|
||||
)
|
||||
|
||||
if _parent_execute_state:
|
||||
events_todo = _parent_execute_state._remaining_events()
|
||||
@@ -2176,12 +2212,12 @@ class Session(_SessionClassMethods, EventTarget):
|
||||
# as "pre fetch" for DML, etc.
|
||||
(
|
||||
statement,
|
||||
execution_options,
|
||||
combined_execution_options,
|
||||
) = compile_state_cls.orm_pre_session_exec(
|
||||
self,
|
||||
statement,
|
||||
params,
|
||||
execution_options,
|
||||
combined_execution_options,
|
||||
bind_arguments,
|
||||
True,
|
||||
)
|
||||
@@ -2190,7 +2226,7 @@ class Session(_SessionClassMethods, EventTarget):
|
||||
self,
|
||||
statement,
|
||||
params,
|
||||
execution_options,
|
||||
combined_execution_options,
|
||||
bind_arguments,
|
||||
compile_state_cls,
|
||||
events_todo,
|
||||
@@ -2207,7 +2243,7 @@ class Session(_SessionClassMethods, EventTarget):
|
||||
return fn_result
|
||||
|
||||
statement = orm_exec_state.statement
|
||||
execution_options = orm_exec_state.local_execution_options
|
||||
combined_execution_options = orm_exec_state.local_execution_options
|
||||
|
||||
if compile_state_cls is not None:
|
||||
# now run orm_pre_session_exec() "for real". if there were
|
||||
@@ -2217,12 +2253,12 @@ class Session(_SessionClassMethods, EventTarget):
|
||||
# autoflush will also be invoked in this step if enabled.
|
||||
(
|
||||
statement,
|
||||
execution_options,
|
||||
combined_execution_options,
|
||||
) = compile_state_cls.orm_pre_session_exec(
|
||||
self,
|
||||
statement,
|
||||
params,
|
||||
execution_options,
|
||||
combined_execution_options,
|
||||
bind_arguments,
|
||||
False,
|
||||
)
|
||||
@@ -2238,7 +2274,9 @@ class Session(_SessionClassMethods, EventTarget):
|
||||
if TYPE_CHECKING:
|
||||
params = cast(_CoreSingleExecuteParams, params)
|
||||
return conn.scalar(
|
||||
statement, params or {}, execution_options=execution_options
|
||||
statement,
|
||||
params or {},
|
||||
execution_options=combined_execution_options,
|
||||
)
|
||||
|
||||
if compile_state_cls:
|
||||
@@ -2247,14 +2285,14 @@ class Session(_SessionClassMethods, EventTarget):
|
||||
self,
|
||||
statement,
|
||||
params or {},
|
||||
execution_options,
|
||||
combined_execution_options,
|
||||
bind_arguments,
|
||||
conn,
|
||||
)
|
||||
)
|
||||
else:
|
||||
result = conn.execute(
|
||||
statement, params, execution_options=execution_options
|
||||
statement, params, execution_options=combined_execution_options
|
||||
)
|
||||
|
||||
if _scalar_result:
|
||||
@@ -2332,6 +2370,13 @@ class Session(_SessionClassMethods, EventTarget):
|
||||
by :meth:`_engine.Connection.execution_options`, and may also
|
||||
provide additional options understood only in an ORM context.
|
||||
|
||||
The execution_options are passed along to methods like
|
||||
:meth:`.Connection.execute` on :class:`.Connection` giving the
|
||||
highest priority to execution_options that are passed to this
|
||||
method explicitly, then the options that are present on the
|
||||
statement object if any, and finally those options present
|
||||
session-wide.
|
||||
|
||||
.. seealso::
|
||||
|
||||
:ref:`orm_queryguide_execution_options` - ORM-specific execution
|
||||
@@ -3876,6 +3921,8 @@ class Session(_SessionClassMethods, EventTarget):
|
||||
|
||||
if options:
|
||||
statement = statement.options(*options)
|
||||
if self.execution_options:
|
||||
execution_options = self.execution_options.union(execution_options)
|
||||
return db_load_fn(
|
||||
self,
|
||||
statement,
|
||||
|
||||
@@ -170,6 +170,99 @@ class AsyncSessionTest(AsyncFixture):
|
||||
not_in(u1, s1)
|
||||
not_in(u2, s2)
|
||||
|
||||
@async_test
|
||||
@testing.variation("session_type", ["plain", "sessionmaker"])
|
||||
@testing.variation("merge", [True, False])
|
||||
@testing.variation("method", ["scalar", "execute", "scalars", "get"])
|
||||
@testing.variation("add_statement_options", [True, False])
|
||||
async def test_execution_options(
|
||||
self,
|
||||
async_engine,
|
||||
session_type: testing.Variation,
|
||||
merge: testing.Variation,
|
||||
method: testing.Variation,
|
||||
add_statement_options: testing.Variation,
|
||||
):
|
||||
User = self.classes.User
|
||||
|
||||
session_execution_options = {
|
||||
"populate_existing": True,
|
||||
"autoflush": False,
|
||||
"opt1": "z",
|
||||
"opt5": "q",
|
||||
}
|
||||
|
||||
expected_opts = session_execution_options
|
||||
|
||||
if add_statement_options:
|
||||
statement_options = {"opt2": "w", "opt4": "y", "opt5": "w"}
|
||||
expected_opts = {**expected_opts, **statement_options}
|
||||
else:
|
||||
statement_options = {}
|
||||
|
||||
if merge:
|
||||
query_opts = {
|
||||
"compiled_cache": {},
|
||||
"opt1": "q",
|
||||
"opt2": "p",
|
||||
"opt3": "r",
|
||||
"populate_existing": False,
|
||||
}
|
||||
expected_opts = {**expected_opts, **query_opts}
|
||||
else:
|
||||
query_opts = {}
|
||||
|
||||
if session_type.plain:
|
||||
sess = AsyncSession(
|
||||
async_engine, execution_options=session_execution_options
|
||||
)
|
||||
elif session_type.sessionmaker:
|
||||
maker = async_sessionmaker(
|
||||
async_engine, execution_options=session_execution_options
|
||||
)
|
||||
sess = maker()
|
||||
else:
|
||||
session_type.fail()
|
||||
|
||||
gather_options = {}
|
||||
|
||||
@event.listens_for(sess.sync_session, "do_orm_execute")
|
||||
def check(ctx) -> None:
|
||||
assert not gather_options
|
||||
gather_options.update(ctx.execution_options)
|
||||
|
||||
if method.scalar:
|
||||
statement = select(User).limit(1)
|
||||
if add_statement_options:
|
||||
statement = statement.execution_options(**statement_options)
|
||||
await sess.scalar(statement, execution_options=query_opts)
|
||||
elif method.execute:
|
||||
statement = select(User).limit(1)
|
||||
if add_statement_options:
|
||||
statement = statement.execution_options(**statement_options)
|
||||
await sess.execute(statement, execution_options=query_opts)
|
||||
elif method.scalars:
|
||||
statement = select(User).limit(1)
|
||||
if add_statement_options:
|
||||
statement = statement.execution_options(**statement_options)
|
||||
await sess.scalars(statement, execution_options=query_opts)
|
||||
elif method.get:
|
||||
if add_statement_options:
|
||||
await sess.get(
|
||||
User,
|
||||
1,
|
||||
execution_options={**statement_options, **query_opts},
|
||||
)
|
||||
else:
|
||||
await sess.get(User, 1, execution_options=query_opts)
|
||||
else:
|
||||
method.fail()
|
||||
|
||||
await sess.close()
|
||||
|
||||
for key, value in expected_opts.items():
|
||||
eq_(gather_options[key], value)
|
||||
|
||||
|
||||
class AsyncSessionQueryTest(AsyncFixture):
|
||||
@async_test
|
||||
|
||||
@@ -721,6 +721,106 @@ class SessionStateTest(_fixtures.FixtureTest):
|
||||
s4 = maker2(info={"s4": 8})
|
||||
eq_(s4.info, {"s4": 8})
|
||||
|
||||
@testing.variation("session_type", ["plain", "sessionmaker"])
|
||||
@testing.variation("merge", [True, False])
|
||||
@testing.variation(
|
||||
"method", ["scalar", "execute", "scalars", "get", "query"]
|
||||
)
|
||||
@testing.variation("add_statement_options", [True, False])
|
||||
def test_execution_options(
|
||||
self,
|
||||
session_type: testing.Variation,
|
||||
merge: testing.Variation,
|
||||
method: testing.Variation,
|
||||
add_statement_options: testing.Variation,
|
||||
):
|
||||
users, User = self.tables.users, self.classes.User
|
||||
self.mapper_registry.map_imperatively(User, users)
|
||||
|
||||
session_execution_options = {
|
||||
"populate_existing": True,
|
||||
"autoflush": False,
|
||||
"opt1": "z",
|
||||
"opt5": "q",
|
||||
}
|
||||
|
||||
expected_opts = session_execution_options
|
||||
|
||||
if add_statement_options:
|
||||
statement_options = {"opt2": "w", "opt4": "y", "opt5": "w"}
|
||||
expected_opts = {**expected_opts, **statement_options}
|
||||
else:
|
||||
statement_options = {}
|
||||
|
||||
if merge:
|
||||
query_opts = {
|
||||
"compiled_cache": {},
|
||||
"opt1": "q",
|
||||
"opt2": "p",
|
||||
"opt3": "r",
|
||||
"populate_existing": False,
|
||||
}
|
||||
expected_opts = {**expected_opts, **query_opts}
|
||||
else:
|
||||
query_opts = {}
|
||||
|
||||
if session_type.plain:
|
||||
sess = Session(
|
||||
testing.db, execution_options=session_execution_options
|
||||
)
|
||||
elif session_type.sessionmaker:
|
||||
maker = sessionmaker(
|
||||
testing.db, execution_options=session_execution_options
|
||||
)
|
||||
sess = maker()
|
||||
else:
|
||||
session_type.fail()
|
||||
|
||||
gather_options = {}
|
||||
|
||||
@event.listens_for(sess, "do_orm_execute")
|
||||
def check(ctx: ORMExecuteState) -> None:
|
||||
assert not gather_options
|
||||
gather_options.update(ctx.execution_options)
|
||||
|
||||
if method.scalar:
|
||||
statement = select(User).limit(1)
|
||||
if add_statement_options:
|
||||
statement = statement.execution_options(**statement_options)
|
||||
sess.scalar(statement, execution_options=query_opts)
|
||||
elif method.execute:
|
||||
statement = select(User).limit(1)
|
||||
if add_statement_options:
|
||||
statement = statement.execution_options(**statement_options)
|
||||
sess.execute(statement, execution_options=query_opts)
|
||||
elif method.scalars:
|
||||
statement = select(User).limit(1)
|
||||
if add_statement_options:
|
||||
statement = statement.execution_options(**statement_options)
|
||||
sess.scalars(statement, execution_options=query_opts)
|
||||
elif method.get:
|
||||
if add_statement_options:
|
||||
sess.get(
|
||||
User,
|
||||
1,
|
||||
execution_options={**statement_options, **query_opts},
|
||||
)
|
||||
else:
|
||||
sess.get(User, 1, execution_options=query_opts)
|
||||
elif method.query:
|
||||
q = sess.query(User).limit(1)
|
||||
if add_statement_options:
|
||||
q = q.execution_options(**statement_options)
|
||||
q = q.execution_options(**query_opts)
|
||||
q.all()
|
||||
else:
|
||||
method.fail()
|
||||
|
||||
sess.close()
|
||||
|
||||
for key, value in expected_opts.items():
|
||||
eq_(gather_options[key], value)
|
||||
|
||||
def test_autocommit_kw_accepted_but_must_be_false(self):
|
||||
Session(autocommit=False)
|
||||
|
||||
|
||||
@@ -307,13 +307,26 @@ def process_class(
|
||||
"import annotations set up?"
|
||||
)
|
||||
|
||||
existing_doc = None
|
||||
|
||||
if attr is not None:
|
||||
if isinstance(attr, property):
|
||||
readonly = attr.fset is None
|
||||
existing_doc = attr.__doc__
|
||||
elif isinstance(attr, langhelpers.generic_fn_descriptor):
|
||||
readonly = True
|
||||
else:
|
||||
existing_doc = attr.__doc__
|
||||
elif hasattr(attr, "__get__"):
|
||||
readonly = not hasattr(attr, "__set__")
|
||||
existing_doc = attr.__doc__
|
||||
else:
|
||||
# not a descriptor
|
||||
readonly = False
|
||||
|
||||
else:
|
||||
readonly = False
|
||||
|
||||
if existing_doc:
|
||||
doc = textwrap.indent(
|
||||
inject_docstring_text(
|
||||
attr.__doc__,
|
||||
@@ -330,7 +343,6 @@ def process_class(
|
||||
" ",
|
||||
).lstrip()
|
||||
else:
|
||||
readonly = False
|
||||
doc = (
|
||||
f"Proxy for the :attr:`{sphinx_symbol}.{name}` "
|
||||
"attribute \n"
|
||||
|
||||
Reference in New Issue
Block a user