- add some more transaction states so that we deliver a more accurate

message for [ticket:2662]; after_commit() is called within "committed"
state, not prepared, and no SQL can be emitted for prepared or committed
- consolidate state assertions in session transaction, use just one
method
- add more unit tests for these assertions
This commit is contained in:
Mike Bayer
2013-03-04 12:50:11 -05:00
parent f4679ea26c
commit 029b79052c
2 changed files with 110 additions and 40 deletions
+44 -36
View File
@@ -59,7 +59,9 @@ class _SessionClassMethods(object):
ACTIVE = util.symbol('ACTIVE')
PREPARED = util.symbol('PREPARED')
COMMITTED = util.symbol('COMMITTED')
DEACTIVE = util.symbol('DEACTIVE')
CLOSED = util.symbol('CLOSED')
class SessionTransaction(object):
"""A :class:`.Session`-level transaction.
@@ -164,46 +166,51 @@ class SessionTransaction(object):
def is_active(self):
return self.session is not None and self._state is ACTIVE
def _assert_is_active(self):
self._assert_is_open()
if self._state is PREPARED:
def _assert_active(self, prepared_ok=False,
rollback_ok=False,
closed_msg="This transaction is closed"):
if self._state is COMMITTED:
raise sa_exc.InvalidRequestError(
"This session is in 'prepared' state, where no further "
"SQL can be emitted until the transaction is fully "
"committed."
"This session is in 'committed' state; no further "
"SQL can be emitted within this transaction."
)
elif self._state is DEACTIVE:
if self._rollback_exception:
elif self._state is PREPARED:
if not prepared_ok:
raise sa_exc.InvalidRequestError(
"This Session's transaction has been rolled back "
"due to a previous exception during flush."
" To begin a new transaction with this Session, "
"first issue Session.rollback()."
" Original exception was: %s"
% self._rollback_exception
)
else:
raise sa_exc.InvalidRequestError(
"This Session's transaction has been rolled back "
"by a nested rollback() call. To begin a new "
"transaction, issue Session.rollback() first."
"This session is in 'prepared' state; no further "
"SQL can be emitted within this transaction."
)
def _assert_is_open(self, error_msg="The transaction is closed"):
if self.session is None:
raise sa_exc.ResourceClosedError(error_msg)
elif self._state is DEACTIVE:
if not rollback_ok:
if self._rollback_exception:
raise sa_exc.InvalidRequestError(
"This Session's transaction has been rolled back "
"due to a previous exception during flush."
" To begin a new transaction with this Session, "
"first issue Session.rollback()."
" Original exception was: %s"
% self._rollback_exception
)
else:
raise sa_exc.InvalidRequestError(
"This Session's transaction has been rolled back "
"by a nested rollback() call. To begin a new "
"transaction, issue Session.rollback() first."
)
elif self._state is CLOSED:
raise sa_exc.ResourceClosedError(closed_msg)
@property
def _is_transaction_boundary(self):
return self.nested or not self._parent
def connection(self, bindkey, **kwargs):
self._assert_is_active()
self._assert_active()
bind = self.session.get_bind(bindkey, **kwargs)
return self._connection_for_bind(bind)
def _begin(self, nested=False):
self._assert_is_active()
self._assert_active()
return SessionTransaction(
self.session, self, nested=nested)
@@ -270,7 +277,7 @@ class SessionTransaction(object):
def _connection_for_bind(self, bind):
self._assert_is_active()
self._assert_active()
if bind in self._connections:
return self._connections[bind][0]
@@ -304,11 +311,12 @@ class SessionTransaction(object):
def prepare(self):
if self._parent is not None or not self.session.twophase:
raise sa_exc.InvalidRequestError(
"Only root two phase transactions of can be prepared")
"'twophase' mode not enabled, or not root transaction; "
"can't prepare.")
self._prepare_impl()
def _prepare_impl(self):
self._assert_is_active()
self._assert_active()
if self._parent is None or self.nested:
self.session.dispatch.before_commit(self.session)
@@ -339,7 +347,7 @@ class SessionTransaction(object):
self._state = PREPARED
def commit(self):
self._assert_is_open()
self._assert_active(prepared_ok=True)
if self._state is not PREPARED:
self._prepare_impl()
@@ -347,6 +355,7 @@ class SessionTransaction(object):
for t in set(self._connections.values()):
t[1].commit()
self._state = COMMITTED
self.session.dispatch.after_commit(self.session)
if self.session._enable_transaction_accounting:
@@ -356,7 +365,7 @@ class SessionTransaction(object):
return self._parent
def rollback(self, _capture_exception=False):
self._assert_is_open()
self._assert_active(prepared_ok=True, rollback_ok=True)
stx = self.session.transaction
if stx is not self:
@@ -375,7 +384,7 @@ class SessionTransaction(object):
sess = self.session
if self.session._enable_transaction_accounting and \
not sess._is_clean():
not sess._is_clean():
# if items were added, deleted, or mutated
# here, we need to re-restore the snapshot
util.warn(
@@ -405,13 +414,13 @@ class SessionTransaction(object):
self.session.transaction = self._parent
if self._parent is None:
for connection, transaction, autoclose in \
set(self._connections.values()):
set(self._connections.values()):
if autoclose:
connection.close()
else:
transaction.close()
self._state = DEACTIVE
self._state = CLOSED
if self.session.dispatch.after_transaction_end:
self.session.dispatch.after_transaction_end(self.session, self)
@@ -425,8 +434,7 @@ class SessionTransaction(object):
return self
def __exit__(self, type, value, traceback):
self._assert_is_open("Cannot end transaction context. The transaction "
"was closed from within the context")
self._assert_active(prepared_ok=True)
if self.session.transaction is None:
return
if type is None:
+66 -4
View File
@@ -358,18 +358,80 @@ class SessionTransactionTest(FixtureTest):
sess.begin, subtransactions=True)
sess.close()
def test_no_sql_during_prepare(self):
def test_no_sql_during_commit(self):
sess = create_session(bind=testing.db, autocommit=False)
@event.listens_for(sess, "after_commit")
def go(session):
session.execute("select 1")
assert_raises_message(sa_exc.InvalidRequestError,
"This session is in 'prepared' state, where no "
"further SQL can be emitted until the "
"transaction is fully committed.",
"This session is in 'committed' state; no further "
"SQL can be emitted within this transaction.",
sess.commit)
def test_no_sql_during_prepare(self):
sess = create_session(bind=testing.db, autocommit=False, twophase=True)
sess.prepare()
assert_raises_message(sa_exc.InvalidRequestError,
"This session is in 'prepared' state; no further "
"SQL can be emitted within this transaction.",
sess.execute, "select 1")
def test_no_prepare_wo_twophase(self):
sess = create_session(bind=testing.db, autocommit=False)
assert_raises_message(sa_exc.InvalidRequestError,
"'twophase' mode not enabled, or not root "
"transaction; can't prepare.",
sess.prepare)
def test_closed_status_check(self):
sess = create_session()
trans = sess.begin()
trans.rollback()
assert_raises_message(
sa_exc.ResourceClosedError,
"This transaction is closed",
trans.rollback
)
assert_raises_message(
sa_exc.ResourceClosedError,
"This transaction is closed",
trans.commit
)
def test_deactive_status_check(self):
sess = create_session()
trans = sess.begin()
trans2 = sess.begin(subtransactions=True)
trans2.rollback()
assert_raises_message(
sa_exc.InvalidRequestError,
"This Session's transaction has been rolled back by a nested "
"rollback\(\) call. To begin a new transaction, issue "
"Session.rollback\(\) first.",
trans.commit
)
def test_deactive_status_check_w_exception(self):
sess = create_session()
trans = sess.begin()
trans2 = sess.begin(subtransactions=True)
try:
raise Exception("test")
except:
trans2.rollback(_capture_exception=True)
assert_raises_message(
sa_exc.InvalidRequestError,
"This Session's transaction has been rolled back due to a "
"previous exception during flush. To begin a new transaction "
"with this Session, first issue Session.rollback\(\). "
"Original exception was: test",
trans.commit
)
def _inactive_flushed_session_fixture(self):
users, User = self.tables.users, self.classes.User