Merge remote-tracking branch 'origin/pr/117' into pg8000

This commit is contained in:
Mike Bayer
2014-07-25 16:17:15 -04:00
7 changed files with 2306 additions and 1996 deletions
@@ -171,4 +171,23 @@ class PGDialect_pg8000(PGDialect):
(level, self.name, ", ".join(self._isolation_lookup))
)
def do_begin_twophase(self, connection, xid):
print("begin twophase", xid)
connection.connection.tpc_begin((0, xid, ''))
def do_prepare_twophase(self, connection, xid):
print("prepare twophase", xid)
connection.connection.tpc_prepare()
def do_rollback_twophase(
self, connection, xid, is_prepared=True, recover=False):
connection.connection.tpc_rollback((0, xid, ''))
def do_commit_twophase(
self, connection, xid, is_prepared=True, recover=False):
connection.connection.tpc_commit((0, xid, ''))
def do_recover_twophase(self, connection):
return [row[1] for row in connection.connection.tpc_recover()]
dialect = PGDialect_pg8000
+87 -112
View File
@@ -1,18 +1,14 @@
from sqlalchemy.testing import eq_, is_
from sqlalchemy.orm import backref, configure_mappers
from sqlalchemy import testing
from sqlalchemy import desc, select, func, exc
from sqlalchemy.orm import mapper, relationship, create_session, Query, \
attributes, exc as orm_exc, Session
from sqlalchemy import testing, desc, select, func, exc, cast, Integer
from sqlalchemy.orm import (
mapper, relationship, create_session, Query, attributes, exc as orm_exc,
Session, backref, configure_mappers)
from sqlalchemy.orm.dynamic import AppenderMixin
from sqlalchemy.testing import AssertsCompiledSQL, \
assert_raises_message, assert_raises
from sqlalchemy.testing import (
AssertsCompiledSQL, assert_raises_message, assert_raises, eq_, is_)
from test.orm import _fixtures
from sqlalchemy.testing.assertsql import CompiledSQL
class _DynamicFixture(object):
def _user_address_fixture(self, addresses_args={}):
users, Address, addresses, User = (self.tables.users,
@@ -20,10 +16,10 @@ class _DynamicFixture(object):
self.tables.addresses,
self.classes.User)
mapper(User, users, properties={
'addresses': relationship(Address, lazy="dynamic",
**addresses_args)
})
mapper(
User, users, properties={
'addresses': relationship(
Address, lazy="dynamic", **addresses_args)})
mapper(Address, addresses)
return User, Address
@@ -34,16 +30,15 @@ class _DynamicFixture(object):
self.tables.order_items,
self.classes.Item)
mapper(Order, orders, properties={
'items': relationship(Item,
secondary=order_items,
lazy="dynamic",
**items_args
)
})
mapper(
Order, orders, properties={
'items': relationship(
Item, secondary=order_items, lazy="dynamic",
**items_args)})
mapper(Item, items)
return Order, Item
class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL):
def test_basic(self):
@@ -86,7 +81,7 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL):
def test_no_uselist_false(self):
User, Address = self._user_address_fixture(
addresses_args={"uselist": False})
addresses_args={"uselist": False})
assert_raises_message(
exc.InvalidRequestError,
"On relationship User.addresses, 'dynamic' loaders cannot be "
@@ -100,9 +95,9 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL):
self.classes.Address,
self.tables.addresses,
self.classes.User)
mapper(Address, addresses, properties={
'user': relationship(User, lazy='dynamic')
})
mapper(
Address, addresses, properties={
'user': relationship(User, lazy='dynamic')})
mapper(User, users)
assert_raises_message(
exc.InvalidRequestError,
@@ -118,7 +113,7 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL):
u = sess.query(User).get(8)
eq_(
list(u.addresses.order_by(desc(Address.email_address))),
[
[
Address(email_address='ed@wood.com'),
Address(email_address='ed@lala.com'),
Address(email_address='ed@bettyboop.com')
@@ -128,9 +123,7 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL):
def test_configured_order_by(self):
addresses = self.tables.addresses
User, Address = self._user_address_fixture(
addresses_args={
"order_by":
addresses.c.email_address.desc()})
addresses_args={"order_by": addresses.c.email_address.desc()})
sess = create_session()
u = sess.query(User).get(8)
@@ -183,6 +176,7 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL):
sess = create_session()
ad = sess.query(Address).get(1)
def go():
ad.user = None
self.assert_sql_count(testing.db, go, 0)
@@ -202,12 +196,9 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL):
eq_(
q.filter(User.id == 7).all(),
[
User(id=7,
addresses=[
Address(id=1, email_address='jack@bean.com')
])
]
)
User(
id=7, addresses=[
Address(id=1, email_address='jack@bean.com')])])
self.assert_sql_count(testing.db, go, 2)
def test_no_populate(self):
@@ -220,9 +211,8 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL):
)
def test_m2m(self):
Order, Item = self._order_item_fixture(items_args={
"backref": backref("orders", lazy="dynamic")
})
Order, Item = self._order_item_fixture(
items_args={"backref": backref("orders", lazy="dynamic")})
sess = create_session()
o1 = Order(id=15, description="order 10")
@@ -234,9 +224,9 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL):
assert o1 in i1.orders.all()
assert i1 in o1.items.all()
@testing.exclude('mysql', 'between',
((5, 1, 49), (5, 1, 52)),
'https://bugs.launchpad.net/ubuntu/+source/mysql-5.1/+bug/706988')
@testing.exclude(
'mysql', 'between', ((5, 1, 49), (5, 1, 52)),
'https://bugs.launchpad.net/ubuntu/+source/mysql-5.1/+bug/706988')
def test_association_nonaliased(self):
items, Order, orders, order_items, Item = (self.tables.items,
self.classes.Order,
@@ -288,7 +278,7 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL):
class MyQuery(Query):
pass
User, Address = self._user_address_fixture(
addresses_args={"query_class": MyQuery})
addresses_args={"query_class": MyQuery})
sess = create_session()
u = User()
@@ -322,9 +312,7 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL):
query_class = MyQuery
User, Address = self._user_address_fixture(
addresses_args={
"query_class": MyAppenderQuery})
addresses_args={"query_class": MyAppenderQuery})
sess = create_session()
u = User()
@@ -345,8 +333,10 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL):
eq_(type(q).__name__, 'MyQuery')
class UOWTest(_DynamicFixture, _fixtures.FixtureTest,
testing.AssertsExecutionResults):
class UOWTest(
_DynamicFixture, _fixtures.FixtureTest,
testing.AssertsExecutionResults):
run_inserts = None
def test_persistence(self):
@@ -361,17 +351,17 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest,
eq_(
testing.db.scalar(
select([func.count(1)]).where(addresses.c.user_id != None)
),
0
)
select(
[func.count(cast(1, Integer))]).
where(addresses.c.user_id != None)),
0)
u1 = sess.query(User).get(u1.id)
u1.addresses.append(a1)
sess.flush()
eq_(
testing.db.execute(
select([addresses]).where(addresses.c.user_id != None)
select([addresses]).where(addresses.c.user_id != None)
).fetchall(),
[(a1.id, u1.id, 'foo')]
)
@@ -380,8 +370,9 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest,
sess.flush()
eq_(
testing.db.scalar(
select([func.count(1)]).where(addresses.c.user_id != None)
),
select(
[func.count(cast(1, Integer))]).
where(addresses.c.user_id != None)),
0
)
@@ -405,12 +396,10 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest,
[(a2.id, u1.id, 'bar')]
)
def test_merge(self):
addresses = self.tables.addresses
User, Address = self._user_address_fixture(
addresses_args={
"order_by": addresses.c.email_address})
addresses_args={"order_by": addresses.c.email_address})
sess = create_session()
u1 = User(name='jack')
a1 = Address(email_address='a1')
@@ -452,8 +441,7 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest,
def test_collection_set(self):
addresses = self.tables.addresses
User, Address = self._user_address_fixture(
addresses_args={
"order_by": addresses.c.email_address})
addresses_args={"order_by": addresses.c.email_address})
sess = create_session(autoflush=True, autocommit=False)
u1 = User(name='jack')
a1 = Address(email_address='a1')
@@ -541,7 +529,7 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest,
def test_rollback(self):
User, Address = self._user_address_fixture()
sess = create_session(
expire_on_commit=False, autocommit=False, autoflush=True)
expire_on_commit=False, autocommit=False, autoflush=True)
u1 = User(name='jack')
u1.addresses.append(Address(email_address='lala@hoho.com'))
sess.add(u1)
@@ -563,12 +551,11 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest,
def _test_delete_cascade(self, expected):
addresses = self.tables.addresses
User, Address = self._user_address_fixture(addresses_args={
"order_by": addresses.c.id,
"backref": "user",
"cascade": "save-update" if expected \
else "all, delete"
})
User, Address = self._user_address_fixture(
addresses_args={
"order_by": addresses.c.id,
"backref": "user",
"cascade": "save-update" if expected else "all, delete"})
sess = create_session(autoflush=True, autocommit=False)
u = User(name='ed')
@@ -577,19 +564,19 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest,
)
sess.add(u)
sess.commit()
eq_(testing.db.scalar(
addresses.count(addresses.c.user_id == None)), 0)
eq_(testing.db.scalar(
addresses.count(addresses.c.user_id != None)), 6)
eq_(testing.db.scalar(addresses.count(addresses.c.user_id == None)), 0)
eq_(testing.db.scalar(addresses.count(addresses.c.user_id != None)), 6)
sess.delete(u)
sess.commit()
if expected:
eq_(testing.db.scalar(
eq_(
testing.db.scalar(
addresses.count(addresses.c.user_id == None)), 6)
eq_(testing.db.scalar(
eq_(
testing.db.scalar(
addresses.count(addresses.c.user_id != None)), 0)
else:
eq_(testing.db.scalar(addresses.count()), 0)
@@ -603,10 +590,10 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest,
def test_self_referential(self):
Node, nodes = self.classes.Node, self.tables.nodes
mapper(Node, nodes, properties={
'children': relationship(Node, lazy="dynamic", order_by=nodes.c.id)
})
mapper(
Node, nodes, properties={
'children': relationship(
Node, lazy="dynamic", order_by=nodes.c.id)})
sess = Session()
n2, n3 = Node(), Node()
@@ -616,14 +603,13 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest,
eq_(n1.children.all(), [n2, n3])
def test_remove_orphans(self):
addresses = self.tables.addresses
User, Address = self._user_address_fixture(addresses_args={
"order_by": addresses.c.id,
"backref": "user",
"cascade": "all, delete-orphan"
})
User, Address = self._user_address_fixture(
addresses_args={
"order_by": addresses.c.id,
"backref": "user",
"cascade": "all, delete-orphan"})
sess = create_session(autoflush=True, autocommit=False)
u = User(name='ed')
@@ -642,9 +628,8 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest,
)
def _backref_test(self, autoflush, saveuser):
User, Address = self._user_address_fixture(addresses_args={
"backref": "user",
})
User, Address = self._user_address_fixture(
addresses_args={"backref": "user"})
sess = create_session(autoflush=autoflush, autocommit=False)
u = User(name='buffy')
@@ -686,9 +671,8 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest,
self._backref_test(False, False)
def test_backref_events(self):
User, Address = self._user_address_fixture(addresses_args={
"backref": "user",
})
User, Address = self._user_address_fixture(
addresses_args={"backref": "user"})
u1 = User()
a1 = Address()
@@ -696,9 +680,8 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest,
is_(a1.user, u1)
def test_no_deref(self):
User, Address = self._user_address_fixture(addresses_args={
"backref": "user",
})
User, Address = self._user_address_fixture(
addresses_args={"backref": "user", })
session = create_session()
user = User()
@@ -723,19 +706,19 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest,
def query3():
session = create_session(testing.db)
user = session.query(User).first()
return session.query(User).first().addresses.all()
eq_(query1(), [Address(email_address='joe@joesdomain.example')])
eq_(query2(), [Address(email_address='joe@joesdomain.example')])
eq_(query3(), [Address(email_address='joe@joesdomain.example')])
class HistoryTest(_DynamicFixture, _fixtures.FixtureTest):
run_inserts = None
def _transient_fixture(self, addresses_args={}):
User, Address = self._user_address_fixture(
addresses_args=addresses_args)
addresses_args=addresses_args)
u1 = User()
a1 = Address()
@@ -743,7 +726,7 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest):
def _persistent_fixture(self, autoflush=True, addresses_args={}):
User, Address = self._user_address_fixture(
addresses_args=addresses_args)
addresses_args=addresses_args)
u1 = User(name='u1')
a1 = Address(email_address='a1')
@@ -836,8 +819,7 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest):
def test_backref_pop_persistent_autoflush_o2m_active_hist(self):
u1, a1, s = self._persistent_fixture(
addresses_args={"backref":
backref("user", active_history=True)})
addresses_args={"backref": backref("user", active_history=True)})
u1.addresses.append(a1)
s.flush()
s.expire_all()
@@ -850,7 +832,7 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest):
def test_backref_pop_persistent_autoflush_m2m(self):
o1, i1, s = self._persistent_m2m_fixture(
items_args={"backref": "orders"})
items_args={"backref": "orders"})
o1.items.append(i1)
s.flush()
s.expire_all()
@@ -863,7 +845,7 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest):
def test_backref_pop_persistent_noflush_m2m(self):
o1, i1, s = self._persistent_m2m_fixture(
items_args={"backref": "orders"}, autoflush=False)
items_args={"backref": "orders"}, autoflush=False)
o1.items.append(i1)
s.flush()
s.expire_all()
@@ -897,9 +879,8 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest):
u1, a1 = self._transient_fixture()
a2, a3, a4, a5 = Address(email_address='a2'), \
Address(email_address='a3'), \
Address(email_address='a4'), \
Address(email_address='a5')
Address(email_address='a3'), Address(email_address='a4'), \
Address(email_address='a5')
u1.addresses = [a1, a2]
u1.addresses = [a2, a3, a4, a5]
@@ -913,9 +894,8 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest):
u1, a1, s = self._persistent_fixture(autoflush=False)
a2, a3, a4, a5 = Address(email_address='a2'), \
Address(email_address='a3'), \
Address(email_address='a4'), \
Address(email_address='a5')
Address(email_address='a3'), Address(email_address='a4'), \
Address(email_address='a5')
u1.addresses = [a1, a2]
u1.addresses = [a2, a3, a4, a5]
@@ -929,9 +909,8 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest):
u1, a1, s = self._persistent_fixture(autoflush=True)
a2, a3, a4, a5 = Address(email_address='a2'), \
Address(email_address='a3'), \
Address(email_address='a4'), \
Address(email_address='a5')
Address(email_address='a3'), Address(email_address='a4'), \
Address(email_address='a5')
u1.addresses = [a1, a2]
u1.addresses = [a2, a3, a4, a5]
@@ -941,7 +920,6 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest):
compare_passive=([a3, a4, a5], [], [a1])
)
def test_persistent_but_readded_noflush(self):
u1, a1, s = self._persistent_fixture(autoflush=False)
u1.addresses.append(a1)
@@ -971,7 +949,4 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest):
u1.addresses.remove(a1)
self._assert_history(u1,
([], [], []),
compare_passive=([], [], [a1])
)
self._assert_history(u1, ([], [], []), compare_passive=([], [], [a1]))
+1045 -948
View File
File diff suppressed because it is too large Load Diff
+233 -211
View File
@@ -3,18 +3,15 @@ Primary key changing capabilities and passive/non-passive cascading updates.
"""
from sqlalchemy.testing import eq_, ne_, \
assert_raises, assert_raises_message
from sqlalchemy.testing import fixtures, eq_, ne_, assert_raises
import sqlalchemy as sa
from sqlalchemy import testing
from sqlalchemy import Integer, String, ForeignKey, Unicode
from sqlalchemy import testing, Integer, String, ForeignKey
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, create_session, backref, Session
from sqlalchemy.orm import mapper, relationship, create_session, Session
from sqlalchemy.orm.session import make_transient
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from test.orm import _fixtures
def _backend_specific_fk_args():
if testing.requires.deferrable_fks.enabled:
fk_args = dict(deferrable=True, initially='deferred')
@@ -24,6 +21,7 @@ def _backend_specific_fk_args():
fk_args = dict(onupdate='cascade')
return fk_args
class NaturalPKTest(fixtures.MappedTest):
# MySQL 5.5 on Windows crashes (the entire server, not the client)
# if you screw around with ON UPDATE CASCADE type of stuff.
@@ -34,37 +32,44 @@ class NaturalPKTest(fixtures.MappedTest):
def define_tables(cls, metadata):
fk_args = _backend_specific_fk_args()
users = Table('users', metadata,
Table('users', metadata,
Column('username', String(50), primary_key=True),
Column('fullname', String(100)),
test_needs_fk=True)
addresses = Table('addresses', metadata,
Table(
'addresses', metadata,
Column('email', String(50), primary_key=True),
Column('username', String(50),
ForeignKey('users.username', **fk_args)),
Column(
'username', String(50),
ForeignKey('users.username', **fk_args)),
test_needs_fk=True)
items = Table('items', metadata,
Table(
'items', metadata,
Column('itemname', String(50), primary_key=True),
Column('description', String(100)),
test_needs_fk=True)
users_to_items = Table('users_to_items', metadata,
Column('username', String(50),
ForeignKey('users.username', **fk_args),
primary_key=True),
Column('itemname', String(50),
ForeignKey('items.itemname', **fk_args),
primary_key=True),
Table(
'users_to_items', metadata,
Column(
'username', String(50),
ForeignKey('users.username', **fk_args), primary_key=True),
Column(
'itemname', String(50),
ForeignKey('items.itemname', **fk_args), primary_key=True),
test_needs_fk=True)
@classmethod
def setup_classes(cls):
class User(cls.Comparable):
pass
class Address(cls.Comparable):
pass
class Item(cls.Comparable):
pass
@@ -105,7 +110,7 @@ class NaturalPKTest(fixtures.MappedTest):
sess.flush()
assert sess.query(User).get('jack') is u1
users.update(values={User.username:'jack'}).execute(username='ed')
users.update(values={User.username: 'jack'}).execute(username='ed')
# expire/refresh works off of primary key. the PK is gone
# in this case so there's no way to look it up. criterion-
@@ -134,7 +139,6 @@ class NaturalPKTest(fixtures.MappedTest):
sess.expunge_all()
assert sess.query(User).get('ed').fullname == 'jack'
@testing.requires.on_update_cascade
def test_onetomany_passive(self):
self._test_onetomany(True)
@@ -148,9 +152,10 @@ class NaturalPKTest(fixtures.MappedTest):
self.tables.addresses,
self.classes.User)
mapper(User, users, properties={
'addresses':relationship(Address, passive_updates=passive_updates)
})
mapper(
User, users, properties={
'addresses': relationship(
Address, passive_updates=passive_updates)})
mapper(Address, addresses)
sess = create_session()
@@ -167,11 +172,13 @@ class NaturalPKTest(fixtures.MappedTest):
assert u1.addresses[0].username == 'ed'
sess.expunge_all()
eq_([Address(username='ed'), Address(username='ed')],
sess.query(Address).all())
eq_(
[Address(username='ed'), Address(username='ed')],
sess.query(Address).all())
u1 = sess.query(User).get('ed')
u1.username = 'jack'
def go():
sess.flush()
if not passive_updates:
@@ -182,10 +189,10 @@ class NaturalPKTest(fixtures.MappedTest):
# test passive_updates=True; update user
self.assert_sql_count(testing.db, go, 1)
sess.expunge_all()
assert User(username='jack', addresses=[
Address(username='jack'),
Address(username='jack')]) == \
sess.query(User).get('jack')
assert User(
username='jack', addresses=[
Address(username='jack'),
Address(username='jack')]) == sess.query(User).get('jack')
u1 = sess.query(User).get('jack')
u1.addresses = []
@@ -215,15 +222,9 @@ class NaturalPKTest(fixtures.MappedTest):
self.classes.User)
with testing.db.begin() as conn:
conn.execute(users.insert(),
username='jack', fullname='jack'
)
conn.execute(addresses.insert(),
email='jack1', username='jack'
)
conn.execute(addresses.insert(),
email='jack2', username='jack'
)
conn.execute(users.insert(), username='jack', fullname='jack')
conn.execute(addresses.insert(), email='jack1', username='jack')
conn.execute(addresses.insert(), email='jack2', username='jack')
mapper(User, users)
mapper(Address, addresses, properties={
@@ -277,9 +278,9 @@ class NaturalPKTest(fixtures.MappedTest):
assert a1.username == a2.username == 'ed'
sess.expunge_all()
eq_([Address(username='ed'), Address(username='ed')],
sess.query(Address).all())
eq_(
[Address(username='ed'), Address(username='ed')],
sess.query(Address).all())
@testing.requires.on_update_cascade
def test_onetoone_passive(self):
@@ -294,10 +295,10 @@ class NaturalPKTest(fixtures.MappedTest):
self.tables.addresses,
self.classes.User)
mapper(User, users, properties={
"address":relationship(Address, passive_updates=passive_updates,
uselist=False)
})
mapper(
User, users, properties={
"address": relationship(
Address, passive_updates=passive_updates, uselist=False)})
mapper(Address, addresses)
sess = create_session()
@@ -359,6 +360,7 @@ class NaturalPKTest(fixtures.MappedTest):
u1.username = 'ed'
(ad1, ad2) = sess.query(Address).all()
eq_([Address(username='jack'), Address(username='jack')], [ad1, ad2])
def go():
sess.flush()
if passive_updates:
@@ -367,12 +369,14 @@ class NaturalPKTest(fixtures.MappedTest):
self.assert_sql_count(testing.db, go, 3)
eq_([Address(username='ed'), Address(username='ed')], [ad1, ad2])
sess.expunge_all()
eq_([Address(username='ed'), Address(username='ed')],
sess.query(Address).all())
eq_(
[Address(username='ed'), Address(username='ed')],
sess.query(Address).all())
u1 = sess.query(User).get('ed')
assert len(u1.addresses) == 2 # load addresses
u1.username = 'fred'
def go():
sess.flush()
# check that the passive_updates is on on the other side
@@ -381,15 +385,14 @@ class NaturalPKTest(fixtures.MappedTest):
else:
self.assert_sql_count(testing.db, go, 3)
sess.expunge_all()
eq_([Address(username='fred'), Address(username='fred')],
sess.query(Address).all())
eq_(
[Address(username='fred'), Address(username='fred')],
sess.query(Address).all())
@testing.requires.on_update_cascade
def test_manytomany_passive(self):
self._test_manytomany(True)
@testing.requires.non_updating_cascade
def test_manytomany_nonpassive(self):
self._test_manytomany(False)
@@ -400,10 +403,11 @@ class NaturalPKTest(fixtures.MappedTest):
self.classes.User,
self.tables.users_to_items)
mapper(User, users, properties={
'items':relationship(Item, secondary=users_to_items,
backref='users',
passive_updates=passive_updates)})
mapper(
User, users, properties={
'items': relationship(
Item, secondary=users_to_items, backref='users',
passive_updates=passive_updates)})
mapper(Item, items)
sess = create_session()
@@ -427,10 +431,12 @@ class NaturalPKTest(fixtures.MappedTest):
eq_(Item(itemname='item2'), r[1])
eq_(['jack', 'fred'], [u.username for u in r[1].users])
u2.username='ed'
u2.username = 'ed'
def go():
sess.flush()
go()
def go():
sess.flush()
self.assert_sql_count(testing.db, go, 0)
@@ -444,11 +450,12 @@ class NaturalPKTest(fixtures.MappedTest):
sess.expunge_all()
u2 = sess.query(User).get(u2.username)
u2.username='wendy'
u2.username = 'wendy'
sess.flush()
r = sess.query(Item).with_parent(u2).all()
eq_(Item(itemname='item2'), r[0])
class TransientExceptionTesst(_fixtures.FixtureTest):
run_inserts = None
__backend__ = True
@@ -465,7 +472,7 @@ class TransientExceptionTesst(_fixtures.FixtureTest):
self.classes.User)
mapper(User, users)
mapper(Address, addresses, properties={'user':relationship(User)})
mapper(Address, addresses, properties={'user': relationship(User)})
sess = create_session()
u1 = User(id=5, name='u1')
@@ -475,7 +482,7 @@ class TransientExceptionTesst(_fixtures.FixtureTest):
make_transient(u1)
u1.id = None
u1.username='u2'
u1.username = 'u2'
sess.add(u1)
sess.flush()
@@ -487,6 +494,7 @@ class TransientExceptionTesst(_fixtures.FixtureTest):
ne_(u1.id, None)
eq_(sess.query(User).count(), 2)
class ReversePKsTest(fixtures.MappedTest):
"""reverse the primary keys of two entities and ensure bookkeeping
succeeds."""
@@ -578,6 +586,7 @@ class ReversePKsTest(fixtures.MappedTest):
eq_(a_published.status, PUBLISHED)
eq_(a_editable.status, EDITABLE)
class SelfReferentialTest(fixtures.MappedTest):
# mssql, mysql don't allow
# ON UPDATE on self-referential keys
@@ -590,12 +599,11 @@ class SelfReferentialTest(fixtures.MappedTest):
def define_tables(cls, metadata):
fk_args = _backend_specific_fk_args()
Table('nodes', metadata,
Column('name', String(50), primary_key=True),
Column('parent', String(50),
ForeignKey('nodes.name', **fk_args)),
test_needs_fk=True
)
Table(
'nodes', metadata,
Column('name', String(50), primary_key=True),
Column('parent', String(50), ForeignKey('nodes.name', **fk_args)),
test_needs_fk=True)
@classmethod
def setup_classes(cls):
@@ -605,12 +613,14 @@ class SelfReferentialTest(fixtures.MappedTest):
def test_one_to_many_on_m2o(self):
Node, nodes = self.classes.Node, self.tables.nodes
mapper(Node, nodes, properties={
'children': relationship(Node,
backref=sa.orm.backref('parentnode',
remote_side=nodes.c.name,
passive_updates=False),
)})
mapper(
Node, nodes, properties={
'children': relationship(
Node,
backref=sa.orm.backref(
'parentnode', remote_side=nodes.c.name,
passive_updates=False),
)})
sess = Session()
n1 = Node(name='n1')
@@ -631,12 +641,13 @@ class SelfReferentialTest(fixtures.MappedTest):
def test_one_to_many_on_o2m(self):
Node, nodes = self.classes.Node, self.tables.nodes
mapper(Node, nodes, properties={
'children': relationship(Node,
backref=sa.orm.backref('parentnode',
remote_side=nodes.c.name),
passive_updates=False
)})
mapper(
Node, nodes, properties={
'children': relationship(
Node,
backref=sa.orm.backref(
'parentnode', remote_side=nodes.c.name),
passive_updates=False)})
sess = Session()
n1 = Node(name='n1')
@@ -664,11 +675,10 @@ class SelfReferentialTest(fixtures.MappedTest):
def _test_many_to_one(self, passive):
Node, nodes = self.classes.Node, self.tables.nodes
mapper(Node, nodes, properties={
'parentnode':relationship(Node,
remote_side=nodes.c.name,
passive_updates=passive)
}
mapper(
Node, nodes, properties={
'parentnode': relationship(
Node, remote_side=nodes.c.name, passive_updates=passive)}
)
sess = Session()
@@ -681,10 +691,11 @@ class SelfReferentialTest(fixtures.MappedTest):
n1.name = 'new n1'
sess.commit()
eq_(['new n1', 'new n1', 'new n1'],
[n.parent
for n in sess.query(Node).filter(
Node.name.in_(['n11', 'n12', 'n13']))])
eq_(
['new n1', 'new n1', 'new n1'],
[
n.parent for n in sess.query(Node).filter(
Node.name.in_(['n11', 'n12', 'n13']))])
class NonPKCascadeTest(fixtures.MappedTest):
@@ -695,26 +706,32 @@ class NonPKCascadeTest(fixtures.MappedTest):
def define_tables(cls, metadata):
fk_args = _backend_specific_fk_args()
Table('users', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Table(
'users', metadata,
Column(
'id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('username', String(50), unique=True),
Column('fullname', String(100)),
test_needs_fk=True)
Table('addresses', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('email', String(50)),
Column('username', String(50),
ForeignKey('users.username', **fk_args)),
test_needs_fk=True
)
Table(
'addresses', metadata,
Column(
'id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('email', String(50)),
Column(
'username', String(50),
ForeignKey('users.username', **fk_args)),
test_needs_fk=True)
@classmethod
def setup_classes(cls):
class User(cls.Comparable):
pass
class Address(cls.Comparable):
pass
@@ -731,9 +748,10 @@ class NonPKCascadeTest(fixtures.MappedTest):
self.tables.users,
self.tables.addresses)
mapper(User, users, properties={
'addresses':relationship(Address,
passive_updates=passive_updates)})
mapper(
User, users, properties={
'addresses': relationship(
Address, passive_updates=passive_updates)})
mapper(Address, addresses)
sess = create_session()
@@ -744,37 +762,41 @@ class NonPKCascadeTest(fixtures.MappedTest):
sess.flush()
a1 = u1.addresses[0]
eq_(sa.select([addresses.c.username]).execute().fetchall(),
[('jack',), ('jack',)])
eq_(
sa.select([addresses.c.username]).execute().fetchall(),
[('jack',), ('jack',)])
assert sess.query(Address).get(a1.id) is u1.addresses[0]
u1.username = 'ed'
sess.flush()
assert u1.addresses[0].username == 'ed'
eq_(sa.select([addresses.c.username]).execute().fetchall(),
[('ed',), ('ed',)])
eq_(
sa.select([addresses.c.username]).execute().fetchall(),
[('ed',), ('ed',)])
sess.expunge_all()
eq_([Address(username='ed'), Address(username='ed')],
sess.query(Address).all())
eq_(
[Address(username='ed'), Address(username='ed')],
sess.query(Address).all())
u1 = sess.query(User).get(u1.id)
u1.username = 'jack'
def go():
sess.flush()
if not passive_updates:
# test passive_updates=False; load addresses,
# update user, update 2 addresses
# update user, update 2 addresses
self.assert_sql_count(testing.db, go, 4)
else:
# test passive_updates=True; update user
# test passive_updates=True; update user
self.assert_sql_count(testing.db, go, 1)
sess.expunge_all()
assert User(username='jack',
addresses=[Address(username='jack'),
Address(username='jack')]) == \
sess.query(User).get(u1.id)
assert User(
username='jack', addresses=[
Address(username='jack'),
Address(username='jack')]) == sess.query(User).get(u1.id)
sess.expunge_all()
u1 = sess.query(User).get(u1.id)
@@ -785,8 +807,9 @@ class NonPKCascadeTest(fixtures.MappedTest):
a1 = sess.query(Address).get(a1.id)
eq_(a1.username, None)
eq_(sa.select([addresses.c.username]).execute().fetchall(),
[(None,), (None,)])
eq_(
sa.select([addresses.c.username]).execute().fetchall(),
[(None,), (None,)])
u1 = sess.query(User).get(u1.id)
eq_(User(username='fred', fullname='jack'), u1)
@@ -805,20 +828,22 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
Column('username', String(50), primary_key=True),
test_needs_fk=True)
Table('addresses', metadata,
Column('username', String(50),
ForeignKey('users.username', **fk_args),
primary_key=True
),
Column('email', String(50), primary_key=True),
Column('etc', String(50)),
test_needs_fk=True
)
Table(
'addresses', metadata,
Column(
'username', String(50),
ForeignKey('users.username', **fk_args),
primary_key=True),
Column('email', String(50), primary_key=True),
Column('etc', String(50)),
test_needs_fk=True)
@classmethod
def setup_classes(cls):
class User(cls.Comparable):
pass
class Address(cls.Comparable):
pass
@@ -849,9 +874,10 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
self.tables.users,
self.tables.addresses)
mapper(User, users, properties={
'addresses':relationship(Address,
passive_updates=passive_updates)})
mapper(
User, users, properties={
'addresses': relationship(
Address, passive_updates=passive_updates)})
mapper(Address, addresses)
sess = create_session()
@@ -882,9 +908,10 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
self.tables.users,
self.tables.addresses)
mapper(User, users, properties={
'addresses':relationship(Address,
passive_updates=passive_updates)})
mapper(
User, users, properties={
'addresses': relationship(
Address, passive_updates=passive_updates)})
mapper(Address, addresses)
sess = create_session()
@@ -915,7 +942,7 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
mapper(User, users)
mapper(Address, addresses, properties={
'user':relationship(User, passive_updates=passive_updates)
'user': relationship(User, passive_updates=passive_updates)
})
sess = create_session()
@@ -924,7 +951,7 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
sess.add_all([u1, a1])
sess.flush()
u1.username='edmodified'
u1.username = 'edmodified'
sess.flush()
eq_(a1.username, 'edmodified')
@@ -945,9 +972,9 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
# tests [ticket:1856]
mapper(User, users)
mapper(Address, addresses, properties={
'user':relationship(User, passive_updates=passive_updates)
})
mapper(
Address, addresses, properties={
'user': relationship(User, passive_updates=passive_updates)})
sess = create_session()
u1 = User(username='jack')
@@ -959,7 +986,6 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
a1.user = u2
sess.flush()
def test_rowswitch_doesntfire(self):
User, Address, users, addresses = (self.classes.User,
self.classes.Address,
@@ -968,7 +994,7 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
mapper(User, users)
mapper(Address, addresses, properties={
'user':relationship(User, passive_updates=True)
'user': relationship(User, passive_updates=True)
})
sess = create_session()
@@ -991,16 +1017,14 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
# test that the primary key columns of addresses are not
# being updated as well, since this is a row switch.
self.assert_sql_execution(testing.db,
sess.flush,
CompiledSQL(
"UPDATE addresses SET etc=:etc WHERE "
"addresses.username = :addresses_username AND"
" addresses.email = :addresses_email",
{'etc': 'foo', 'addresses_username':'ed',
'addresses_email':'ed@host1'} ),
)
self.assert_sql_execution(
testing.db, sess.flush, CompiledSQL(
"UPDATE addresses SET etc=:etc WHERE "
"addresses.username = :addresses_username AND"
" addresses.email = :addresses_email", {
'etc': 'foo', 'addresses_username': 'ed',
'addresses_email': 'ed@host1'}),
)
def _test_onetomany(self, passive_updates):
"""Change the PK of a related entity via foreign key cascade.
@@ -1016,30 +1040,33 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
self.tables.users,
self.tables.addresses)
mapper(User, users, properties={
'addresses':relationship(Address,
passive_updates=passive_updates)})
mapper(
User, users, properties={
'addresses': relationship(
Address, passive_updates=passive_updates)})
mapper(Address, addresses)
sess = create_session()
a1, a2 = Address(username='ed', email='ed@host1'),\
Address(username='ed', email='ed@host2')
a1, a2 = Address(username='ed', email='ed@host1'), \
Address(username='ed', email='ed@host2')
u1 = User(username='ed', addresses=[a1, a2])
sess.add(u1)
sess.flush()
eq_(a1.username, 'ed')
eq_(a2.username, 'ed')
eq_(sa.select([addresses.c.username]).execute().fetchall(),
[('ed',), ('ed',)])
eq_(
sa.select([addresses.c.username]).execute().fetchall(),
[('ed',), ('ed',)])
u1.username = 'jack'
a2.email='ed@host3'
a2.email = 'ed@host3'
sess.flush()
eq_(a1.username, 'jack')
eq_(a2.username, 'jack')
eq_(sa.select([addresses.c.username]).execute().fetchall(),
[('jack',), ('jack', )])
eq_(
sa.select([addresses.c.username]).execute().fetchall(),
[('jack',), ('jack', )])
class JoinedInheritanceTest(fixtures.MappedTest):
@@ -1055,34 +1082,39 @@ class JoinedInheritanceTest(fixtures.MappedTest):
def define_tables(cls, metadata):
fk_args = _backend_specific_fk_args()
Table('person', metadata,
Table(
'person', metadata,
Column('name', String(50), primary_key=True),
Column('type', String(50), nullable=False),
test_needs_fk=True)
Table('engineer', metadata,
Column('name', String(50), ForeignKey('person.name', **fk_args),
primary_key=True),
Table(
'engineer', metadata,
Column(
'name', String(50), ForeignKey('person.name', **fk_args),
primary_key=True),
Column('primary_language', String(50)),
Column('boss_name', String(50),
ForeignKey('manager.name', **fk_args)),
test_needs_fk=True
Column(
'boss_name', String(50),
ForeignKey('manager.name', **fk_args)),
test_needs_fk=True
)
Table('manager', metadata,
Column('name', String(50),
ForeignKey('person.name', **fk_args),
primary_key=True),
Column('paperwork', String(50)),
test_needs_fk=True
Table(
'manager', metadata, Column('name', String(50),
ForeignKey('person.name', **fk_args), primary_key=True),
Column('paperwork', String(50)), test_needs_fk=True
)
@classmethod
def setup_classes(cls):
class Person(cls.Comparable):
pass
class Engineer(Person):
pass
class Manager(Person):
pass
@@ -1104,25 +1136,22 @@ class JoinedInheritanceTest(fixtures.MappedTest):
self._test_fk(False)
def _test_pk(self, passive_updates):
Person, Manager, person, manager, Engineer, engineer = (self.classes.Person,
self.classes.Manager,
self.tables.person,
self.tables.manager,
self.classes.Engineer,
self.tables.engineer)
Person, Manager, person, manager, Engineer, engineer = (
self.classes.Person, self.classes.Manager, self.tables.person,
self.tables.manager, self.classes.Engineer, self.tables.engineer)
mapper(Person, person, polymorphic_on=person.c.type,
polymorphic_identity='person',
passive_updates=passive_updates)
mapper(Engineer, engineer, inherits=Person,
mapper(
Person, person, polymorphic_on=person.c.type,
polymorphic_identity='person', passive_updates=passive_updates)
mapper(
Engineer, engineer, inherits=Person,
polymorphic_identity='engineer', properties={
'boss':relationship(Manager,
primaryjoin=manager.c.name==engineer.c.boss_name,
passive_updates=passive_updates
)
})
mapper(Manager, manager, inherits=Person,
polymorphic_identity='manager')
'boss': relationship(
Manager,
primaryjoin=manager.c.name == engineer.c.boss_name,
passive_updates=passive_updates)})
mapper(
Manager, manager, inherits=Person, polymorphic_identity='manager')
sess = sa.orm.sessionmaker()()
@@ -1134,32 +1163,28 @@ class JoinedInheritanceTest(fixtures.MappedTest):
sess.commit()
def _test_fk(self, passive_updates):
Person, Manager, person, manager, Engineer, engineer = (self.classes.Person,
self.classes.Manager,
self.tables.person,
self.tables.manager,
self.classes.Engineer,
self.tables.engineer)
Person, Manager, person, manager, Engineer, engineer = (
self.classes.Person, self.classes.Manager, self.tables.person,
self.tables.manager, self.classes.Engineer, self.tables.engineer)
mapper(Person, person, polymorphic_on=person.c.type,
polymorphic_identity='person',
passive_updates=passive_updates)
mapper(Engineer, engineer, inherits=Person,
polymorphic_identity='engineer', properties={
'boss':relationship(Manager,
primaryjoin=manager.c.name==engineer.c.boss_name,
passive_updates=passive_updates
)
})
mapper(Manager, manager, inherits=Person,
polymorphic_identity='manager')
mapper(
Person, person, polymorphic_on=person.c.type,
polymorphic_identity='person', passive_updates=passive_updates)
mapper(
Engineer, engineer, inherits=Person,
polymorphic_identity='engineer', properties={
'boss': relationship(
Manager,
primaryjoin=manager.c.name == engineer.c.boss_name,
passive_updates=passive_updates)})
mapper(
Manager, manager, inherits=Person, polymorphic_identity='manager')
sess = sa.orm.sessionmaker()()
m1 = Manager(name='dogbert', paperwork='lots')
e1, e2 = \
Engineer(name='dilbert', primary_language='java', boss=m1),\
Engineer(name='wally', primary_language='c++', boss=m1)
e1, e2 = Engineer(name='dilbert', primary_language='java', boss=m1),\
Engineer(name='wally', primary_language='c++', boss=m1)
sess.add_all([
e1, e2, m1
])
@@ -1176,6 +1201,3 @@ class JoinedInheritanceTest(fixtures.MappedTest):
eq_(e1.boss_name, 'pointy haired')
eq_(e2.boss_name, 'pointy haired')
+599 -471
View File
File diff suppressed because it is too large Load Diff
+144 -106
View File
@@ -1,15 +1,13 @@
from __future__ import with_statement
from sqlalchemy.testing import eq_, assert_raises, \
assert_raises_message, assert_warnings
from sqlalchemy import *
from sqlalchemy.orm import attributes
from sqlalchemy import exc as sa_exc, event
from sqlalchemy.orm import exc as orm_exc
from sqlalchemy.orm import *
from sqlalchemy import (
testing, exc as sa_exc, event, String, Column, Table, select, func)
from sqlalchemy.testing import (
fixtures, engines, eq_, assert_raises, assert_raises_message,
assert_warnings)
from sqlalchemy.orm import (
exc as orm_exc, Session, mapper, sessionmaker, create_session,
relationship, attributes)
from sqlalchemy.testing.util import gc_collect
from sqlalchemy import testing
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import engines
from test.orm._fixtures import FixtureTest
@@ -52,8 +50,8 @@ class SessionTransactionTest(FixtureTest):
u = User(name='ed')
sess.add(u)
sess.flush()
sess.commit() # commit does nothing
trans.rollback() # rolls back
sess.commit() # commit does nothing
trans.rollback() # rolls back
assert len(sess.query(User).all()) == 0
sess.close()
@@ -84,7 +82,6 @@ class SessionTransactionTest(FixtureTest):
conn.close()
raise
@testing.requires.savepoints
def test_nested_accounting_new_items_removed(self):
User, users = self.classes.User, self.tables.users
@@ -129,22 +126,22 @@ class SessionTransactionTest(FixtureTest):
session = create_session(bind=testing.db)
session.begin()
session.connection().execute(users.insert().values(name='user1'
))
session.connection().execute(users.insert().values(
name='user1'))
session.begin(subtransactions=True)
session.begin_nested()
session.connection().execute(users.insert().values(name='user2'
))
assert session.connection().execute('select count(1) from users'
).scalar() == 2
session.connection().execute(users.insert().values(
name='user2'))
assert session.connection().execute(
'select count(1) from users').scalar() == 2
session.rollback()
assert session.connection().execute('select count(1) from users'
).scalar() == 1
session.connection().execute(users.insert().values(name='user3'
))
assert session.connection().execute(
'select count(1) from users').scalar() == 1
session.connection().execute(users.insert().values(
name='user3'))
session.commit()
assert session.connection().execute('select count(1) from users'
).scalar() == 2
assert session.connection().execute(
'select count(1) from users').scalar() == 2
@testing.requires.independent_connections
def test_transactions_isolated(self):
@@ -196,8 +193,8 @@ class SessionTransactionTest(FixtureTest):
u = User(name='u1')
sess.add(u)
sess.flush()
sess.commit() # commit does nothing
sess.rollback() # rolls back
sess.commit() # commit does nothing
sess.rollback() # rolls back
assert len(sess.query(User).all()) == 0
sess.close()
@@ -276,7 +273,7 @@ class SessionTransactionTest(FixtureTest):
u3 = User(name='u3')
sess.add(u3)
sess.commit() # commit the nested transaction
sess.commit() # commit the nested transaction
sess.rollback()
eq_(set(sess.query(User).all()), set([u2]))
@@ -346,13 +343,15 @@ class SessionTransactionTest(FixtureTest):
sess = Session()
to_flush = [User(name='ed'), User(name='jack'), User(name='wendy')]
@event.listens_for(sess, "after_flush_postexec")
def add_another_user(session, ctx):
if to_flush:
session.add(to_flush.pop(0))
x = [1]
@event.listens_for(sess, "after_commit")
@event.listens_for(sess, "after_commit") # noqa
def add_another_user(session):
x[0] += 1
@@ -379,7 +378,6 @@ class SessionTransactionTest(FixtureTest):
sess.commit
)
def test_error_on_using_inactive_session_commands(self):
users, User = self.tables.users, self.classes.User
@@ -432,15 +430,11 @@ class SessionTransactionTest(FixtureTest):
trans = sess.begin()
trans.rollback()
assert_raises_message(
sa_exc.ResourceClosedError,
"This transaction is closed",
trans.rollback
)
sa_exc.ResourceClosedError, "This transaction is closed",
trans.rollback)
assert_raises_message(
sa_exc.ResourceClosedError,
"This transaction is closed",
trans.commit
)
sa_exc.ResourceClosedError, "This transaction is closed",
trans.commit)
def test_deactive_status_check(self):
sess = create_session()
@@ -493,6 +487,7 @@ class SessionTransactionTest(FixtureTest):
sess, u1 = self._inactive_flushed_session_fixture()
u2 = User(name='u2')
sess.add(u2)
def go():
sess.rollback()
assert_warnings(go,
@@ -506,6 +501,7 @@ class SessionTransactionTest(FixtureTest):
def test_warning_on_using_inactive_session_dirty(self):
sess, u1 = self._inactive_flushed_session_fixture()
u1.name = 'newname'
def go():
sess.rollback()
assert_warnings(go,
@@ -519,6 +515,7 @@ class SessionTransactionTest(FixtureTest):
def test_warning_on_using_inactive_session_delete(self):
sess, u1 = self._inactive_flushed_session_fixture()
sess.delete(u1)
def go():
sess.rollback()
assert_warnings(go,
@@ -558,6 +555,7 @@ class SessionTransactionTest(FixtureTest):
assert session.transaction is not None, \
'autocommit=False should start a new transaction'
class _LocalFixture(FixtureTest):
run_setup_mappers = 'once'
run_inserts = None
@@ -567,10 +565,11 @@ class _LocalFixture(FixtureTest):
def setup_mappers(cls):
User, Address = cls.classes.User, cls.classes.Address
users, addresses = cls.tables.users, cls.tables.addresses
mapper(User, users, properties={
'addresses':relationship(Address, backref='user',
cascade="all, delete-orphan",
order_by=addresses.c.id),
mapper(
User, users, properties={
'addresses': relationship(
Address, backref='user', cascade="all, delete-orphan",
order_by=addresses.c.id),
})
mapper(Address, addresses)
@@ -611,6 +610,7 @@ class FixtureDataTest(_LocalFixture):
assert u1.name == 'will'
class CleanSavepointTest(FixtureTest):
"""test the behavior for [ticket:2452] - rollback on begin_nested()
only expires objects tracked as being modified in that transaction.
@@ -641,27 +641,30 @@ class CleanSavepointTest(FixtureTest):
@testing.requires.savepoints
def test_rollback_ignores_clean_on_savepoint(self):
User, users = self.classes.User, self.tables.users
def update_fn(s, u2):
u2.name = 'u2modified'
self._run_test(update_fn)
@testing.requires.savepoints
def test_rollback_ignores_clean_on_savepoint_agg_upd_eval(self):
User, users = self.classes.User, self.tables.users
User = self.classes.User
def update_fn(s, u2):
s.query(User).filter_by(name='u2').update(dict(name='u2modified'),
synchronize_session='evaluate')
s.query(User).filter_by(name='u2').update(
dict(name='u2modified'), synchronize_session='evaluate')
self._run_test(update_fn)
@testing.requires.savepoints
def test_rollback_ignores_clean_on_savepoint_agg_upd_fetch(self):
User, users = self.classes.User, self.tables.users
User = self.classes.User
def update_fn(s, u2):
s.query(User).filter_by(name='u2').update(dict(name='u2modified'),
synchronize_session='fetch')
self._run_test(update_fn)
class ContextManagerTest(FixtureTest):
run_inserts = None
__backend__ = True
@@ -674,6 +677,7 @@ class ContextManagerTest(FixtureTest):
mapper(User, users)
sess = Session()
def go():
with sess.begin_nested():
sess.add(User()) # name can't be null
@@ -708,6 +712,7 @@ class ContextManagerTest(FixtureTest):
mapper(User, users)
sess = Session(autocommit=True)
def go():
with sess.begin():
sess.add(User()) # name can't be null
@@ -729,7 +734,7 @@ class AutoExpireTest(_LocalFixture):
def test_expunge_pending_on_rollback(self):
User = self.classes.User
sess = self.session()
u2= User(name='newuser')
u2 = User(name='newuser')
sess.add(u2)
assert u2 in sess
sess.rollback()
@@ -738,7 +743,7 @@ class AutoExpireTest(_LocalFixture):
def test_trans_pending_cleared_on_commit(self):
User = self.classes.User
sess = self.session()
u2= User(name='newuser')
u2 = User(name='newuser')
sess.add(u2)
assert u2 in sess
sess.commit()
@@ -836,8 +841,7 @@ class AutoExpireTest(_LocalFixture):
u1.addresses.remove(a1)
s.flush()
eq_(s.query(Address).filter(Address.email_address=='foo').all(),
[])
eq_(s.query(Address).filter(Address.email_address == 'foo').all(), [])
s.rollback()
assert a1 not in s.deleted
assert u1.addresses == [a1]
@@ -851,7 +855,6 @@ class AutoExpireTest(_LocalFixture):
sess.commit()
eq_(u1.name, 'newuser')
def test_concurrent_commit_pending(self):
User = self.classes.User
s1 = self.session()
@@ -860,12 +863,13 @@ class AutoExpireTest(_LocalFixture):
s1.commit()
s2 = self.session()
u2 = s2.query(User).filter(User.name=='edward').one()
u2 = s2.query(User).filter(User.name == 'edward').one()
u2.name = 'will'
s2.commit()
assert u1.name == 'will'
class TwoPhaseTest(_LocalFixture):
__backend__ = True
@@ -881,6 +885,7 @@ class TwoPhaseTest(_LocalFixture):
assert u not in s
class RollbackRecoverTest(_LocalFixture):
__backend__ = True
@@ -943,9 +948,10 @@ class RollbackRecoverTest(_LocalFixture):
s.commit()
eq_(
s.query(User).all(),
[User(id=1, name='edward',
addresses=[Address(email_address='foober')])]
)
[
User(
id=1, name='edward',
addresses=[Address(email_address='foober')])])
class SavepointTest(_LocalFixture):
@@ -965,18 +971,19 @@ class SavepointTest(_LocalFixture):
u1.name = 'edward'
u2.name = 'jackward'
s.add_all([u3, u4])
eq_(s.query(User.name).order_by(User.id).all(),
[('edward',), ('jackward',), ('wendy',), ('foo',)])
eq_(
s.query(User.name).order_by(User.id).all(),
[('edward',), ('jackward',), ('wendy',), ('foo',)])
s.rollback()
assert u1.name == 'ed'
assert u2.name == 'jack'
eq_(s.query(User.name).order_by(User.id).all(),
[('ed',), ('jack',)])
eq_(
s.query(User.name).order_by(User.id).all(),
[('ed',), ('jack',)])
s.commit()
assert u1.name == 'ed'
assert u2.name == 'jack'
eq_(s.query(User.name).order_by(User.id).all(),
[('ed',), ('jack',)])
eq_(s.query(User.name).order_by(User.id).all(), [('ed',), ('jack',)])
@testing.requires.savepoints
def test_savepoint_delete(self):
@@ -1006,19 +1013,23 @@ class SavepointTest(_LocalFixture):
u1.name = 'edward'
u2.name = 'jackward'
s.add_all([u3, u4])
eq_(s.query(User.name).order_by(User.id).all(),
[('edward',), ('jackward',), ('wendy',), ('foo',)])
eq_(
s.query(User.name).order_by(User.id).all(),
[('edward',), ('jackward',), ('wendy',), ('foo',)])
s.commit()
def go():
assert u1.name == 'edward'
assert u2.name == 'jackward'
eq_(s.query(User.name).order_by(User.id).all(),
[('edward',), ('jackward',), ('wendy',), ('foo',)])
eq_(
s.query(User.name).order_by(User.id).all(),
[('edward',), ('jackward',), ('wendy',), ('foo',)])
self.assert_sql_count(testing.db, go, 1)
s.commit()
eq_(s.query(User.name).order_by(User.id).all(),
[('edward',), ('jackward',), ('wendy',), ('foo',)])
eq_(
s.query(User.name).order_by(User.id).all(),
[('edward',), ('jackward',), ('wendy',), ('foo',)])
@testing.requires.savepoints
def test_savepoint_rollback_collections(self):
@@ -1028,30 +1039,40 @@ class SavepointTest(_LocalFixture):
s.add(u1)
s.commit()
u1.name='edward'
u1.name = 'edward'
u1.addresses.append(Address(email_address='bar'))
s.begin_nested()
u2 = User(name='jack', addresses=[Address(email_address='bat')])
s.add(u2)
eq_(s.query(User).order_by(User.id).all(),
eq_(
s.query(User).order_by(User.id).all(),
[
User(name='edward', addresses=[Address(email_address='foo'),
Address(email_address='bar')]),
User(
name='edward',
addresses=[
Address(email_address='foo'),
Address(email_address='bar')]),
User(name='jack', addresses=[Address(email_address='bat')])
]
)
])
s.rollback()
eq_(s.query(User).order_by(User.id).all(),
eq_(
s.query(User).order_by(User.id).all(),
[
User(name='edward', addresses=[Address(email_address='foo'),
Address(email_address='bar')]),
]
)
User(
name='edward',
addresses=[
Address(email_address='foo'),
Address(email_address='bar')]),
])
s.commit()
eq_(s.query(User).order_by(User.id).all(),
eq_(
s.query(User).order_by(User.id).all(),
[
User(name='edward', addresses=[Address(email_address='foo'),
Address(email_address='bar')]),
User(
name='edward',
addresses=[
Address(email_address='foo'),
Address(email_address='bar')]),
]
)
@@ -1063,28 +1084,43 @@ class SavepointTest(_LocalFixture):
s.add(u1)
s.commit()
u1.name='edward'
u1.name = 'edward'
u1.addresses.append(Address(email_address='bar'))
s.begin_nested()
u2 = User(name='jack', addresses=[Address(email_address='bat')])
s.add(u2)
eq_(s.query(User).order_by(User.id).all(),
eq_(
s.query(User).order_by(User.id).all(),
[
User(name='edward', addresses=[Address(email_address='foo'), Address(email_address='bar')]),
User(
name='edward',
addresses=[
Address(email_address='foo'),
Address(email_address='bar')]),
User(name='jack', addresses=[Address(email_address='bat')])
]
)
s.commit()
eq_(s.query(User).order_by(User.id).all(),
eq_(
s.query(User).order_by(User.id).all(),
[
User(name='edward', addresses=[Address(email_address='foo'), Address(email_address='bar')]),
User(
name='edward',
addresses=[
Address(email_address='foo'),
Address(email_address='bar')]),
User(name='jack', addresses=[Address(email_address='bat')])
]
)
s.commit()
eq_(s.query(User).order_by(User.id).all(),
eq_(
s.query(User).order_by(User.id).all(),
[
User(name='edward', addresses=[Address(email_address='foo'), Address(email_address='bar')]),
User(
name='edward',
addresses=[
Address(email_address='foo'),
Address(email_address='bar')]),
User(name='jack', addresses=[Address(email_address='bat')])
]
)
@@ -1095,7 +1131,7 @@ class SavepointTest(_LocalFixture):
sess = self.session()
sess.begin_nested()
u2= User(name='newuser')
u2 = User(name='newuser')
sess.add(u2)
assert u2 in sess
sess.rollback()
@@ -1128,7 +1164,8 @@ class AccountingFlagsTest(_LocalFixture):
sess.add(u1)
sess.commit()
testing.db.execute(users.update(users.c.name=='ed').values(name='edward'))
testing.db.execute(
users.update(users.c.name == 'ed').values(name='edward'))
assert u1.name == 'ed'
sess.expire_all()
@@ -1145,7 +1182,8 @@ class AccountingFlagsTest(_LocalFixture):
u1.name = 'edwardo'
sess.rollback()
testing.db.execute(users.update(users.c.name=='ed').values(name='edward'))
testing.db.execute(
users.update(users.c.name == 'ed').values(name='edward'))
assert u1.name == 'edwardo'
sess.expire_all()
@@ -1162,12 +1200,14 @@ class AccountingFlagsTest(_LocalFixture):
u1.name = 'edwardo'
sess.rollback()
testing.db.execute(users.update(users.c.name=='ed').values(name='edward'))
testing.db.execute(
users.update(users.c.name == 'ed').values(name='edward'))
assert u1.name == 'edwardo'
sess.commit()
assert testing.db.execute(select([users.c.name])).fetchall() == [('edwardo',)]
assert testing.db.execute(select([users.c.name])).fetchall() == \
[('edwardo',)]
assert u1.name == 'edwardo'
sess.delete(u1)
@@ -1176,8 +1216,9 @@ class AccountingFlagsTest(_LocalFixture):
def test_preflush_no_accounting(self):
User, users = self.classes.User, self.tables.users
sess = Session(_enable_transaction_accounting=False,
autocommit=True, autoflush=False)
sess = Session(
_enable_transaction_accounting=False, autocommit=True,
autoflush=False)
u1 = User(name='ed')
sess.add(u1)
sess.flush()
@@ -1190,7 +1231,8 @@ class AccountingFlagsTest(_LocalFixture):
sess.rollback()
sess.begin()
assert testing.db.execute(select([users.c.name])).fetchall() == [('ed',)]
assert testing.db.execute(select([users.c.name])).fetchall() == \
[('ed',)]
class AutoCommitTest(_LocalFixture):
@@ -1220,6 +1262,7 @@ class AutoCommitTest(_LocalFixture):
sess = create_session(autocommit=True)
fail = False
def fail_fn(*arg, **kw):
if fail:
raise Exception("commit fails")
@@ -1251,6 +1294,7 @@ class AutoCommitTest(_LocalFixture):
sess = create_session(autocommit=True)
fail = False
def fail_fn(*arg, **kw):
if fail:
raise Exception("commit fails")
@@ -1296,14 +1340,13 @@ class AutoCommitTest(_LocalFixture):
assert 'id' not in u1.__dict__
eq_(u1.id, 3)
class NaturalPKRollbackTest(fixtures.MappedTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
Column('name', String(50), primary_key=True)
)
Table('users', metadata, Column('name', String(50), primary_key=True))
@classmethod
def setup_classes(cls):
@@ -1317,10 +1360,7 @@ class NaturalPKRollbackTest(fixtures.MappedTest):
session = sessionmaker()()
u1, u2, u3= \
User(name='u1'),\
User(name='u2'),\
User(name='u3')
u1, u2, u3 = User(name='u1'), User(name='u2'), User(name='u3')
session.add_all([u1, u2, u3])
@@ -1424,5 +1464,3 @@ class NaturalPKRollbackTest(fixtures.MappedTest):
assert u2 not in s
assert s.identity_map[(User, ('u1',))] is u1
+179 -148
View File
@@ -2,21 +2,22 @@ import datetime
import sqlalchemy as sa
from sqlalchemy.testing import engines
from sqlalchemy import testing
from sqlalchemy import Integer, String, Date, ForeignKey, literal_column, \
orm, exc, select, TypeDecorator
from sqlalchemy import (
Integer, String, Date, ForeignKey, orm, exc, select, TypeDecorator)
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, Session, \
create_session, column_property, sessionmaker,\
exc as orm_exc
from sqlalchemy.testing import eq_, ne_, assert_raises, assert_raises_message
from sqlalchemy.testing import fixtures
from test.orm import _fixtures
from sqlalchemy.testing.assertsql import AllOf, CompiledSQL
from sqlalchemy.orm import (
mapper, relationship, Session, create_session, sessionmaker,
exc as orm_exc)
from sqlalchemy.testing import (
eq_, assert_raises, assert_raises_message, fixtures)
from sqlalchemy.testing.assertsql import CompiledSQL
import uuid
def make_uuid():
return uuid.uuid4().hex
class VersioningTest(fixtures.MappedTest):
__backend__ = True
@@ -36,8 +37,7 @@ class VersioningTest(fixtures.MappedTest):
def _fixture(self):
Foo, version_table = self.classes.Foo, self.tables.version_table
mapper(Foo, version_table,
version_id_col=version_table.c.version_id)
mapper(Foo, version_table, version_id_col=version_table.c.version_id)
s1 = Session()
return s1
@@ -54,12 +54,13 @@ class VersioningTest(fixtures.MappedTest):
s1.add_all((f1, f2))
s1.commit()
f1.value='f1rev2'
f1.value = 'f1rev2'
assert_raises(sa.exc.SAWarning, s1.commit)
finally:
testing.db.dialect.supports_sane_rowcount = save
@testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount')
@testing.emits_warning_on(
'+zxjdbc', r'.*does not support (update|delete)d rowcount')
def test_basic(self):
Foo = self.classes.Foo
@@ -69,23 +70,23 @@ class VersioningTest(fixtures.MappedTest):
s1.add_all((f1, f2))
s1.commit()
f1.value='f1rev2'
f1.value = 'f1rev2'
s1.commit()
s2 = create_session(autocommit=False)
f1_s = s2.query(Foo).get(f1.id)
f1_s.value='f1rev3'
f1_s.value = 'f1rev3'
s2.commit()
f1.value='f1rev3mine'
f1.value = 'f1rev3mine'
# Only dialects with a sane rowcount can detect the
# StaleDataError
if testing.db.dialect.supports_sane_rowcount:
assert_raises_message(sa.orm.exc.StaleDataError,
r"UPDATE statement on table 'version_table' expected "
r"to update 1 row\(s\); 0 were matched.",
s1.commit),
assert_raises_message(
sa.orm.exc.StaleDataError,
r"UPDATE statement on table 'version_table' expected "
r"to update 1 row\(s\); 0 were matched.", s1.commit),
s1.rollback()
else:
s1.commit()
@@ -94,7 +95,7 @@ class VersioningTest(fixtures.MappedTest):
f1 = s1.query(Foo).get(f1.id)
f2 = s1.query(Foo).get(f2.id)
f1_s.value='f1rev4'
f1_s.value = 'f1rev4'
s2.commit()
s1.delete(f1)
@@ -109,7 +110,8 @@ class VersioningTest(fixtures.MappedTest):
else:
s1.commit()
@testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount')
@testing.emits_warning_on(
'+zxjdbc', r'.*does not support (update|delete)d rowcount')
def test_bump_version(self):
"""test that version number can be bumped.
@@ -145,11 +147,11 @@ class VersioningTest(fixtures.MappedTest):
@testing.emits_warning(r'.*does not support updated rowcount')
@engines.close_open_connections
def test_versioncheck(self):
"""query.with_lockmode performs a 'version check' on an already loaded instance"""
"""query.with_lockmode performs a 'version check' on an already loaded
instance"""
Foo = self.classes.Foo
s1 = self._fixture()
f1s1 = Foo(value='f1 value')
s1.add(f1s1)
@@ -157,16 +159,16 @@ class VersioningTest(fixtures.MappedTest):
s2 = create_session(autocommit=False)
f1s2 = s2.query(Foo).get(f1s1.id)
f1s2.value='f1 new value'
f1s2.value = 'f1 new value'
s2.commit()
# load, version is wrong
assert_raises_message(
sa.orm.exc.StaleDataError,
r"Instance .* has version id '\d+' which does not "
r"match database-loaded version id '\d+'",
s1.query(Foo).with_lockmode('read').get, f1s1.id
)
sa.orm.exc.StaleDataError,
r"Instance .* has version id '\d+' which does not "
r"match database-loaded version id '\d+'",
s1.query(Foo).with_lockmode('read').get, f1s1.id
)
# reload it - this expires the old version first
s1.refresh(f1s1, lockmode='read')
@@ -178,16 +180,15 @@ class VersioningTest(fixtures.MappedTest):
s1.close()
s1.query(Foo).with_lockmode('read').get(f1s1.id)
@testing.emits_warning(r'.*does not support updated rowcount')
@engines.close_open_connections
@testing.requires.update_nowait
def test_versioncheck_for_update(self):
"""query.with_lockmode performs a 'version check' on an already loaded instance"""
"""query.with_lockmode performs a 'version check' on an already loaded
instance"""
Foo = self.classes.Foo
s1 = self._fixture()
f1s1 = Foo(value='f1 value')
s1.add(f1s1)
@@ -196,7 +197,7 @@ class VersioningTest(fixtures.MappedTest):
s2 = create_session(autocommit=False)
f1s2 = s2.query(Foo).get(f1s1.id)
s2.refresh(f1s2, lockmode='update')
f1s2.value='f1 new value'
f1s2.value = 'f1 new value'
assert_raises(
exc.DBAPIError,
@@ -211,7 +212,8 @@ class VersioningTest(fixtures.MappedTest):
@testing.emits_warning(r'.*does not support updated rowcount')
@engines.close_open_connections
def test_noversioncheck(self):
"""test query.with_lockmode works when the mapper has no version id col"""
"""test query.with_lockmode works when the mapper has no version id
col"""
Foo, version_table = self.classes.Foo, self.tables.version_table
@@ -226,7 +228,8 @@ class VersioningTest(fixtures.MappedTest):
assert f1s2.id == f1s1.id
assert f1s2.value == f1s1.value
@testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount')
@testing.emits_warning_on(
'+zxjdbc', r'.*does not support updated rowcount')
def test_merge_no_version(self):
Foo = self.classes.Foo
@@ -244,7 +247,8 @@ class VersioningTest(fixtures.MappedTest):
s1.commit()
eq_(f3.version_id, 3)
@testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount')
@testing.emits_warning_on(
'+zxjdbc', r'.*does not support updated rowcount')
def test_merge_correct_version(self):
Foo = self.classes.Foo
@@ -262,7 +266,8 @@ class VersioningTest(fixtures.MappedTest):
s1.commit()
eq_(f3.version_id, 3)
@testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount')
@testing.emits_warning_on(
'+zxjdbc', r'.*does not support updated rowcount')
def test_merge_incorrect_version(self):
Foo = self.classes.Foo
@@ -284,7 +289,8 @@ class VersioningTest(fixtures.MappedTest):
s1.merge, f2
)
@testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount')
@testing.emits_warning_on(
'+zxjdbc', r'.*does not support updated rowcount')
def test_merge_incorrect_version_not_in_session(self):
Foo = self.classes.Foo
@@ -308,6 +314,7 @@ class VersioningTest(fixtures.MappedTest):
s1.merge, f2
)
class ColumnTypeTest(fixtures.MappedTest):
__backend__ = True
@@ -315,6 +322,7 @@ class ColumnTypeTest(fixtures.MappedTest):
def define_tables(cls, metadata):
class SpecialType(TypeDecorator):
impl = Date
def process_bind_param(self, value, dialect):
assert isinstance(value, datetime.date)
return value
@@ -332,8 +340,7 @@ class ColumnTypeTest(fixtures.MappedTest):
def _fixture(self):
Foo, version_table = self.classes.Foo, self.tables.version_table
mapper(Foo, version_table,
version_id_col=version_table.c.version_id)
mapper(Foo, version_table, version_id_col=version_table.c.version_id)
s1 = Session()
return s1
@@ -346,19 +353,23 @@ class ColumnTypeTest(fixtures.MappedTest):
s1.add(f1)
s1.commit()
f1.value='f1rev2'
f1.value = 'f1rev2'
s1.commit()
class RowSwitchTest(fixtures.MappedTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table('p', metadata,
Table(
'p', metadata,
Column('id', String(10), primary_key=True),
Column('version_id', Integer, default=1, nullable=False),
Column('data', String(50))
)
Table('c', metadata,
Table(
'c', metadata,
Column('id', String(10), ForeignKey('p.id'), primary_key=True),
Column('version_id', Integer, default=1, nullable=False),
Column('data', String(50))
@@ -366,25 +377,25 @@ class RowSwitchTest(fixtures.MappedTest):
@classmethod
def setup_classes(cls):
class P(cls.Basic):
pass
class C(cls.Basic):
pass
@classmethod
def setup_mappers(cls):
p, c, C, P = (cls.tables.p,
cls.tables.c,
cls.classes.C,
cls.classes.P)
p, c, C, P = cls.tables.p, cls.tables.c, cls.classes.C, cls.classes.P
mapper(P, p, version_id_col=p.c.version_id,
properties={
'c':relationship(C, uselist=False, cascade='all, delete-orphan')
})
mapper(
P, p, version_id_col=p.c.version_id, properties={
'c': relationship(
C, uselist=False, cascade='all, delete-orphan')})
mapper(C, c, version_id_col=c.c.version_id)
@testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount')
@testing.emits_warning_on(
'+zxjdbc', r'.*does not support updated rowcount')
def test_row_switch(self):
P = self.classes.P
@@ -398,7 +409,8 @@ class RowSwitchTest(fixtures.MappedTest):
session.add(P(id='P1', data="really a row-switch"))
session.commit()
@testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount')
@testing.emits_warning_on(
'+zxjdbc', r'.*does not support updated rowcount')
def test_child_row_switch(self):
P, C = self.classes.P, self.classes.C
@@ -417,16 +429,20 @@ class RowSwitchTest(fixtures.MappedTest):
p.c = C(data='child row-switch')
session.commit()
class AlternateGeneratorTest(fixtures.MappedTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table('p', metadata,
Table(
'p', metadata,
Column('id', String(10), primary_key=True),
Column('version_id', String(32), nullable=False),
Column('data', String(50))
)
Table('c', metadata,
Table(
'c', metadata,
Column('id', String(10), ForeignKey('p.id'), primary_key=True),
Column('version_id', String(32), nullable=False),
Column('data', String(50))
@@ -434,28 +450,31 @@ class AlternateGeneratorTest(fixtures.MappedTest):
@classmethod
def setup_classes(cls):
class P(cls.Basic):
pass
class C(cls.Basic):
pass
@classmethod
def setup_mappers(cls):
p, c, C, P = (cls.tables.p,
cls.tables.c,
cls.classes.C,
cls.classes.P)
p, c, C, P = cls.tables.p, cls.tables.c, cls.classes.C, cls.classes.P
mapper(P, p, version_id_col=p.c.version_id,
mapper(
P, p, version_id_col=p.c.version_id,
version_id_generator=lambda x: make_uuid(),
properties={
'c': relationship(C, uselist=False, cascade='all, delete-orphan')
})
mapper(C, c, version_id_col=c.c.version_id,
version_id_generator=lambda x: make_uuid(),
'c': relationship(
C, uselist=False, cascade='all, delete-orphan')
})
mapper(
C, c, version_id_col=c.c.version_id,
version_id_generator=lambda x: make_uuid(),
)
@testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount')
@testing.emits_warning_on(
'+zxjdbc', r'.*does not support updated rowcount')
def test_row_switch(self):
P = self.classes.P
@@ -469,7 +488,8 @@ class AlternateGeneratorTest(fixtures.MappedTest):
session.add(P(id='P1', data="really a row-switch"))
session.commit()
@testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount')
@testing.emits_warning_on(
'+zxjdbc', r'.*does not support (update|delete)d rowcount')
def test_child_row_switch_one(self):
P, C = self.classes.P, self.classes.C
@@ -488,7 +508,8 @@ class AlternateGeneratorTest(fixtures.MappedTest):
p.c = C(data='child row-switch')
session.commit()
@testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount')
@testing.emits_warning_on(
'+zxjdbc', r'.*does not support (update|delete)d rowcount')
def test_child_row_switch_two(self):
P = self.classes.P
@@ -525,20 +546,26 @@ class AlternateGeneratorTest(fixtures.MappedTest):
else:
sess2.commit
class InheritanceTwoVersionIdsTest(fixtures.MappedTest):
"""Test versioning where both parent/child table have a
versioning column.
"""
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table('base', metadata,
Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Table(
'base', metadata,
Column(
'id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('version_id', Integer, nullable=True),
Column('data', String(50))
)
Table('sub', metadata,
Table(
'sub', metadata,
Column('id', Integer, ForeignKey('base.id'), primary_key=True),
Column('version_id', Integer, nullable=False),
Column('sub_data', String(50))
@@ -546,19 +573,19 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest):
@classmethod
def setup_classes(cls):
class Base(cls.Basic):
pass
class Sub(Base):
pass
def test_base_both(self):
Base, sub, base, Sub = (self.classes.Base,
self.tables.sub,
self.tables.base,
self.classes.Sub)
Base, sub, base, Sub = (
self.classes.Base, self.tables.sub, self.tables.base,
self.classes.Sub)
mapper(Base, base,
version_id_col=base.c.version_id)
mapper(Base, base, version_id_col=base.c.version_id)
mapper(Sub, sub, inherits=Base)
session = Session()
@@ -570,13 +597,11 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest):
eq_(select([base.c.version_id]).scalar(), 1)
def test_sub_both(self):
Base, sub, base, Sub = (self.classes.Base,
self.tables.sub,
self.tables.base,
self.classes.Sub)
Base, sub, base, Sub = (
self.classes.Base, self.tables.sub, self.tables.base,
self.classes.Sub)
mapper(Base, base,
version_id_col=base.c.version_id)
mapper(Base, base, version_id_col=base.c.version_id)
mapper(Sub, sub, inherits=Base)
session = Session()
@@ -591,14 +616,12 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest):
eq_(select([base.c.version_id]).scalar(), 1)
def test_sub_only(self):
Base, sub, base, Sub = (self.classes.Base,
self.tables.sub,
self.tables.base,
self.classes.Sub)
Base, sub, base, Sub = (
self.classes.Base, self.tables.sub, self.tables.base,
self.classes.Sub)
mapper(Base, base)
mapper(Sub, sub, inherits=Base,
version_id_col=sub.c.version_id)
mapper(Sub, sub, inherits=Base, version_id_col=sub.c.version_id)
session = Session()
s1 = Sub(data='s1', sub_data='s1')
@@ -612,13 +635,11 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest):
eq_(select([base.c.version_id]).scalar(), None)
def test_mismatch_version_col_warning(self):
Base, sub, base, Sub = (self.classes.Base,
self.tables.sub,
self.tables.base,
self.classes.Sub)
Base, sub, base, Sub = (
self.classes.Base, self.tables.sub, self.tables.base,
self.classes.Sub)
mapper(Base, base,
version_id_col=base.c.version_id)
mapper(Base, base, version_id_col=base.c.version_id)
assert_raises_message(
exc.SAWarning,
@@ -627,9 +648,8 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest):
"automatically populate the inherited versioning column. "
"version_id_col should only be specified on "
"the base-most mapper that includes versioning.",
mapper,
Sub, sub, inherits=Base,
version_id_col=sub.c.version_id)
mapper, Sub, sub, inherits=Base,
version_id_col=sub.c.version_id)
class ServerVersioningTest(fixtures.MappedTest):
@@ -659,27 +679,32 @@ class ServerVersioningTest(fixtures.MappedTest):
stmt._counter = str(next(counter))
return stmt._counter
Table('version_table', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('version_id', Integer, nullable=False,
default=IncDefault(), onupdate=IncDefault()),
Column('value', String(40), nullable=False))
Table(
'version_table', metadata,
Column(
'id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column(
'version_id', Integer, nullable=False,
default=IncDefault(), onupdate=IncDefault()),
Column('value', String(40), nullable=False))
@classmethod
def setup_classes(cls):
class Foo(cls.Basic):
pass
class Bar(cls.Basic):
pass
def _fixture(self, expire_on_commit=True):
Foo, version_table = self.classes.Foo, self.tables.version_table
mapper(Foo, version_table,
version_id_col=version_table.c.version_id,
version_id_generator=False,
)
mapper(
Foo, version_table, version_id_col=version_table.c.version_id,
version_id_generator=False,
)
s1 = Session(expire_on_commit=expire_on_commit)
return s1
@@ -691,21 +716,22 @@ class ServerVersioningTest(fixtures.MappedTest):
sess.add(f1)
statements = [
# note that the assertsql tests the rule against
# "default" - on a "returning" backend, the statement
# includes "RETURNING"
CompiledSQL(
"INSERT INTO version_table (version_id, value) "
"VALUES (1, :value)",
lambda ctx: [{'value': 'f1'}]
)
# note that the assertsql tests the rule against
# "default" - on a "returning" backend, the statement
# includes "RETURNING"
CompiledSQL(
"INSERT INTO version_table (version_id, value) "
"VALUES (1, :value)",
lambda ctx: [{'value': 'f1'}]
)
]
if not testing.db.dialect.implicit_returning:
# DBs without implicit returning, we must immediately
# SELECT for the new version id
statements.append(
CompiledSQL(
"SELECT version_table.version_id AS version_table_version_id "
"SELECT version_table.version_id "
" AS version_table_version_id "
"FROM version_table WHERE version_table.id = :param_1",
lambda ctx: [{"param_1": 1}]
)
@@ -722,30 +748,32 @@ class ServerVersioningTest(fixtures.MappedTest):
f1.value = 'f2'
statements = [
# note that the assertsql tests the rule against
# "default" - on a "returning" backend, the statement
# includes "RETURNING"
CompiledSQL(
"UPDATE version_table SET version_id=2, value=:value "
"WHERE version_table.id = :version_table_id AND "
"version_table.version_id = :version_table_version_id",
lambda ctx: [{"version_table_id": 1,
"version_table_version_id": 1, "value": "f2"}]
)
# note that the assertsql tests the rule against
# "default" - on a "returning" backend, the statement
# includes "RETURNING"
CompiledSQL(
"UPDATE version_table SET version_id=2, value=:value "
"WHERE version_table.id = :version_table_id AND "
"version_table.version_id = :version_table_version_id",
lambda ctx: [
{
"version_table_id": 1,
"version_table_version_id": 1, "value": "f2"}]
)
]
if not testing.db.dialect.implicit_returning:
# DBs without implicit returning, we must immediately
# SELECT for the new version id
statements.append(
CompiledSQL(
"SELECT version_table.version_id AS version_table_version_id "
"SELECT version_table.version_id "
" AS version_table_version_id "
"FROM version_table WHERE version_table.id = :param_1",
lambda ctx: [{"param_1": 1}]
)
)
self.assert_sql_execution(testing.db, sess.flush, *statements)
def test_delete_col(self):
sess = self._fixture()
@@ -756,15 +784,15 @@ class ServerVersioningTest(fixtures.MappedTest):
sess.delete(f1)
statements = [
# note that the assertsql tests the rule against
# "default" - on a "returning" backend, the statement
# includes "RETURNING"
CompiledSQL(
"DELETE FROM version_table "
"WHERE version_table.id = :id AND "
"version_table.version_id = :version_id",
lambda ctx: [{"id": 1, "version_id": 1}]
)
# note that the assertsql tests the rule against
# "default" - on a "returning" backend, the statement
# includes "RETURNING"
CompiledSQL(
"DELETE FROM version_table "
"WHERE version_table.id = :id AND "
"version_table.version_id = :version_id",
lambda ctx: [{"id": 1, "version_id": 1}]
)
]
self.assert_sql_execution(testing.db, sess.flush, *statements)
@@ -817,29 +845,32 @@ class ServerVersioningTest(fixtures.MappedTest):
sess.commit
)
class ManualVersionTest(fixtures.MappedTest):
run_define_tables = 'each'
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table("a", metadata,
Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30)),
Column('vid', Integer)
)
Table(
"a", metadata,
Column(
'id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('data', String(30)),
Column('vid', Integer)
)
@classmethod
def setup_classes(cls):
class A(cls.Basic):
pass
@classmethod
def setup_mappers(cls):
mapper(cls.classes.A, cls.tables.a,
version_id_col=cls.tables.a.c.vid,
version_id_generator=False)
mapper(
cls.classes.A, cls.tables.a, version_id_col=cls.tables.a.c.vid,
version_id_generator=False)
def test_insert(self):
sess = Session()
@@ -904,4 +935,4 @@ class ManualVersionTest(fixtures.MappedTest):
a1.vid = 2
sess.commit()
eq_(a1.vid, 2)
eq_(a1.vid, 2)