- parent transactions weren't started on the connection when adding a connection to a nested session transaction.

- session.transaction now always refers to the innermost active transaction, even when commit/rollback are called directly on the session transaction object.
- when preparing a two-phase transaction fails on one connection all the connections are rolled back.
- two phase transactions can now be prepared.
- session.close() didn't close all transactions when nested transactions were used.
- rollback() previously erroneously set the current transaction directly to the parent of the transaction that could be rolled back to.
- autoflush for commit() wasn't flushing for simple subtransactions.
This commit is contained in:
Ants Aasma
2008-01-20 03:22:00 +00:00
parent 4be99db15b
commit 9f366afdda
5 changed files with 295 additions and 78 deletions
+28
View File
@@ -74,6 +74,29 @@ CHANGES
down polymorphic queries
- miscellaneous tickets: [ticket:940]
- session transaction management fixes:
- fixed bug with session transaction management. Parent transactions
weren't started on the connection when adding a connection to a
nested transaction.
- session.transaction now always refers to the innermost active
transaction, even when commit/rollback are called directly on the
session transaction object.
- when preparing a two-phase transaction fails on one connection
all the connections are rolled back.
- two phase transactions can now be prepared.
- session.close() didn't close all transactions when nested
transactions were used.
- rollback() previously erroneously set the current transaction
directly to the parent of the transaction that could be rolled back
to. Now it rolls back the next transaction up that can handle it, but
sets the current transaction to it's parent and inactivates the
transactions in between. Inactive transactions can only be
rolled back or closed, any other call results in an error. In effect
this means that every begin() must be accompanied by a corresponding
commit() or rollback(), including the implicit begin() in
transactional sessions.
- autoflush for commit() wasn't flushing for simple subtransactions.
- general
- warnings are now issued as type exceptions.SAWarning.
@@ -89,6 +112,11 @@ CHANGES
allows columns to map properly to result sets even
if long-name truncation kicks in [ticket:941]
- ext
- Changed ext.activemapper to use a non-transactional session
for the objectstore as the test seem to imply that this is
the expected behavior.
0.4.2p3
------
- general
+4 -2
View File
@@ -476,7 +476,7 @@ Session also supports Python 2.5's with statement so that the example above can
item1.foo = 'bar'
item2.bar = 'foo'
For MySQL and Postgres (and soon Oracle), "nested" transactions can be accomplished which use SAVEPOINT behavior, via the `begin_nested()` method:
Subtransactions can be created by calling the `begin()` method repeatedly. For each transaction you `begin()` you must always call either `commit()` or `rollback()`. Note that this includes the implicit transaction created by the transactional session. When a subtransaction is created the current transaction of the session is set to that transaction. Commiting the subtransaction will return you to the next outer transaction. Rolling it back will also return you to the next outer transaction, but in addition it will roll back database state to the innermost transaction that supports rolling back to. Usually this means the root transaction, unless you use the nested transaction functionality via the `begin_nested()` method. MySQL and Postgres (and soon Oracle) support using "nested" transactions by creating SAVEPOINTs, :
{python}
Session = sessionmaker(transactional=False)
@@ -492,7 +492,7 @@ For MySQL and Postgres (and soon Oracle), "nested" transactions can be accomplis
sess.commit() # commits u1 and u2
Finally, for MySQL, Postgres, and soon Oracle as well, the session can be instructed to use two-phase commit semantics using the flag `twophase=True`, which coordinates transactions across multiple databases:
Finally, for MySQL, Postgres, and soon Oracle as well, the session can be instructed to use two-phase commit semantics. This will coordinate the commiting of transactions across databases so that the transaction is either committed or rolled back in all databases. You can also `prepare()` the session for interacting with transactions not managed by SQLAlchemy. To use two phase transactions set the flag `twophase=True` on the session:
{python}
engine1 = create_engine('postgres://db1')
@@ -511,6 +511,8 @@ Finally, for MySQL, Postgres, and soon Oracle as well, the session can be instru
# before committing both transactions
sess.commit()
Be aware that when a crash occurs in one of the databases while the the transactions are prepared you have to manually commit or rollback the prepared transactions in your database as appropriate.
## Embedding SQL Insert/Update Expressions into a Flush {@name=flushsql}
This feature allows the value of a database column to be set to a SQL expression instead of a literal value. It's especially useful for atomic updates, calling stored procedures, etc. All you do is assign an expression to an attribute:
+1 -1
View File
@@ -13,7 +13,7 @@ import sys
#
metadata = ThreadLocalMetaData()
Objectstore = scoped_session
objectstore = scoped_session(sessionmaker(autoflush=True))
objectstore = scoped_session(sessionmaker(autoflush=True, transactional=False))
#
# declarative column declaration - this is so that we can infer the colname
+145 -74
View File
@@ -144,110 +144,164 @@ class SessionTransaction(object):
def __init__(self, session, parent=None, autoflush=True, nested=False):
self.session = session
self.__connections = {}
self.__parent = parent
self._connections = {}
self._parent = parent
self.autoflush = autoflush
self.nested = nested
self._active = True
self._prepared = False
is_active = property(lambda s: s.session is not None and s._active)
def _assert_is_active(self):
self._assert_is_open()
if not self._active:
raise exceptions.InvalidRequestError("The transaction is inactive due to a rollback in a subtransaction and should be closed")
def _assert_is_open(self):
if self.session is None:
raise exceptions.InvalidRequestError("The transaction is closed")
def connection(self, bindkey, **kwargs):
self._assert_is_active()
engine = self.session.get_bind(bindkey, **kwargs)
return self.get_or_add(engine)
def _begin(self, **kwargs):
self._assert_is_active()
return SessionTransaction(self.session, self, **kwargs)
def add(self, bind):
if self.__parent is not None:
return self.__parent.add(bind)
def _iterate_parents(self, upto=None):
if self._parent is upto:
return (self,)
else:
if self._parent is None:
raise exceptions.InvalidRequestError("Transaction %s is not on the active transaction list" % upto)
return (self,) + self._parent._iterate_parents(upto)
if bind.engine in self.__connections:
def add(self, bind):
self._assert_is_active()
if self._parent is not None and not self.nested:
return self._parent.add(bind)
if bind.engine in self._connections:
raise exceptions.InvalidRequestError("Session already has a Connection associated for the given %sEngine" % (isinstance(bind, engine.Connection) and "Connection's " or ""))
return self.get_or_add(bind)
def _connection_dict(self):
if self.__parent is not None and not self.nested:
return self.__parent._connection_dict()
else:
return self.__connections
def get_or_add(self, bind):
if self.__parent is not None:
self._assert_is_active()
if bind in self._connections:
return self._connections[bind][0]
if self._parent is not None:
conn = self._parent.get_or_add(bind)
if not self.nested:
return self.__parent.get_or_add(bind)
if bind in self.__connections:
return self.__connections[bind][0]
conn_dict = self.__parent._connection_dict()
if bind in conn_dict:
(conn, trans, autoclose) = conn_dict[bind]
self.__connections[conn] = self.__connections[bind.engine] = (conn, conn.begin_nested(), autoclose)
return conn
elif bind in self.__connections:
return self.__connections[bind][0]
if not isinstance(bind, engine.Connection):
e = bind
c = bind.contextual_connect()
else:
e = bind.engine
c = bind
if e in self.__connections:
raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine")
if self.nested:
trans = c.begin_nested()
elif self.session.twophase:
trans = c.begin_twophase()
if isinstance(bind, engine.Connection):
conn = bind
if conn.engine in self._connections:
raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine")
else:
conn = bind.contextual_connect()
if self.session.twophase and self._parent is None:
transaction = conn.begin_twophase()
elif self.nested:
transaction = conn.begin_nested()
else:
trans = c.begin()
self.__connections[c] = self.__connections[e] = (c, trans, c is not bind)
return self.__connections[c][0]
transaction = conn.begin()
self._connections[conn] = self._connections[conn.engine] = (conn, transaction, conn is not bind)
return conn
def commit(self):
if self.__parent is not None and not self.nested:
return self.__parent
if self.session.extension is not None:
def prepare(self):
if self._parent is not None or not self.session.twophase:
raise exceptions.InvalidRequestError("Only root two phase transactions of can be prepared")
self._prepare_impl()
def _prepare_impl(self):
self._assert_is_active()
if self.session.extension is not None and (self._parent is None or self.nested):
self.session.extension.before_commit(self.session)
if self.session.transaction is not self:
for subtransaction in self.session.transaction._iterate_parents(upto=self):
subtransaction.commit()
if self.autoflush:
self.session.flush()
if self._parent is None and self.session.twophase:
try:
for t in util.Set(self._connections.values()):
t[1].prepare()
except:
for t in util.Set(self._connections.values()):
try:
t[1].rollback()
except:
pass
raise
self._deactivate()
self._prepared = True
def commit(self):
self._assert_is_open()
if not self._prepared:
self._prepare_impl()
if self._parent is None or self.nested:
for t in util.Set(self._connections.values()):
t[1].commit()
if self.session.twophase:
for t in util.Set(self.__connections.values()):
t[1].prepare()
for t in util.Set(self.__connections.values()):
t[1].commit()
if self.session.extension is not None:
self.session.extension.after_commit(self.session)
if self.session.extension is not None:
self.session.extension.after_commit(self.session)
self.close()
return self.__parent
return self._parent
def rollback(self):
if self.__parent is not None and not self.nested:
return self.__parent.rollback()
for t in util.Set(self.__connections.values()):
self._assert_is_open()
if self.session.transaction is not self:
for subtransaction in self.session.transaction._iterate_parents(upto=self):
subtransaction.close()
if self.is_active:
for transaction in self._iterate_parents():
if transaction._parent is None or transaction.nested:
transaction._rollback_impl()
transaction._deactivate()
break
else:
transaction._deactivate()
self.close()
return self._parent
def _rollback_impl(self):
for t in util.Set(self._connections.values()):
t[1].rollback()
if self.session.extension is not None:
self.session.extension.after_rollback(self.session)
self.close()
return self.__parent
def _deactivate(self):
self._active = False
def close(self):
if self.__parent is not None:
return
for t in util.Set(self.__connections.values()):
if t[2]:
t[0].close()
else:
t[1].close()
self.session.transaction = None
self.session.transaction = self._parent
if self._parent is None:
for connection, transaction, autoclose in util.Set(self._connections.values()):
if autoclose:
connection.close()
else:
transaction.close()
self._deactivate()
self.session = None
self._connections = None
def __enter__(self):
return self
@@ -458,7 +512,7 @@ class Session(object):
if self.transaction is None:
pass
else:
self.transaction = self.transaction.rollback()
self.transaction.rollback()
# TODO: we can rollback attribute values. however
# we would want to expand attributes.py to be able to save *two* rollback points, one to the
# last flush() and the other to when the object first entered the transaction.
@@ -484,13 +538,29 @@ class Session(object):
if self.transaction is None:
if self.transactional:
self.begin()
self.transaction = self.transaction.commit()
else:
raise exceptions.InvalidRequestError("No transaction is begun.")
else:
self.transaction = self.transaction.commit()
self.transaction.commit()
if self.transaction is None and self.transactional:
self.begin()
def prepare(self):
"""Prepare the current transaction in progress for two phase commit.
If no transaction is in progress, this method raises
an InvalidRequestError.
Only root transactions of two phase sessions can be prepared. If the current transaction is
not such, an InvalidRequestError is raised.
"""
if self.transaction is None:
if self.transactional:
self.begin()
else:
raise exceptions.InvalidRequestError("No transaction is begun.")
self.transaction.prepare()
def connection(self, mapper=None, **kwargs):
"""Return a ``Connection`` corresponding to this session's
@@ -554,7 +624,8 @@ class Session(object):
self.clear()
if self.transaction is not None:
self.transaction.close()
for transaction in self.transaction._iterate_parents():
transaction.close()
if self.transactional:
# note this doesnt use any connection resources
self.begin()
+117 -1
View File
@@ -1,6 +1,6 @@
import testenv; testenv.configure_for_tests()
from sqlalchemy import *
from sqlalchemy import exceptions
from sqlalchemy import exceptions, util
from sqlalchemy.orm import *
from sqlalchemy.orm.session import SessionExtension
from sqlalchemy.orm.session import Session as SessionCls
@@ -355,6 +355,122 @@ class SessionTest(AssertMixin):
assert len(sess.query(User).all()) == 1
sess.close()
@testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
'oracle', 'maxdb')
@testing.exclude('mysql', '<', (5, 0, 3))
def test_nested_transaction_connection_add(self):
class User(object): pass
mapper(User, users)
sess = create_session(transactional=False)
sess.begin()
sess.begin_nested()
u1 = User()
sess.save(u1)
sess.flush()
sess.rollback()
u2 = User()
sess.save(u2)
sess.commit()
self.assertEquals(util.Set(sess.query(User).all()), util.Set([u2]))
sess.begin()
sess.begin_nested()
u3 = User()
sess.save(u3)
sess.commit() # commit the nested transaction
sess.rollback()
self.assertEquals(util.Set(sess.query(User).all()), util.Set([u2]))
sess.close()
@testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
'oracle', 'maxdb')
@testing.exclude('mysql', '<', (5, 0, 3))
def test_mixed_transaction_control(self):
class User(object): pass
mapper(User, users)
sess = create_session(transactional=False)
sess.begin()
sess.begin_nested()
transaction = sess.begin()
sess.save(User())
transaction.commit()
sess.commit()
sess.commit()
sess.close()
self.assertEquals(len(sess.query(User).all()), 1)
t1 = sess.begin()
t2 = sess.begin_nested()
sess.save(User())
t2.commit()
assert sess.transaction is t1
sess.close()
@testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
'oracle', 'maxdb')
@testing.exclude('mysql', '<', (5, 0, 3))
def test_mixed_transaction_close(self):
class User(object): pass
mapper(User, users)
sess = create_session(transactional=True)
sess.begin_nested()
sess.save(User())
sess.flush()
sess.close()
sess.save(User())
sess.commit()
sess.close()
self.assertEquals(len(sess.query(User).all()), 1)
@testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
'oracle', 'maxdb')
@testing.exclude('mysql', '<', (5, 0, 3))
def test_error_on_using_inactive_session(self):
class User(object): pass
mapper(User, users)
sess = create_session(transactional=False)
try:
sess.begin()
sess.begin()
sess.save(User())
sess.flush()
sess.rollback()
sess.begin()
assert False
except exceptions.InvalidRequestError, e:
self.assertEquals(str(e), "The transaction is inactive due to a rollback in a subtransaction and should be closed")
sess.close()
@engines.close_open_connections
def test_bound_connection(self):
class User(object):pass