Update transaction / connection handling

step one, do away with __connection attribute and using
awkward AttributeError logic

step two, move all management of "connection._transaction"
into the transaction objects themselves where it's easier
to follow.

build MarkerTransaction that takes the role of
"do-nothing block"

new connection datamodel is: connection._transaction, always
a root, connection._nested_transaction, always a nested.

nested transactions still chain to each other as this
is still sort of necessary but they consider the root
transaction separately, and the marker transactions
not at all.

introduce new InvalidRequestError subclass
PendingRollbackError.  Apply to connection and session
for all cases where a transaction needs to be rolled
back before continuing.   Within Connection,
both PendingRollbackError as well as ResourceClosedError
are now raised directly without being handled by
handle_dbapi_error();  this removes these two exception
cases from the handle_error event handler as well as
from StatementError wrapping, as these two exceptions are
not statement oriented and are instead programmatic
issues, that the application is failing to handle database
errors properly.

Revise savepoints so that when a release fails, they set
themselves as inactive so that their rollback() method
does not throw another exception.

Give savepoints another go on MySQL, can't get release working
however get support for basic round trip going

Fixes: #5327
Change-Id: Ia3cbbf56d4882fcc7980f90519412f1711fae74d
This commit is contained in:
Mike Bayer
2020-05-14 12:50:11 -04:00
parent 79de84b25e
commit 0e53221eef
13 changed files with 1181 additions and 563 deletions
+42 -20
View File
@@ -225,47 +225,69 @@ sooner.
:ref:`connections_toplevel`
.. _error_8s2b:
Can't reconnect until invalid transaction is rolled back
----------------------------------------------------------
This error condition refers to the case where a :class:`_engine.Connection` was
invalidated, either due to a database disconnect detection or due to an
explicit call to :meth:`_engine.Connection.invalidate`, but there is still a
transaction present that was initiated by the :meth:`_engine.Connection.begin`
method. When a connection is invalidated, any :class:`_engine.Transaction`
that was in progress is now in an invalid state, and must be explicitly rolled
back in order to remove it from the :class:`_engine.Connection`.
.. _error_8s2a:
This connection is on an inactive transaction. Please rollback() fully before proceeding
------------------------------------------------------------------------------------------
This error condition was added to SQLAlchemy as of version 1.4. The error
refers to the state where a :class:`_engine.Connection` is placed into a transaction
using a method like :meth:`_engine.Connection.begin`, and then a further "sub" transaction
is created within that scope; the "sub" transaction is then rolled back using
:meth:`.Transaction.rollback`, however the outer transaction is not rolled back.
refers to the state where a :class:`_engine.Connection` is placed into a
transaction using a method like :meth:`_engine.Connection.begin`, and then a
further "marker" transaction is created within that scope; the "marker"
transaction is then rolled back using :meth:`.Transaction.rollback` or closed
using :meth:`.Transaction.close`, however the outer transaction is still
present in an "inactive" state and must be rolled back.
The pattern looks like::
engine = create_engine(...)
connection = engine.connect()
transaction = connection.begin()
transaction1 = connection.begin()
# this is a "sub" or "marker" transaction, a logical nesting
# structure based on "real" transaction transaction1
transaction2 = connection.begin()
transaction2.rollback()
connection.execute(text("select 1")) # we are rolled back; will now raise
# transaction1 is still present and needs explicit rollback,
# so this will raise
connection.execute(text("select 1"))
transaction.rollback()
Above, ``transaction2`` is a "marker" transaction, which indicates a logical
nesting of transactions within an outer one; while the inner transaction
can roll back the whole transaction via its rollback() method, its commit()
method has no effect except to close the scope of the "marker" transaction
itself. The call to ``transaction2.rollback()`` has the effect of
**deactivating** transaction1 which means it is essentially rolled back
at the database level, however is still present in order to accommodate
a consistent nesting pattern of transactions.
The correct resolution is to ensure the outer transaction is also
rolled back::
Above, ``transaction2`` is a "sub" transaction, which indicates a logical
nesting of transactions within an outer one. SQLAlchemy makes great use of
this pattern more commonly in the ORM :class:`.Session`, where the FAQ entry
:ref:`faq_session_rollback` describes the rationale within the ORM.
transaction1.rollback()
The "subtransaction" pattern in Core comes into play often when using the ORM
pattern described at :ref:`session_external_transaction`. As this pattern
involves a behavior called "connection branching", where a :class:`_engine.Connection`
serves a "branched" :class:`_engine.Connection` object to the :class:`.Session` via
its :meth:`_engine.Connection.connect` method, the same transaction behavior comes
into play; if the :class:`.Session` rolls back the transaction, and savepoints
have not been used to prevent a rollback of the entire transaction, the
outermost transaction started on the :class:`_engine.Connection` is now in an inactive
state.
This pattern is not commonly used in Core. Within the ORM, a similar issue can
occur which is the product of the ORM's "logical" transaction structure; this
is described in the FAQ entry at :ref:`faq_session_rollback`.
The "subtransaction" pattern is to be removed in SQLAlchemy 2.0 so that this
particular programming pattern will no longer be available and this
error message will no longer occur in Core.
.. _error_dbapi:
File diff suppressed because it is too large Load Diff
+8 -1
View File
@@ -96,8 +96,15 @@ class CursorResultMetaData(ResultMetaData):
}
)
# TODO: need unit test for:
# result = connection.execute("raw sql, no columns").scalars()
# without the "or ()" it's failing because MD_OBJECTS is None
new_metadata._keymap.update(
{e: new_rec for new_rec in new_recs for e in new_rec[MD_OBJECTS]}
{
e: new_rec
for new_rec in new_recs
for e in new_rec[MD_OBJECTS] or ()
}
)
return new_metadata
+4 -2
View File
@@ -640,18 +640,20 @@ class ConnectionEvents(event.Events):
:param conn: :class:`_engine.Connection` object
:param name: specified name used for the savepoint.
:param context: :class:`.ExecutionContext` in use. May be ``None``.
:param context: not used
"""
# TODO: deprecate "context"
def release_savepoint(self, conn, name, context):
"""Intercept release_savepoint() events.
:param conn: :class:`_engine.Connection` object
:param name: specified name used for the savepoint.
:param context: :class:`.ExecutionContext` in use. May be ``None``.
:param context: not used
"""
# TODO: deprecate "context"
def begin_twophase(self, conn, xid):
"""Intercept begin_twophase() events.
+9
View File
@@ -225,6 +225,15 @@ class NoInspectionAvailable(InvalidRequestError):
no context for inspection."""
class PendingRollbackError(InvalidRequestError):
"""A transaction has failed and needs to be rolled back before
continuing.
.. versionadded:: 1.4
"""
class ResourceClosedError(InvalidRequestError):
"""An operation was requested from a connection, cursor, or other
object that's in a closed state."""
+1 -19
View File
@@ -249,25 +249,7 @@ class Connection(_LegacyConnection):
if any transaction is in place.
"""
try:
conn = self.__connection
except AttributeError:
pass
else:
# TODO: can we do away with "_reset_agent" stuff now?
if self._transaction:
self._transaction.rollback()
conn.close()
# the close() process can end up invalidating us,
# as the pool will call our transaction as the "reset_agent"
# for rollback(), which can then cause an invalidation
if not self.__invalid:
del self.__connection
self.__can_reconnect = False
self._transaction = None
super(Connection, self).close()
def execute(self, statement, parameters=None, execution_options=None):
r"""Executes a SQL statement construct and returns a
+1 -1
View File
@@ -291,7 +291,7 @@ class SessionTransaction(object):
elif self._state is DEACTIVE:
if not deactive_ok and not rollback_ok:
if self._rollback_exception:
raise sa_exc.InvalidRequestError(
raise sa_exc.PendingRollbackError(
"This Session's transaction has been rolled back "
"due to a previous exception during flush."
" To begin a new transaction with this Session, "
+4 -6
View File
@@ -285,13 +285,11 @@ def _assert_proper_exception_context(exception):
def assert_raises(except_cls, callable_, *args, **kw):
_assert_raises(except_cls, callable_, args, kw, check_context=True)
return _assert_raises(except_cls, callable_, args, kw, check_context=True)
def assert_raises_context_ok(except_cls, callable_, *args, **kw):
_assert_raises(
except_cls, callable_, args, kw,
)
return _assert_raises(except_cls, callable_, args, kw,)
def assert_raises_return(except_cls, callable_, *args, **kw):
@@ -299,7 +297,7 @@ def assert_raises_return(except_cls, callable_, *args, **kw):
def assert_raises_message(except_cls, msg, callable_, *args, **kwargs):
_assert_raises(
return _assert_raises(
except_cls, callable_, args, kwargs, msg=msg, check_context=True
)
@@ -307,7 +305,7 @@ def assert_raises_message(except_cls, msg, callable_, *args, **kwargs):
def assert_raises_message_context_ok(
except_cls, msg, callable_, *args, **kwargs
):
_assert_raises(except_cls, callable_, args, kwargs, msg=msg)
return _assert_raises(except_cls, callable_, args, kwargs, msg=msg)
def _assert_raises(
@@ -412,6 +412,17 @@ def _prep_testing_database(options, file_config):
if options.dropfirst:
for cfg in config.Config.all_configs():
e = cfg.db
# TODO: this has to be part of provision.py in postgresql
if against(cfg, "postgresql"):
with e.connect().execution_options(
isolation_level="AUTOCOMMIT"
) as conn:
for xid in conn.execute(
"select gid from pg_prepared_xacts"
).scalars():
conn.execute("ROLLBACK PREPARED '%s'" % xid)
inspector = inspect(e)
try:
view_names = inspector.get_view_names()
@@ -447,6 +458,7 @@ def _prep_testing_database(options, file_config):
if config.requirements.schemas.enabled_for_config(cfg):
util.drop_all_tables(e, inspector, schema=cfg.test_schema)
# TODO: this has to be part of provision.py in postgresql
if against(cfg, "postgresql"):
from sqlalchemy.dialects import postgresql
+179 -14
View File
@@ -103,8 +103,21 @@ def mock_connection():
else:
return
def commit():
if conn.explode == "commit":
raise MockDisconnect("Lost the DB connection on commit")
elif conn.explode == "commit_no_disconnect":
raise MockError(
"something broke on commit but we didn't lose the "
"connection"
)
else:
return
conn = Mock(
rollback=Mock(side_effect=rollback), cursor=Mock(side_effect=cursor())
rollback=Mock(side_effect=rollback),
commit=Mock(side_effect=commit),
cursor=Mock(side_effect=cursor()),
)
return conn
@@ -420,7 +433,7 @@ class MockReconnectTest(fixtures.TestBase):
[[call()], [call()], []],
)
def test_invalidate_trans(self):
def test_invalidate_on_execute_trans(self):
conn = self.db.connect()
trans = conn.begin()
self.dbapi.shutdown()
@@ -432,7 +445,7 @@ class MockReconnectTest(fixtures.TestBase):
assert conn.invalidated
assert trans.is_active
assert_raises_message(
tsa.exc.StatementError,
tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
conn.execute,
select([1]),
@@ -440,12 +453,30 @@ class MockReconnectTest(fixtures.TestBase):
assert trans.is_active
assert_raises_message(
tsa.exc.InvalidRequestError,
tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
trans.commit,
)
# now it's inactive...
assert not trans.is_active
# but still associated with the connection
assert_raises_message(
tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
conn.execute,
select([1]),
)
assert not trans.is_active
# still can't commit... error stays the same
assert_raises_message(
tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
trans.commit,
)
assert trans.is_active
trans.rollback()
assert not trans.is_active
conn.execute(select([1]))
@@ -455,6 +486,104 @@ class MockReconnectTest(fixtures.TestBase):
[[call()], []],
)
def test_invalidate_on_commit_trans(self):
conn = self.db.connect()
trans = conn.begin()
self.dbapi.shutdown("commit")
assert_raises(tsa.exc.DBAPIError, trans.commit)
assert not conn.closed
assert conn.invalidated
assert not trans.is_active
# error stays consistent
assert_raises_message(
tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
conn.execute,
select([1]),
)
assert not trans.is_active
assert_raises_message(
tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
trans.commit,
)
assert not trans.is_active
assert_raises_message(
tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
conn.execute,
select([1]),
)
assert not trans.is_active
trans.rollback()
assert not trans.is_active
conn.execute(select([1]))
assert not conn.invalidated
def test_commit_fails_contextmanager(self):
# this test is also performed in test/engine/test_transaction.py
# using real connections
conn = self.db.connect()
def go():
with conn.begin():
self.dbapi.shutdown("commit_no_disconnect")
assert_raises(tsa.exc.DBAPIError, go)
assert not conn.in_transaction()
def test_commit_fails_trans(self):
# this test is also performed in test/engine/test_transaction.py
# using real connections
conn = self.db.connect()
trans = conn.begin()
self.dbapi.shutdown("commit_no_disconnect")
assert_raises(tsa.exc.DBAPIError, trans.commit)
assert not conn.closed
assert not conn.invalidated
assert not trans.is_active
# error stays consistent
assert_raises_message(
tsa.exc.PendingRollbackError,
"This connection is on an inactive transaction. Please rollback",
conn.execute,
select([1]),
)
assert not trans.is_active
assert_raises_message(
tsa.exc.PendingRollbackError,
"This connection is on an inactive transaction. Please rollback",
trans.commit,
)
assert not trans.is_active
assert_raises_message(
tsa.exc.PendingRollbackError,
"This connection is on an inactive transaction. Please rollback",
conn.execute,
select([1]),
)
assert not trans.is_active
trans.rollback()
assert not trans.is_active
conn.execute(select([1]))
assert not conn.invalidated
def test_invalidate_dont_call_finalizer(self):
conn = self.db.connect()
finalizer = mock.Mock()
@@ -497,9 +626,9 @@ class MockReconnectTest(fixtures.TestBase):
conn.close()
assert conn.closed
assert conn.invalidated
assert not conn.invalidated
assert_raises_message(
tsa.exc.StatementError,
tsa.exc.ResourceClosedError,
"This Connection is closed",
conn.execute,
select([1]),
@@ -544,7 +673,7 @@ class MockReconnectTest(fixtures.TestBase):
assert not conn.invalidated
assert_raises_message(
tsa.exc.StatementError,
tsa.exc.ResourceClosedError,
"This Connection is closed",
conn.execute,
select([1]),
@@ -594,10 +723,10 @@ class MockReconnectTest(fixtures.TestBase):
)
assert conn.closed
assert conn.invalidated
assert not conn.invalidated
assert_raises_message(
tsa.exc.StatementError,
tsa.exc.ResourceClosedError,
"This Connection is closed",
conn.execute,
select([1]),
@@ -955,7 +1084,7 @@ class RealReconnectTest(fixtures.TestBase):
_assert_invalidated(c1_branch.execute, select([1]))
assert not c1_branch.closed
assert not c1_branch._connection_is_valid
assert not c1_branch._still_open_and_dbapi_connection_is_valid
def test_ensure_is_disconnect_gets_connection(self):
def is_disconnect(e, conn, cursor):
@@ -1062,6 +1191,7 @@ class RealReconnectTest(fixtures.TestBase):
def test_with_transaction(self):
conn = self.engine.connect()
trans = conn.begin()
assert trans.is_valid
eq_(conn.execute(select([1])).scalar(), 1)
assert not conn.closed
self.engine.test_shutdown()
@@ -1069,21 +1199,56 @@ class RealReconnectTest(fixtures.TestBase):
assert not conn.closed
assert conn.invalidated
assert trans.is_active
assert not trans.is_valid
assert_raises_message(
tsa.exc.StatementError,
tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
conn.execute,
select([1]),
)
assert trans.is_active
assert not trans.is_valid
assert_raises_message(
tsa.exc.InvalidRequestError,
tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
trans.commit,
)
assert trans.is_active
# becomes inactive
assert not trans.is_active
assert not trans.is_valid
# still asks us to rollback
assert_raises_message(
tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
conn.execute,
select([1]),
)
# still asks us..
assert_raises_message(
tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
trans.commit,
)
# still...it's being consistent in what it is asking.
assert_raises_message(
tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
conn.execute,
select([1]),
)
# OK!
trans.rollback()
assert not trans.is_active
assert not trans.is_valid
# conn still invalid but we can reconnect
assert conn.invalidated
eq_(conn.execute(select([1])).scalar(), 1)
assert not conn.invalidated
+440 -223
View File
@@ -11,8 +11,10 @@ from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import util
from sqlalchemy import VARCHAR
from sqlalchemy.future import select as future_select
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_warnings
@@ -49,8 +51,13 @@ class TransactionTest(fixtures.TestBase):
def teardown_class(cls):
users.drop(testing.db)
def test_commits(self):
connection = testing.db.connect()
@testing.fixture
def local_connection(self):
with testing.db.connect() as conn:
yield conn
def test_commits(self, local_connection):
connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
transaction.commit()
@@ -66,10 +73,10 @@ class TransactionTest(fixtures.TestBase):
transaction.commit()
connection.close()
def test_rollback(self):
def test_rollback(self, local_connection):
"""test a basic rollback"""
connection = testing.db.connect()
connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -77,10 +84,9 @@ class TransactionTest(fixtures.TestBase):
transaction.rollback()
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
def test_raise(self):
connection = testing.db.connect()
def test_raise(self, local_connection):
connection = local_connection
transaction = connection.begin()
try:
@@ -95,10 +101,9 @@ class TransactionTest(fixtures.TestBase):
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
def test_nested_rollback(self):
connection = testing.db.connect()
def test_nested_rollback(self, local_connection):
connection = local_connection
try:
transaction = connection.begin()
try:
@@ -129,176 +134,338 @@ class TransactionTest(fixtures.TestBase):
transaction.rollback()
raise
except Exception as e:
try:
# and not "This transaction is inactive"
# comment moved here to fix pep8
assert str(e) == "uh oh"
finally:
connection.close()
# and not "This transaction is inactive"
# comment moved here to fix pep8
assert str(e) == "uh oh"
else:
assert False
def test_branch_nested_rollback(self):
connection = testing.db.connect()
try:
connection.begin()
branched = connection.connect()
assert branched.in_transaction()
branched.execute(users.insert(), user_id=1, user_name="user1")
nested = branched.begin()
branched.execute(users.insert(), user_id=2, user_name="user2")
nested.rollback()
assert not connection.in_transaction()
def test_branch_nested_rollback(self, local_connection):
connection = local_connection
connection.begin()
branched = connection.connect()
assert branched.in_transaction()
branched.execute(users.insert(), user_id=1, user_name="user1")
nested = branched.begin()
branched.execute(users.insert(), user_id=2, user_name="user2")
nested.rollback()
assert not connection.in_transaction()
assert_raises_message(
exc.InvalidRequestError,
"This connection is on an inactive transaction. Please",
connection.exec_driver_sql,
"select 1",
assert_raises_message(
exc.InvalidRequestError,
"This connection is on an inactive transaction. Please",
connection.exec_driver_sql,
"select 1",
)
def test_no_marker_on_inactive_trans(self, local_connection):
conn = local_connection
conn.begin()
mk1 = conn.begin()
mk1.rollback()
assert_raises_message(
exc.InvalidRequestError,
"the current transaction on this connection is inactive.",
conn.begin,
)
@testing.requires.savepoints
def test_savepoint_cancelled_by_toplevel_marker(self, local_connection):
conn = local_connection
trans = conn.begin()
conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
mk1 = conn.begin()
sp1 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
mk1.rollback()
assert not sp1.is_active
assert not trans.is_active
assert conn._transaction is trans
assert conn._nested_transaction is None
with testing.db.connect() as conn:
eq_(
conn.scalar(future_select(func.count(1)).select_from(users)),
0,
)
finally:
connection.close()
def test_inactive_due_to_subtransaction_no_commit(self):
connection = testing.db.connect()
def test_inactive_due_to_subtransaction_no_commit(self, local_connection):
connection = local_connection
trans = connection.begin()
trans2 = connection.begin()
trans2.rollback()
assert_raises_message(
exc.InvalidRequestError,
"This connection is on an inactive transaction. Please rollback",
trans.commit,
)
trans.rollback()
assert_raises_message(
exc.InvalidRequestError,
"This transaction is inactive",
trans.commit,
)
def test_branch_autorollback(self):
connection = testing.db.connect()
try:
branched = connection.connect()
branched.execute(users.insert(), user_id=1, user_name="user1")
try:
branched.execute(users.insert(), user_id=1, user_name="user1")
except exc.DBAPIError:
pass
finally:
connection.close()
@testing.requires.savepoints
def test_inactive_due_to_subtransaction_on_nested_no_commit(
self, local_connection
):
connection = local_connection
trans = connection.begin()
def test_branch_orig_rollback(self):
connection = testing.db.connect()
try:
nested = connection.begin_nested()
trans2 = connection.begin()
trans2.rollback()
assert_raises_message(
exc.InvalidRequestError,
"This connection is on an inactive savepoint transaction. "
"Please rollback",
nested.commit,
)
trans.commit()
assert_raises_message(
exc.InvalidRequestError,
"This nested transaction is inactive",
nested.commit,
)
def test_branch_autorollback(self, local_connection):
connection = local_connection
branched = connection.connect()
branched.execute(users.insert(), dict(user_id=1, user_name="user1"))
assert_raises(
exc.DBAPIError,
branched.execute,
users.insert(),
dict(user_id=1, user_name="user1"),
)
# can continue w/o issue
branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
def test_branch_orig_rollback(self, local_connection):
connection = local_connection
branched = connection.connect()
branched.execute(users.insert(), dict(user_id=1, user_name="user1"))
nested = branched.begin()
assert branched.in_transaction()
branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
nested.rollback()
eq_(
connection.exec_driver_sql(
"select count(*) from query_users"
).scalar(),
1,
)
@testing.requires.independent_connections
def test_branch_autocommit(self, local_connection):
with testing.db.connect() as connection:
branched = connection.connect()
branched.execute(users.insert(), user_id=1, user_name="user1")
nested = branched.begin()
assert branched.in_transaction()
branched.execute(users.insert(), user_id=2, user_name="user2")
nested.rollback()
eq_(
connection.exec_driver_sql(
"select count(*) from query_users"
).scalar(),
1,
branched.execute(
users.insert(), dict(user_id=1, user_name="user1")
)
finally:
connection.close()
def test_branch_autocommit(self):
connection = testing.db.connect()
try:
branched = connection.connect()
branched.execute(users.insert(), user_id=1, user_name="user1")
finally:
connection.close()
eq_(
testing.db.execute(
local_connection.execute(
text("select count(*) from query_users")
).scalar(),
1,
)
@testing.requires.savepoints
def test_branch_savepoint_rollback(self):
connection = testing.db.connect()
try:
trans = connection.begin()
branched = connection.connect()
assert branched.in_transaction()
branched.execute(users.insert(), user_id=1, user_name="user1")
nested = branched.begin_nested()
branched.execute(users.insert(), user_id=2, user_name="user2")
nested.rollback()
assert connection.in_transaction()
trans.commit()
eq_(
connection.exec_driver_sql(
"select count(*) from query_users"
).scalar(),
1,
)
finally:
connection.close()
def test_branch_savepoint_rollback(self, local_connection):
connection = local_connection
trans = connection.begin()
branched = connection.connect()
assert branched.in_transaction()
branched.execute(users.insert(), user_id=1, user_name="user1")
nested = branched.begin_nested()
branched.execute(users.insert(), user_id=2, user_name="user2")
nested.rollback()
assert connection.in_transaction()
trans.commit()
eq_(
connection.exec_driver_sql(
"select count(*) from query_users"
).scalar(),
1,
)
@testing.requires.two_phase_transactions
def test_branch_twophase_rollback(self):
connection = testing.db.connect()
try:
branched = connection.connect()
assert not branched.in_transaction()
branched.execute(users.insert(), user_id=1, user_name="user1")
nested = branched.begin_twophase()
branched.execute(users.insert(), user_id=2, user_name="user2")
nested.rollback()
assert not connection.in_transaction()
eq_(
connection.exec_driver_sql(
"select count(*) from query_users"
).scalar(),
1,
def test_branch_twophase_rollback(self, local_connection):
connection = local_connection
branched = connection.connect()
assert not branched.in_transaction()
branched.execute(users.insert(), user_id=1, user_name="user1")
nested = branched.begin_twophase()
branched.execute(users.insert(), user_id=2, user_name="user2")
nested.rollback()
assert not connection.in_transaction()
eq_(
connection.exec_driver_sql(
"select count(*) from query_users"
).scalar(),
1,
)
def test_commit_fails_flat(self, local_connection):
connection = local_connection
t1 = connection.begin()
with mock.patch.object(
connection,
"_commit_impl",
mock.Mock(side_effect=exc.DBAPIError("failure", None, None, None)),
):
assert_raises_message(exc.DBAPIError, r"failure", t1.commit)
assert not t1.is_active
t1.rollback() # no error
def test_commit_fails_ctxmanager(self, local_connection):
connection = local_connection
transaction = [None]
def go():
with mock.patch.object(
connection,
"_commit_impl",
mock.Mock(
side_effect=exc.DBAPIError("failure", None, None, None)
),
):
with connection.begin() as t1:
transaction[0] = t1
assert_raises_message(exc.DBAPIError, r"failure", go)
t1 = transaction[0]
assert not t1.is_active
t1.rollback() # no error
@testing.requires.savepoints_w_release
def test_savepoint_rollback_fails_flat(self, local_connection):
connection = local_connection
t1 = connection.begin()
s1 = connection.begin_nested()
# force the "commit" of the savepoint that occurs
# when the "with" block fails, e.g.
# the RELEASE, to fail, because the savepoint is already
# released.
connection.dialect.do_release_savepoint(connection, s1._savepoint)
assert_raises_message(
exc.DBAPIError, r".*SQL\:.*ROLLBACK TO SAVEPOINT", s1.rollback
)
assert not s1.is_active
with testing.expect_warnings("nested transaction already"):
s1.rollback() # no error (though it warns)
t1.commit() # no error
@testing.requires.savepoints_w_release
def test_savepoint_release_fails_flat(self):
with testing.db.connect() as connection:
t1 = connection.begin()
s1 = connection.begin_nested()
# force the "commit" of the savepoint that occurs
# when the "with" block fails, e.g.
# the RELEASE, to fail, because the savepoint is already
# released.
connection.dialect.do_release_savepoint(connection, s1._savepoint)
assert_raises_message(
exc.DBAPIError, r".*SQL\:.*RELEASE SAVEPOINT", s1.commit
)
finally:
connection.close()
assert not s1.is_active
s1.rollback() # no error. prior to 1.4 this would try to rollback
t1.commit() # no error
@testing.requires.python2
@testing.requires.savepoints_w_release
def test_savepoint_release_fails_warning(self):
with testing.db.connect() as connection:
connection.begin()
def test_savepoint_release_fails_ctxmanager(self, local_connection):
connection = local_connection
connection.begin()
with expect_warnings(
"An exception has occurred during handling of a previous "
"exception. The previous exception "
r"is:.*..SQL\:.*RELEASE SAVEPOINT"
):
savepoint = [None]
def go():
with connection.begin_nested() as savepoint:
connection.dialect.do_release_savepoint(
connection, savepoint._savepoint
)
def go():
assert_raises_message(
exc.DBAPIError, r".*SQL\:.*ROLLBACK TO SAVEPOINT", go
with connection.begin_nested() as sp:
savepoint[0] = sp
# force the "commit" of the savepoint that occurs
# when the "with" block fails, e.g.
# the RELEASE, to fail, because the savepoint is already
# released.
connection.dialect.do_release_savepoint(
connection, sp._savepoint
)
def test_retains_through_options(self):
connection = testing.db.connect()
try:
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
conn2 = connection.execution_options(dummy=True)
conn2.execute(users.insert(), user_id=2, user_name="user2")
transaction.rollback()
eq_(
connection.exec_driver_sql(
"select count(*) from query_users"
).scalar(),
0,
)
finally:
connection.close()
# prior to SQLAlchemy 1.4, the above release would fail
# and then the savepoint would try to rollback, and that failed
# also, causing a long exception chain that under Python 2
# was particularly hard to diagnose, leading to issue
# #2696 which eventually impacted Openstack, and we
# had to add warnings that show what the "context" for an
# exception was. The SQL for the exception was
# ROLLBACK TO SAVEPOINT, and up the exception chain would be
# the RELEASE failing.
#
# now, when the savepoint "commit" fails, it sets itself as
# inactive. so it does not try to rollback and it cleans
# itself out appropriately.
#
def test_nesting(self):
connection = testing.db.connect()
exc_ = assert_raises_message(
exc.DBAPIError, r".*SQL\:.*RELEASE SAVEPOINT", go
)
savepoint = savepoint[0]
assert not savepoint.is_active
if util.py3k:
# driver error
assert exc_.__cause__
# and that's it, no other context
assert not exc_.__cause__.__context__
def test_retains_through_options(self, local_connection):
connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
conn2 = connection.execution_options(dummy=True)
conn2.execute(users.insert(), user_id=2, user_name="user2")
transaction.rollback()
eq_(
connection.exec_driver_sql(
"select count(*) from query_users"
).scalar(),
0,
)
def test_nesting(self, local_connection):
connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -316,10 +483,9 @@ class TransactionTest(fixtures.TestBase):
)
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
def test_with_interface(self):
connection = testing.db.connect()
def test_with_interface(self, local_connection):
connection = local_connection
trans = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -346,10 +512,9 @@ class TransactionTest(fixtures.TestBase):
).scalar()
== 1
)
connection.close()
def test_close(self):
connection = testing.db.connect()
def test_close(self, local_connection):
connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -370,10 +535,9 @@ class TransactionTest(fixtures.TestBase):
)
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 5
connection.close()
def test_close2(self):
connection = testing.db.connect()
def test_close2(self, local_connection):
connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -394,11 +558,10 @@ class TransactionTest(fixtures.TestBase):
)
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
@testing.requires.savepoints
def test_nested_subtransaction_rollback(self):
connection = testing.db.connect()
def test_nested_subtransaction_rollback(self, local_connection):
connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
trans2 = connection.begin_nested()
@@ -412,11 +575,10 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (3,)],
)
connection.close()
@testing.requires.savepoints
def test_nested_subtransaction_commit(self):
connection = testing.db.connect()
def test_nested_subtransaction_commit(self, local_connection):
connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
trans2 = connection.begin_nested()
@@ -430,11 +592,10 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (2,), (3,)],
)
connection.close()
@testing.requires.savepoints
def test_rollback_to_subtransaction(self):
connection = testing.db.connect()
def test_rollback_to_subtransaction(self, local_connection):
connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
trans2 = connection.begin_nested()
@@ -451,6 +612,7 @@ class TransactionTest(fixtures.TestBase):
"select 1",
)
trans2.rollback()
assert connection._nested_transaction is None
connection.execute(users.insert(), user_id=4, user_name="user4")
transaction.commit()
@@ -460,11 +622,10 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (4,)],
)
connection.close()
@testing.requires.two_phase_transactions
def test_two_phase_transaction(self):
connection = testing.db.connect()
def test_two_phase_transaction(self, local_connection):
connection = local_connection
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=1, user_name="user1")
transaction.prepare()
@@ -487,7 +648,6 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (2,)],
)
connection.close()
# PG emergency shutdown:
# select * from pg_prepared_xacts
@@ -495,12 +655,11 @@ class TransactionTest(fixtures.TestBase):
# MySQL emergency shutdown:
# for arg in `mysql -u root -e "xa recover" | cut -c 8-100 |
# grep sa`; do mysql -u root -e "xa rollback '$arg'"; done
@testing.crashes("mysql", "Crashing on 5.5, not worth it")
@testing.requires.skip_mysql_on_windows
@testing.requires.two_phase_transactions
@testing.requires.savepoints
def test_mixed_two_phase_transaction(self):
connection = testing.db.connect()
def test_mixed_two_phase_transaction(self, local_connection):
connection = local_connection
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=1, user_name="user1")
transaction2 = connection.begin()
@@ -521,44 +680,46 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (2,), (5,)],
)
connection.close()
@testing.requires.two_phase_transactions
@testing.requires.two_phase_recovery
def test_two_phase_recover(self):
# MySQL recovery doesn't currently seem to work correctly
# Prepared transactions disappear when connections are closed
# and even when they aren't it doesn't seem possible to use the
# recovery id.
# 2020, still can't get this to work w/ modern MySQL or MariaDB.
# the XA RECOVER comes back as bytes, OK, convert to string,
# XA COMMIT then says Unknown XID. Also, the drivers seem to be
# killing off the XID if I use the connection.invalidate() before
# trying to access in another connection. Not really worth it
# unless someone wants to step through how mysqlclient / pymysql
# support this correctly.
connection = testing.db.connect()
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
transaction.prepare()
connection.invalidate()
connection2 = testing.db.connect()
eq_(
connection2.execution_options(autocommit=True)
.execute(select([users.c.user_id]).order_by(users.c.user_id))
.fetchall(),
[],
)
recoverables = connection2.recover_twophase()
assert transaction.xid in recoverables
connection2.commit_prepared(transaction.xid, recover=True)
eq_(
connection2.execute(
select([users.c.user_id]).order_by(users.c.user_id)
).fetchall(),
[(1,)],
)
connection2.close()
with testing.db.connect() as connection2:
eq_(
connection2.execution_options(autocommit=True)
.execute(select([users.c.user_id]).order_by(users.c.user_id))
.fetchall(),
[],
)
recoverables = connection2.recover_twophase()
assert transaction.xid in recoverables
connection2.commit_prepared(transaction.xid, recover=True)
eq_(
connection2.execute(
select([users.c.user_id]).order_by(users.c.user_id)
).fetchall(),
[(1,)],
)
@testing.requires.two_phase_transactions
def test_multiple_two_phase(self):
conn = testing.db.connect()
def test_multiple_two_phase(self, local_connection):
conn = local_connection
xa = conn.begin_twophase()
conn.execute(users.insert(), user_id=1, user_name="user1")
xa.prepare()
@@ -578,7 +739,6 @@ class TransactionTest(fixtures.TestBase):
select([users.c.user_name]).order_by(users.c.user_id)
)
eq_(result.fetchall(), [("user1",), ("user4",)])
conn.close()
@testing.requires.two_phase_transactions
def test_reset_rollback_two_phase_no_rollback(self):
@@ -652,7 +812,7 @@ class ResetAgentTest(fixtures.TestBase):
with expect_warnings("Reset agent is not active"):
conn.close()
def test_trans_commit_reset_agent_broken_ensure(self):
def test_trans_commit_reset_agent_broken_ensure_pool(self):
eng = testing_engine(options={"pool_reset_on_return": "commit"})
conn = eng.connect()
trans = conn.begin()
@@ -669,8 +829,10 @@ class ResetAgentTest(fixtures.TestBase):
assert connection.connection._reset_agent is t1
t2 = connection.begin_nested()
assert connection.connection._reset_agent is t1
assert connection._transaction is t2
assert connection._nested_transaction is t2
assert connection._transaction is t1
t2.close()
assert connection._nested_transaction is None
assert connection._transaction is t1
assert connection.connection._reset_agent is t1
t1.close()
@@ -684,10 +846,15 @@ class ResetAgentTest(fixtures.TestBase):
assert connection.connection._reset_agent is t1
t2 = connection.begin_nested()
assert connection.connection._reset_agent is t1
assert connection._transaction is t2
assert connection._nested_transaction is t2
assert connection._transaction is t1
assert connection.connection._reset_agent is t1
t1.close()
assert connection._nested_transaction is None
assert connection._transaction is None
assert connection.connection._reset_agent is None
assert not t1.is_active
@@ -698,19 +865,25 @@ class ResetAgentTest(fixtures.TestBase):
assert connection.connection._reset_agent is t1
t2 = connection.begin_nested()
assert connection.connection._reset_agent is t1
assert connection._transaction is t2
assert connection._nested_transaction is t2
assert connection._transaction is t1
t2.close()
assert connection._nested_transaction is None
assert connection._transaction is t1
assert connection.connection._reset_agent is t1
t1.rollback()
assert connection._transaction is None
assert connection.connection._reset_agent is None
assert not t2.is_active
assert not t1.is_active
@testing.requires.savepoints
def test_begin_nested_close(self):
with testing.db.connect() as connection:
trans = connection.begin_nested()
assert connection.connection._reset_agent is trans
assert (
connection.connection._reset_agent is connection._transaction
)
assert not trans.is_active
@testing.requires.savepoints
@@ -719,7 +892,7 @@ class ResetAgentTest(fixtures.TestBase):
trans = connection.begin()
trans2 = connection.begin_nested()
assert connection.connection._reset_agent is trans
assert trans2.is_active # was never closed
assert not trans2.is_active
assert not trans.is_active
@testing.requires.savepoints
@@ -1177,11 +1350,9 @@ class IsolationLevelTest(fixtures.TestBase):
class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
"""The SQLAlchemy 2.0 Connection ensures its own transaction is rolled
back upon close. Therefore the whole "reset agent" thing can go away.
this suite runs through all the reset agent tests to ensure the state
of the transaction is maintained while the "reset agent" feature is not
needed at all.
"""Still some debate over if the "reset agent" should apply to the
future connection or not.
"""
@@ -1192,7 +1363,8 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
with testing.db.connect() as connection:
event.listen(connection, "rollback", canary)
trans = connection.begin()
assert connection.connection._reset_agent is None
assert connection.connection._reset_agent is trans
assert not trans.is_active
eq_(canary.mock_calls, [mock.call(connection)])
@@ -1201,7 +1373,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
with testing.db.connect() as connection:
event.listen(connection, "rollback", canary)
trans = connection.begin()
assert connection.connection._reset_agent is None
assert connection.connection._reset_agent is trans
trans.rollback()
assert connection.connection._reset_agent is None
assert not trans.is_active
@@ -1213,7 +1385,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "rollback", canary.rollback)
event.listen(connection, "commit", canary.commit)
trans = connection.begin()
assert connection.connection._reset_agent is None
assert connection.connection._reset_agent is trans
trans.commit()
assert connection.connection._reset_agent is None
assert not trans.is_active
@@ -1226,8 +1398,11 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "rollback", canary.rollback)
event.listen(connection, "commit", canary.commit)
trans = connection.begin_nested()
assert connection.connection._reset_agent is None
assert trans.is_active # it's a savepoint
assert (
connection.connection._reset_agent is connection._transaction
)
# it's a savepoint, but root made sure it closed
assert not trans.is_active
eq_(canary.mock_calls, [mock.call.rollback(connection)])
@testing.requires.savepoints
@@ -1238,8 +1413,8 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "commit", canary.commit)
trans = connection.begin()
trans2 = connection.begin_nested()
assert connection.connection._reset_agent is None
assert trans2.is_active # was never closed
assert connection.connection._reset_agent is trans
assert not trans2.is_active
assert not trans.is_active
eq_(canary.mock_calls, [mock.call.rollback(connection)])
@@ -1254,15 +1429,15 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "commit", canary.commit)
trans = connection.begin()
trans2 = connection.begin_nested()
assert connection.connection._reset_agent is None
assert connection.connection._reset_agent is trans
trans2.rollback() # this is not a connection level event
assert connection.connection._reset_agent is None
assert connection.connection._reset_agent is trans
trans.commit()
assert connection.connection._reset_agent is None
eq_(
canary.mock_calls,
[
mock.call.rollback_savepoint(connection, mock.ANY, trans),
mock.call.rollback_savepoint(connection, mock.ANY, None),
mock.call.commit(connection),
],
)
@@ -1275,9 +1450,9 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "commit", canary.commit)
trans = connection.begin()
trans2 = connection.begin_nested()
assert connection.connection._reset_agent is None
assert connection.connection._reset_agent is trans
trans2.rollback()
assert connection.connection._reset_agent is None
assert connection.connection._reset_agent is trans
trans.rollback()
assert connection.connection._reset_agent is None
eq_(canary.mock_calls, [mock.call.rollback(connection)])
@@ -1292,7 +1467,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
)
event.listen(connection, "commit", canary.commit)
trans = connection.begin_twophase()
assert connection.connection._reset_agent is None
assert connection.connection._reset_agent is trans
assert not trans.is_active
eq_(
canary.mock_calls,
@@ -1307,7 +1482,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "commit", canary.commit)
event.listen(connection, "commit_twophase", canary.commit_twophase)
trans = connection.begin_twophase()
assert connection.connection._reset_agent is None
assert connection.connection._reset_agent is trans
trans.commit()
assert connection.connection._reset_agent is None
eq_(
@@ -1325,7 +1500,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
)
event.listen(connection, "commit", canary.commit)
trans = connection.begin_twophase()
assert connection.connection._reset_agent is None
assert connection.connection._reset_agent is trans
trans.rollback()
assert connection.connection._reset_agent is None
eq_(
@@ -1520,7 +1695,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
conn.invalidate()
assert_raises_message(
exc.StatementError,
exc.PendingRollbackError,
"Can't reconnect",
conn.execute,
select([1]),
@@ -1672,7 +1847,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
with testing.db.begin() as conn:
conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
conn.begin_nested()
sp1 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
sp2 = conn.begin_nested()
@@ -1680,8 +1855,12 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
sp2.rollback()
assert not sp2.is_active
assert sp1.is_active
assert conn.in_transaction()
assert not sp1.is_active
with testing.db.connect() as conn:
eq_(
conn.scalar(future_select(func.count(1)).select_from(users)),
@@ -1721,13 +1900,21 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
sp1 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
assert conn._nested_transaction is sp1
sp2 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
assert conn._nested_transaction is sp2
sp2.commit()
assert conn._nested_transaction is sp1
sp1.rollback()
assert conn._nested_transaction is None
assert conn.in_transaction()
with testing.db.connect() as conn:
@@ -1735,3 +1922,33 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
conn.scalar(future_select(func.count(1)).select_from(users)),
1,
)
@testing.requires.savepoints
def test_savepoint_seven(self):
users = self.tables.users
conn = testing.db.connect()
trans = conn.begin()
conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
sp1 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
sp2 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
assert conn.in_transaction()
trans.close()
assert not sp1.is_active
assert not sp2.is_active
assert not trans.is_active
assert conn._transaction is None
assert conn._nested_transaction is None
with testing.db.connect() as conn:
eq_(
conn.scalar(future_select(func.count(1)).select_from(users)),
0,
)
+22 -5
View File
@@ -367,13 +367,25 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
sess.add(u)
sess.flush()
c1 = sess.connection(User)
dbapi_conn = c1.connection
assert dbapi_conn.is_valid
sess.invalidate()
assert c1.invalidated
# Connection object is closed
assert c1.closed
# "invalidated" is not part of "closed" state
assert not c1.invalidated
# but the DBAPI conn (really ConnectionFairy)
# is invalidated
assert not dbapi_conn.is_valid
eq_(sess.query(User).all(), [])
c2 = sess.connection(User)
assert not c2.invalidated
assert c2.connection.is_valid
def test_subtransaction_on_noautocommit(self):
User, users = self.classes.User, self.tables.users
@@ -859,7 +871,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
except Exception:
trans2.rollback(_capture_exception=True)
assert_raises_message(
sa_exc.InvalidRequestError,
sa_exc.PendingRollbackError,
r"This Session's transaction has been rolled back due to a "
r"previous exception during flush. To begin a new transaction "
r"with this Session, first issue Session.rollback\(\). "
@@ -1001,7 +1013,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
for i in range(5):
assert_raises_message(
sa_exc.InvalidRequestError,
sa_exc.PendingRollbackError,
"^This Session's transaction has been "
r"rolled back due to a previous exception "
"during flush. To "
@@ -1037,7 +1049,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
with expect_warnings(".*during handling of a previous exception.*"):
session.begin_nested()
savepoint = session.connection()._transaction._savepoint
savepoint = session.connection()._nested_transaction._savepoint
# force the savepoint to disappear
session.connection().dialect.do_release_savepoint(
@@ -1708,7 +1720,12 @@ class SavepointTest(_LocalFixture):
nested_trans._do_commit()
is_(s.transaction, trans)
assert_raises(sa_exc.DBAPIError, s.rollback)
with expect_warnings("nested transaction already deassociated"):
# this previously would raise
# "savepoint "sa_savepoint_1" does not exist", however as of
# #5327 the savepoint already knows it's inactive
s.rollback()
assert u1 not in s.new
+12 -10
View File
@@ -720,7 +720,7 @@ class DefaultRequirements(SuiteRequirements):
def pg_prepared_transaction(config):
if not against(config, "postgresql"):
return False
return True
with config.db.connect() as conn:
try:
@@ -742,20 +742,20 @@ class DefaultRequirements(SuiteRequirements):
no_support(
"oracle", "two-phase xact not implemented in SQLA/oracle"
),
no_support(
"drizzle", "two-phase xact not supported by database"
),
no_support(
"sqlite", "two-phase xact not supported by database"
),
no_support(
"sybase", "two-phase xact not supported by drivers/SQLA"
),
no_support(
"mysql",
"recent MySQL communiity editions have too many issues "
"(late 2016), disabling for now",
),
# in Ia3cbbf56d4882fcc7980f90519412f1711fae74d
# we are evaluating which modern MySQL / MariaDB versions
# can handle two-phase testing without too many problems
# no_support(
# "mysql",
# "recent MySQL communiity editions have too many issues "
# "(late 2016), disabling for now",
# ),
NotPredicate(
LambdaPredicate(
pg_prepared_transaction,
@@ -768,7 +768,9 @@ class DefaultRequirements(SuiteRequirements):
@property
def two_phase_recovery(self):
return self.two_phase_transactions + (
skip_if("mysql", "crashes on most mariadb and mysql versions")
skip_if(
"mysql", "still can't get recover to work w/ MariaDB / MySQL"
)
)
@property