Support handle_error for pre_ping

The :meth:`.DialectEvents.handle_error` event is now moved to the
:class:`.DialectEvents` suite from the :class:`.EngineEvents` suite, and
now participates in the connection pool "pre ping" event for those dialects
that make use of disconnect codes in order to detect if the database is
live. This allows end-user code to alter the state of "pre ping". Note that
this does not include dialects which contain a native "ping" method such as
that of psycopg2 or most MySQL dialects.

Fixes: #5648
Change-Id: I353d84a4f66f309d2467b7e67621db6b8c70411e
This commit is contained in:
Mike Bayer
2022-05-30 12:29:58 -04:00
parent d97de97eff
commit ad14471bc9
14 changed files with 365 additions and 226 deletions
+11
View File
@@ -0,0 +1,11 @@
.. change::
:tags: feature, engine
:tickets: 5648
The :meth:`.DialectEvents.handle_error` event is now moved to the
:class:`.DialectEvents` suite from the :class:`.EngineEvents` suite, and
now participates in the connection pool "pre ping" event for those dialects
that make use of disconnect codes in order to detect if the database is
live. This allows end-user code to alter the state of "pre ping". Note that
this does not include dialects which contain a native "ping" method such as
that of psycopg2 or most MySQL dialects.
+76 -30
View File
@@ -182,9 +182,10 @@ Disconnect Handling - Pessimistic
The pessimistic approach refers to emitting a test statement on the SQL
connection at the start of each connection pool checkout, to test
that the database connection is still viable. Typically, this
is a simple statement like "SELECT 1", but may also make use of some
DBAPI-specific method to test the connection for liveness.
that the database connection is still viable. The implementation is
dialect-specific, and makes use of either a DBAPI-specific ping method,
or by using a simple SQL statement like "SELECT 1", in order to test the
connection for liveness.
The approach adds a small bit of overhead to the connection checkout process,
however is otherwise the most simple and reliable approach to completely
@@ -192,6 +193,26 @@ eliminating database errors due to stale pooled connections. The calling
application does not need to be concerned about organizing operations
to be able to recover from stale connections checked out from the pool.
Pessimistic testing of connections upon checkout is achievable by
using the :paramref:`_pool.Pool.pre_ping` argument, available from :func:`_sa.create_engine`
via the :paramref:`_sa.create_engine.pool_pre_ping` argument::
engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)
The "pre ping" feature operates on a per-dialect basis either by invoking a
DBAPI-specific "ping" method, or if not available will emit SQL equivalent to
"SELECT 1", catching any errors and detecting the error as a "disconnect"
situation. If the ping / error check determines that the connection is not
usable, the connection will be immediately recycled, and all other pooled
connections older than the current time are invalidated, so that the next time
they are checked out, they will also be recycled before use.
If the database is still not available when "pre ping" runs, then the initial
connect will fail and the error for failure to connect will be propagated
normally. In the uncommon situation that the database is available for
connections, but is not able to respond to a "ping", the "pre_ping" will try up
to three times before giving up, propagating the database error last received.
It is critical to note that the pre-ping approach **does not accommodate for
connections dropped in the middle of transactions or other SQL operations**. If
the database becomes unavailable while a transaction is in progress, the
@@ -206,33 +227,9 @@ configured using DBAPI-level autocommit connections, as described at
mid-operation using events. See the section :ref:`faq_execute_retry` for
an example.
Pessimistic testing of connections upon checkout is achievable by
using the :paramref:`_pool.Pool.pre_ping` argument, available from :func:`_sa.create_engine`
via the :paramref:`_sa.create_engine.pool_pre_ping` argument::
engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)
The "pre ping" feature will normally emit SQL equivalent to "SELECT 1" each time a
connection is checked out from the pool; if an error is raised that is detected
as a "disconnect" situation, the connection will be immediately recycled, and
all other pooled connections older than the current time are invalidated, so
that the next time they are checked out, they will also be recycled before use.
If the database is still not available when "pre ping" runs, then the initial
connect will fail and the error for failure to connect will be propagated
normally. In the uncommon situation that the database is available for
connections, but is not able to respond to a "ping", the "pre_ping" will try up
to three times before giving up, propagating the database error last received.
.. note::
the "SELECT 1" emitted by "pre-ping" is invoked within the scope
of the connection pool / dialect, using a very short codepath for minimal
Python latency. As such, this statement is **not logged in the SQL
echo output**, and will not show up in SQLAlchemy's engine logging.
.. versionadded:: 1.2 Added "pre-ping" capability to the :class:`_pool.Pool`
class.
For dialects that make use of "SELECT 1" and catch errors in order to detect
disconnects, the disconnection test may be augmented for new backend-specific
error messages using the :meth:`_events.DialectEvents.handle_error` hook.
.. _pool_disconnects_pessimistic_custom:
@@ -335,6 +332,7 @@ correspond to a single request failing with a 500 error, then the web applicatio
continuing normally beyond that. Hence the approach is "optimistic" in that frequent
database restarts are not anticipated.
.. _pool_setting_recycle:
Setting Pool Recycle
@@ -397,6 +395,54 @@ a DBAPI connection might be invalidated include:
All invalidations which occur will invoke the :meth:`_events.PoolEvents.invalidate`
event.
.. _pool_new_disconnect_codes:
Supporting new database error codes for disconnect scenarios
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
SQLAlchemy dialects each include a routine called ``is_disconnect()`` that is
invoked whenever a DBAPI exception is encountered. The DBAPI exception object
is passed to this method, where dialect-specific heuristics will then determine
if the error code received indicates that the database connection has been
"disconnected", or is in an otherwise unusable state which indicates it should
be recycled. The heuristics applied here may be customized using the
:meth:`_events.DialectEvents.handle_error` event hook, which is typically
established via the owning :class:`_engine.Engine` object. Using this hook, all
errors which occur are delivered passing along a contextual object known as
:class:`.ExceptionContext`. Custom event hooks may control whether or not a
particular error should be considered a "disconnect" situation or not, as well
as if this disconnect should cause the entire connection pool to be invalidated
or not.
For example, to add support to consider the Oracle error codes
``DPY-1001`` and ``DPY-4011`` to be handled as disconnect codes, apply an
event handler to the engine after creation::
import re
from sqlalchemy import create_engine
engine = create_engine("oracle://scott:tiger@dnsname")
@event.listens_for(engine, "handle_error")
def handle_exception(context: ExceptionContext) -> None:
if not context.is_disconnect and re.match(
r"^(?:DPI-1001|DPI-4011)", str(context.original_exception)
):
context.is_disconnect = True
return None
The above error processing function will be invoked for all Oracle errors
raised, including those caught when using the
:ref:`pool pre ping <pool_disconnects_pessimistic>` feature for those backends
that rely upon disconnect error handling (new in 2.0).
.. seealso::
:meth:`_events.DialectEvents.handle_error`
.. _pool_use_lifo:
Using FIFO vs. LIFO
+38 -12
View File
@@ -1960,15 +1960,14 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
newraise = None
if (
self._has_events or self.engine._has_events
) and not self._execution_options.get(
if (self.dialect._has_events) and not self._execution_options.get(
"skip_user_error_events", False
):
ctx = ExceptionContextImpl(
e,
sqlalchemy_exception,
self.engine,
self.dialect,
self,
cursor,
statement,
@@ -1978,7 +1977,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
invalidate_pool_on_disconnect,
)
for fn in self.dispatch.handle_error:
for fn in self.dialect.dispatch.handle_error:
try:
# handler returns an exception;
# call next handler in a chain
@@ -2040,13 +2039,19 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
@classmethod
def _handle_dbapi_exception_noconnection(
cls, e: BaseException, dialect: Dialect, engine: Engine
cls,
e: BaseException,
dialect: Dialect,
engine: Optional[Engine] = None,
is_disconnect: Optional[bool] = None,
invalidate_pool_on_disconnect: bool = True,
) -> NoReturn:
exc_info = sys.exc_info()
is_disconnect = isinstance(
e, dialect.loaded_dbapi.Error
) and dialect.is_disconnect(e, None, None)
if is_disconnect is None:
is_disconnect = isinstance(
e, dialect.loaded_dbapi.Error
) and dialect.is_disconnect(e, None, None)
should_wrap = isinstance(e, dialect.loaded_dbapi.Error)
@@ -2056,28 +2061,32 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
None,
cast(Exception, e),
dialect.loaded_dbapi.Error,
hide_parameters=engine.hide_parameters,
hide_parameters=engine.hide_parameters
if engine is not None
else False,
connection_invalidated=is_disconnect,
dialect=dialect,
)
else:
sqlalchemy_exception = None
newraise = None
if engine._has_events:
if dialect._has_events:
ctx = ExceptionContextImpl(
e,
sqlalchemy_exception,
engine,
dialect,
None,
None,
None,
None,
None,
is_disconnect,
True,
invalidate_pool_on_disconnect,
)
for fn in engine.dispatch.handle_error:
for fn in dialect.dispatch.handle_error:
try:
# handler returns an exception;
# call next handler in a chain
@@ -2121,11 +2130,27 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
class ExceptionContextImpl(ExceptionContext):
"""Implement the :class:`.ExceptionContext` interface."""
__slots__ = (
"connection",
"engine",
"dialect",
"cursor",
"statement",
"parameters",
"original_exception",
"sqlalchemy_exception",
"chained_exception",
"execution_context",
"is_disconnect",
"invalidate_pool_on_disconnect",
)
def __init__(
self,
exception: BaseException,
sqlalchemy_exception: Optional[exc.StatementError],
engine: Optional[Engine],
dialect: Dialect,
connection: Optional[Connection],
cursor: Optional[DBAPICursor],
statement: Optional[str],
@@ -2135,6 +2160,7 @@ class ExceptionContextImpl(ExceptionContext):
invalidate_pool_on_disconnect: bool,
):
self.engine = engine
self.dialect = dialect
self.connection = connection
self.sqlalchemy_exception = sqlalchemy_exception
self.original_exception = exception
+17 -1
View File
@@ -610,7 +610,23 @@ class DefaultDialect(Dialect):
finally:
cursor.close()
except self.loaded_dbapi.Error as err:
if self.is_disconnect(err, dbapi_connection, cursor):
is_disconnect = self.is_disconnect(err, dbapi_connection, cursor)
if self._has_events:
try:
Connection._handle_dbapi_exception_noconnection(
err,
self,
is_disconnect=is_disconnect,
invalidate_pool_on_disconnect=False,
)
except exc.StatementError as new_err:
is_disconnect = new_err.connection_invalidated
# other exceptions modified by the event handler will be
# thrown
if is_disconnect:
return False
else:
raise
+134 -137
View File
@@ -125,8 +125,9 @@ class ConnectionEvents(event.Events[ConnectionEventsTarget]):
def _accept_with(
cls,
target: Union[ConnectionEventsTarget, Type[ConnectionEventsTarget]],
identifier: str,
) -> Optional[Union[ConnectionEventsTarget, Type[ConnectionEventsTarget]]]:
default_dispatch = super()._accept_with(target)
default_dispatch = super()._accept_with(target, identifier)
if default_dispatch is None and hasattr(
target, "_no_async_engine_events"
):
@@ -147,7 +148,6 @@ class ConnectionEvents(event.Events[ConnectionEventsTarget]):
event_key.identifier,
event_key._listen_fn,
)
target._has_events = True
if not retval:
@@ -187,7 +187,6 @@ class ConnectionEvents(event.Events[ConnectionEventsTarget]):
elif retval and identifier not in (
"before_execute",
"before_cursor_execute",
"handle_error",
):
raise exc.ArgumentError(
"Only the 'before_execute', "
@@ -369,139 +368,6 @@ class ConnectionEvents(event.Events[ConnectionEventsTarget]):
"""
def handle_error(
self, exception_context: ExceptionContext
) -> Optional[BaseException]:
r"""Intercept all exceptions processed by the
:class:`_engine.Connection`.
This includes all exceptions emitted by the DBAPI as well as
within SQLAlchemy's statement invocation process, including
encoding errors and other statement validation errors. Other areas
in which the event is invoked include transaction begin and end,
result row fetching, cursor creation.
Note that :meth:`.handle_error` may support new kinds of exceptions
and new calling scenarios at *any time*. Code which uses this
event must expect new calling patterns to be present in minor
releases.
To support the wide variety of members that correspond to an exception,
as well as to allow extensibility of the event without backwards
incompatibility, the sole argument received is an instance of
:class:`.ExceptionContext`. This object contains data members
representing detail about the exception.
Use cases supported by this hook include:
* read-only, low-level exception handling for logging and
debugging purposes
* exception re-writing
* Establishing or disabling whether a connection or the owning
connection pool is invalidated or expired in response to a
specific exception [1]_.
The hook is called while the cursor from the failed operation
(if any) is still open and accessible. Special cleanup operations
can be called on this cursor; SQLAlchemy will attempt to close
this cursor subsequent to this hook being invoked.
.. note::
.. [1] The pool "pre_ping" handler enabled using the
:paramref:`_sa.create_engine.pool_pre_ping` parameter does
**not** consult this event before deciding if the "ping"
returned false, as opposed to receiving an unhandled error.
For this use case, the :ref:`legacy recipe based on
engine_connect() may be used
<pool_disconnects_pessimistic_custom>`. A future API allow
more comprehensive customization of the "disconnect"
detection mechanism across all functions.
A handler function has two options for replacing
the SQLAlchemy-constructed exception into one that is user
defined. It can either raise this new exception directly, in
which case all further event listeners are bypassed and the
exception will be raised, after appropriate cleanup as taken
place::
@event.listens_for(Engine, "handle_error")
def handle_exception(context):
if isinstance(context.original_exception,
psycopg2.OperationalError) and \
"failed" in str(context.original_exception):
raise MySpecialException("failed operation")
.. warning:: Because the
:meth:`_events.ConnectionEvents.handle_error`
event specifically provides for exceptions to be re-thrown as
the ultimate exception raised by the failed statement,
**stack traces will be misleading** if the user-defined event
handler itself fails and throws an unexpected exception;
the stack trace may not illustrate the actual code line that
failed! It is advised to code carefully here and use
logging and/or inline debugging if unexpected exceptions are
occurring.
Alternatively, a "chained" style of event handling can be
used, by configuring the handler with the ``retval=True``
modifier and returning the new exception instance from the
function. In this case, event handling will continue onto the
next handler. The "chained" exception is available using
:attr:`.ExceptionContext.chained_exception`::
@event.listens_for(Engine, "handle_error", retval=True)
def handle_exception(context):
if context.chained_exception is not None and \
"special" in context.chained_exception.message:
return MySpecialException("failed",
cause=context.chained_exception)
Handlers that return ``None`` may be used within the chain; when
a handler returns ``None``, the previous exception instance,
if any, is maintained as the current exception that is passed onto the
next handler.
When a custom exception is raised or returned, SQLAlchemy raises
this new exception as-is, it is not wrapped by any SQLAlchemy
object. If the exception is not a subclass of
:class:`sqlalchemy.exc.StatementError`,
certain features may not be available; currently this includes
the ORM's feature of adding a detail hint about "autoflush" to
exceptions raised within the autoflush process.
:param context: an :class:`.ExceptionContext` object. See this
class for details on all available members.
.. versionadded:: 0.9.7 Added the
:meth:`_events.ConnectionEvents.handle_error` hook.
.. versionchanged:: 1.1 The :meth:`.handle_error` event will now
receive all exceptions that inherit from ``BaseException``,
including ``SystemExit`` and ``KeyboardInterrupt``. The setting for
:attr:`.ExceptionContext.is_disconnect` is ``True`` in this case and
the default for
:attr:`.ExceptionContext.invalidate_pool_on_disconnect` is
``False``.
.. versionchanged:: 1.0.0 The :meth:`.handle_error` event is now
invoked when an :class:`_engine.Engine` fails during the initial
call to :meth:`_engine.Engine.connect`, as well as when a
:class:`_engine.Connection` object encounters an error during a
reconnect operation.
.. versionchanged:: 1.0.0 The :meth:`.handle_error` event is
not fired off when a dialect makes use of the
``skip_user_error_events`` execution option. This is used
by dialects which intend to catch SQLAlchemy-specific exceptions
within specific operations, such as when the MySQL dialect detects
a table not present within the ``has_table()`` dialect method.
Prior to 1.0.0, code which implements :meth:`.handle_error` needs
to ensure that exceptions thrown in these scenarios are re-raised
without modification.
"""
@event._legacy_signature(
"2.0", ["conn", "branch"], converter=lambda conn: (conn, False)
)
@@ -793,8 +659,11 @@ class DialectEvents(event.Events[Dialect]):
@classmethod
def _accept_with(
cls, target: Union[Engine, Type[Engine], Dialect, Type[Dialect]]
cls,
target: Union[Engine, Type[Engine], Dialect, Type[Dialect]],
identifier: str,
) -> Optional[Union[Dialect, Type[Dialect]]]:
if isinstance(target, type):
if issubclass(target, Engine):
return Dialect
@@ -804,11 +673,139 @@ class DialectEvents(event.Events[Dialect]):
return target.dialect
elif isinstance(target, Dialect):
return target
elif isinstance(target, Connection) and identifier == "handle_error":
raise exc.InvalidRequestError(
"The handle_error() event hook as of SQLAlchemy 2.0 is "
"established on the Dialect, and may only be applied to the "
"Engine as a whole or to a specific Dialect as a whole, "
"not on a per-Connection basis."
)
elif hasattr(target, "_no_async_engine_events"):
target._no_async_engine_events()
else:
return None
def handle_error(
self, exception_context: ExceptionContext
) -> Optional[BaseException]:
r"""Intercept all exceptions processed by the
:class:`_engine.Dialect`, typically but not limited to those
emitted within the scope of a :class:`_engine.Connection`.
.. versionchanged:: 2.0 the :meth:`.DialectEvents.handle_error` event
is moved to the :class:`.DialectEvents` class, moved from the
:class:`.ConnectionEvents` class, so that it may also participate in
the "pre ping" operation configured with the
:paramref:`_sa.create_engine.pool_pre_ping` parameter. The event
remains registered by using the :class:`_engine.Engine` as the event
target, however note that using the :class:`_engine.Connection` as
an event target for :meth:`.DialectEvents.handle_error` is no longer
supported.
This includes all exceptions emitted by the DBAPI as well as
within SQLAlchemy's statement invocation process, including
encoding errors and other statement validation errors. Other areas
in which the event is invoked include transaction begin and end,
result row fetching, cursor creation.
Note that :meth:`.handle_error` may support new kinds of exceptions
and new calling scenarios at *any time*. Code which uses this
event must expect new calling patterns to be present in minor
releases.
To support the wide variety of members that correspond to an exception,
as well as to allow extensibility of the event without backwards
incompatibility, the sole argument received is an instance of
:class:`.ExceptionContext`. This object contains data members
representing detail about the exception.
Use cases supported by this hook include:
* read-only, low-level exception handling for logging and
debugging purposes
* Establishing whether a DBAPI connection error message indicates
that the database connection needs to be reconnected, including
for the "pre_ping" handler used by **some** dialects
* Establishing or disabling whether a connection or the owning
connection pool is invalidated or expired in response to a
specific exception
* exception re-writing
The hook is called while the cursor from the failed operation
(if any) is still open and accessible. Special cleanup operations
can be called on this cursor; SQLAlchemy will attempt to close
this cursor subsequent to this hook being invoked.
As of SQLAlchemy 2.0, the "pre_ping" handler enabled using the
:paramref:`_sa.create_engine.pool_pre_ping` parameter will also
participate in the :meth:`.handle_error` process, **for those dialects
that rely upon disconnect codes to detect database liveness**. Note
that some dialects such as psycopg, psycopg2, and most MySQL dialects
make use of a native ``ping()`` method supplied by the DBAPI which does
not make use of disconnect codes.
A handler function has two options for replacing
the SQLAlchemy-constructed exception into one that is user
defined. It can either raise this new exception directly, in
which case all further event listeners are bypassed and the
exception will be raised, after appropriate cleanup as taken
place::
@event.listens_for(Engine, "handle_error")
def handle_exception(context):
if isinstance(context.original_exception,
psycopg2.OperationalError) and \
"failed" in str(context.original_exception):
raise MySpecialException("failed operation")
.. warning:: Because the
:meth:`_events.DialectEvents.handle_error`
event specifically provides for exceptions to be re-thrown as
the ultimate exception raised by the failed statement,
**stack traces will be misleading** if the user-defined event
handler itself fails and throws an unexpected exception;
the stack trace may not illustrate the actual code line that
failed! It is advised to code carefully here and use
logging and/or inline debugging if unexpected exceptions are
occurring.
Alternatively, a "chained" style of event handling can be
used, by configuring the handler with the ``retval=True``
modifier and returning the new exception instance from the
function. In this case, event handling will continue onto the
next handler. The "chained" exception is available using
:attr:`.ExceptionContext.chained_exception`::
@event.listens_for(Engine, "handle_error", retval=True)
def handle_exception(context):
if context.chained_exception is not None and \
"special" in context.chained_exception.message:
return MySpecialException("failed",
cause=context.chained_exception)
Handlers that return ``None`` may be used within the chain; when
a handler returns ``None``, the previous exception instance,
if any, is maintained as the current exception that is passed onto the
next handler.
When a custom exception is raised or returned, SQLAlchemy raises
this new exception as-is, it is not wrapped by any SQLAlchemy
object. If the exception is not a subclass of
:class:`sqlalchemy.exc.StatementError`,
certain features may not be available; currently this includes
the ORM's feature of adding a detail hint about "autoflush" to
exceptions raised within the autoflush process.
:param context: an :class:`.ExceptionContext` object. See this
class for details on all available members.
.. seealso::
:ref:`pool_new_disconnect_codes`
"""
def do_connect(
self,
dialect: Dialect,
+16 -8
View File
@@ -2541,11 +2541,21 @@ class ExceptionContext:
"""Encapsulate information about an error condition in progress.
This object exists solely to be passed to the
:meth:`_events.ConnectionEvents.handle_error` event,
:meth:`_events.DialectEvents.handle_error` event,
supporting an interface that
can be extended without backwards-incompatibility.
.. versionadded:: 0.9.7
"""
__slots__ = ()
dialect: Dialect
"""The :class:`_engine.Dialect` in use.
This member is present for all invocations of the event hook.
.. versionadded:: 2.0
"""
@@ -2565,10 +2575,8 @@ class ExceptionContext:
engine: Optional[Engine]
"""The :class:`_engine.Engine` in use during the exception.
This member should always be present, even in the case of a failure
when first connecting.
.. versionadded:: 1.0.0
This member is present in all cases except for when handling an error
within the connection pool "pre-ping" process.
"""
@@ -2646,7 +2654,7 @@ class ExceptionContext:
condition.
This flag will always be True or False within the scope of the
:meth:`_events.ConnectionEvents.handle_error` handler.
:meth:`_events.DialectEvents.handle_error` handler.
SQLAlchemy will defer to this flag in order to determine whether or not
the connection should be invalidated subsequently. That is, by
@@ -2671,7 +2679,7 @@ class ExceptionContext:
when a "disconnect" condition is in effect.
Setting this flag to False within the scope of the
:meth:`_events.ConnectionEvents.handle_error`
:meth:`_events.DialectEvents.handle_error`
event will have the effect such
that the full collection of connections in the pool will not be
invalidated during a disconnect; only the current connection that is the
+1 -1
View File
@@ -29,7 +29,7 @@ def _event_key(
target: _ET, identifier: str, fn: _ListenerFnType
) -> _EventKey[_ET]:
for evt_cls in _registrars[identifier]:
tgt = evt_cls._accept_with(target)
tgt = evt_cls._accept_with(target, identifier)
if tgt is not None:
return _EventKey(target, identifier, fn, tgt)
else:
+2 -2
View File
@@ -250,7 +250,7 @@ class _HasEventsDispatch(Generic[_ET]):
@classmethod
def _accept_with(
cls, target: Union[_ET, Type[_ET]]
cls, target: Union[_ET, Type[_ET]], identifier: str
) -> Optional[Union[_ET, Type[_ET]]]:
raise NotImplementedError()
@@ -334,7 +334,7 @@ class Events(_HasEventsDispatch[_ET]):
@classmethod
def _accept_with(
cls, target: Union[_ET, Type[_ET]]
cls, target: Union[_ET, Type[_ET]], identifier: str
) -> Optional[Union[_ET, Type[_ET]]]:
def dispatch_is(*types: Type[Any]) -> bool:
return all(isinstance(target.dispatch, t) for t in types)
+14 -18
View File
@@ -547,9 +547,20 @@ class DBAPIError(StatementError):
code = "dbapi"
# I dont think I'm going to try to do overloads like this everywhere
# in the library, but as this module is early days for me typing everything
# I am sort of just practicing
@overload
@classmethod
def instance(
cls,
statement: Optional[str],
params: Optional[_AnyExecuteParams],
orig: Exception,
dbapi_base_err: Type[Exception],
hide_parameters: bool = False,
connection_invalidated: bool = False,
dialect: Optional["Dialect"] = None,
ismulti: Optional[bool] = None,
) -> StatementError:
...
@overload
@classmethod
@@ -566,21 +577,6 @@ class DBAPIError(StatementError):
) -> DontWrapMixin:
...
@overload
@classmethod
def instance(
cls,
statement: Optional[str],
params: Optional[_AnyExecuteParams],
orig: Exception,
dbapi_base_err: Type[Exception],
hide_parameters: bool = False,
connection_invalidated: bool = False,
dialect: Optional["Dialect"] = None,
ismulti: Optional[bool] = None,
) -> StatementError:
...
@overload
@classmethod
def instance(
+6 -6
View File
@@ -62,7 +62,7 @@ class InstrumentationEvents(event.Events):
_dispatch_target = instrumentation.InstrumentationFactory
@classmethod
def _accept_with(cls, target):
def _accept_with(cls, target, identifier):
if isinstance(target, type):
return _InstrumentationEventsHold(target)
else:
@@ -203,7 +203,7 @@ class InstanceEvents(event.Events):
@classmethod
@util.preload_module("sqlalchemy.orm")
def _accept_with(cls, target):
def _accept_with(cls, target, identifier):
orm = util.preloaded.orm
if isinstance(target, instrumentation.ClassManager):
@@ -705,7 +705,7 @@ class MapperEvents(event.Events):
@classmethod
@util.preload_module("sqlalchemy.orm")
def _accept_with(cls, target):
def _accept_with(cls, target, identifier):
orm = util.preloaded.orm
if target is orm.mapper:
@@ -1383,7 +1383,7 @@ class SessionEvents(event.Events[Session]):
return fn
@classmethod
def _accept_with(cls, target):
def _accept_with(cls, target, identifier):
if isinstance(target, scoped_session):
target = target.session_factory
@@ -1409,7 +1409,7 @@ class SessionEvents(event.Events[Session]):
target._no_async_engine_events()
else:
# allows alternate SessionEvents-like-classes to be consulted
return event.Events._accept_with(target)
return event.Events._accept_with(target, identifier)
@classmethod
def _listen(
@@ -2263,7 +2263,7 @@ class AttributeEvents(event.Events):
return dispatch
@classmethod
def _accept_with(cls, target):
def _accept_with(cls, target, identifier):
# TODO: coverage
if isinstance(target, interfaces.MapperProperty):
return getattr(target.parent.class_, target.key)
+3 -1
View File
@@ -58,7 +58,9 @@ class PoolEvents(event.Events[Pool]):
@util.preload_module("sqlalchemy.engine")
@classmethod
def _accept_with(
cls, target: Union[Pool, Type[Pool], Engine, Type[Engine]]
cls,
target: Union[Pool, Type[Pool], Engine, Type[Engine]],
identifier: str,
) -> Optional[Union[Pool, Type[Pool]]]:
if not typing.TYPE_CHECKING:
Engine = util.preloaded.engine.Engine
+2 -2
View File
@@ -204,7 +204,7 @@ class EventsTest(TearDownLocalEventsFixture, fixtures.TestBase):
class E1(event.Events):
@classmethod
def _accept_with(cls, target):
def _accept_with(cls, target, identifier):
if isinstance(target, T1):
return target
else:
@@ -782,7 +782,7 @@ class CustomTargetsTest(TearDownLocalEventsFixture, fixtures.TestBase):
def setup_test(self):
class TargetEvents(event.Events):
@classmethod
def _accept_with(cls, target):
def _accept_with(cls, target, identifier):
if target == "one":
return Target
else:
+15
View File
@@ -3034,6 +3034,21 @@ class HandleErrorTest(fixtures.TestBase):
):
assert_raises(MySpecialException, conn.get_isolation_level)
def test_handle_error_not_on_connection(self, connection):
with expect_raises_message(
tsa.exc.InvalidRequestError,
r"The handle_error\(\) event hook as of SQLAlchemy 2.0 is "
r"established "
r"on the Dialect, and may only be applied to the Engine as a "
r"whole or to a specific Dialect as a whole, not on a "
r"per-Connection basis.",
):
@event.listens_for(connection, "handle_error")
def handle_error(ctx):
pass
@testing.only_on("sqlite+pysqlite")
def test_cursor_close_resultset_failed_connectionless(self):
engine = engines.testing_engine()
+30 -8
View File
@@ -13,7 +13,7 @@ from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import util
from sqlalchemy.engine import url
from sqlalchemy.engine import default
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import assert_raises_message_context_ok
@@ -165,10 +165,8 @@ class PrePingMockTest(fixtures.TestBase):
def setup_test(self):
self.dbapi = MockDBAPI()
def _pool_fixture(self, pre_ping, pool_kw=None):
dialect = url.make_url(
"postgresql+psycopg2://foo:bar@localhost/test"
).get_dialect()()
def _pool_fixture(self, pre_ping, setup_disconnect=True, pool_kw=None):
dialect = default.DefaultDialect()
dialect.dbapi = self.dbapi
_pool = pool.QueuePool(
creator=lambda: self.dbapi.connect("foo.db"),
@@ -177,9 +175,10 @@ class PrePingMockTest(fixtures.TestBase):
**(pool_kw if pool_kw else {}),
)
dialect.is_disconnect = lambda e, conn, cursor: isinstance(
e, MockDisconnect
)
if setup_disconnect:
dialect.is_disconnect = lambda e, conn, cursor: isinstance(
e, MockDisconnect
)
return _pool
def teardown_test(self):
@@ -262,6 +261,29 @@ class PrePingMockTest(fixtures.TestBase):
stale_cursor = stale_connection.cursor()
assert_raises(MockDisconnect, stale_cursor.execute, "hi")
def test_handle_error_sets_disconnect(self):
pool = self._pool_fixture(pre_ping=True, setup_disconnect=False)
@event.listens_for(pool._dialect, "handle_error")
def setup_disconnect(ctx):
assert isinstance(ctx.sqlalchemy_exception, exc.DBAPIError)
assert isinstance(ctx.original_exception, MockDisconnect)
ctx.is_disconnect = True
conn = pool.connect()
stale_connection = conn.dbapi_connection
conn.close()
self.dbapi.shutdown("execute")
self.dbapi.restart()
conn = pool.connect()
cursor = conn.cursor()
cursor.execute("hi")
stale_cursor = stale_connection.cursor()
assert_raises(MockDisconnect, stale_cursor.execute, "hi")
def test_raise_db_is_stopped(self):
pool = self._pool_fixture(pre_ping=True)