mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-29 12:06:28 -04:00
PEP8 tidy for test/orm/test_versioning.py
This commit is contained in:
+179
-148
@@ -2,21 +2,22 @@ import datetime
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.testing import engines
|
||||
from sqlalchemy import testing
|
||||
from sqlalchemy import Integer, String, Date, ForeignKey, literal_column, \
|
||||
orm, exc, select, TypeDecorator
|
||||
from sqlalchemy import (
|
||||
Integer, String, Date, ForeignKey, orm, exc, select, TypeDecorator)
|
||||
from sqlalchemy.testing.schema import Table, Column
|
||||
from sqlalchemy.orm import mapper, relationship, Session, \
|
||||
create_session, column_property, sessionmaker,\
|
||||
exc as orm_exc
|
||||
from sqlalchemy.testing import eq_, ne_, assert_raises, assert_raises_message
|
||||
from sqlalchemy.testing import fixtures
|
||||
from test.orm import _fixtures
|
||||
from sqlalchemy.testing.assertsql import AllOf, CompiledSQL
|
||||
from sqlalchemy.orm import (
|
||||
mapper, relationship, Session, create_session, sessionmaker,
|
||||
exc as orm_exc)
|
||||
from sqlalchemy.testing import (
|
||||
eq_, assert_raises, assert_raises_message, fixtures)
|
||||
from sqlalchemy.testing.assertsql import CompiledSQL
|
||||
import uuid
|
||||
|
||||
|
||||
def make_uuid():
|
||||
return uuid.uuid4().hex
|
||||
|
||||
|
||||
class VersioningTest(fixtures.MappedTest):
|
||||
__backend__ = True
|
||||
|
||||
@@ -36,8 +37,7 @@ class VersioningTest(fixtures.MappedTest):
|
||||
def _fixture(self):
|
||||
Foo, version_table = self.classes.Foo, self.tables.version_table
|
||||
|
||||
mapper(Foo, version_table,
|
||||
version_id_col=version_table.c.version_id)
|
||||
mapper(Foo, version_table, version_id_col=version_table.c.version_id)
|
||||
s1 = Session()
|
||||
return s1
|
||||
|
||||
@@ -54,12 +54,13 @@ class VersioningTest(fixtures.MappedTest):
|
||||
s1.add_all((f1, f2))
|
||||
s1.commit()
|
||||
|
||||
f1.value='f1rev2'
|
||||
f1.value = 'f1rev2'
|
||||
assert_raises(sa.exc.SAWarning, s1.commit)
|
||||
finally:
|
||||
testing.db.dialect.supports_sane_rowcount = save
|
||||
|
||||
@testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount')
|
||||
@testing.emits_warning_on(
|
||||
'+zxjdbc', r'.*does not support (update|delete)d rowcount')
|
||||
def test_basic(self):
|
||||
Foo = self.classes.Foo
|
||||
|
||||
@@ -69,23 +70,23 @@ class VersioningTest(fixtures.MappedTest):
|
||||
s1.add_all((f1, f2))
|
||||
s1.commit()
|
||||
|
||||
f1.value='f1rev2'
|
||||
f1.value = 'f1rev2'
|
||||
s1.commit()
|
||||
|
||||
s2 = create_session(autocommit=False)
|
||||
f1_s = s2.query(Foo).get(f1.id)
|
||||
f1_s.value='f1rev3'
|
||||
f1_s.value = 'f1rev3'
|
||||
s2.commit()
|
||||
|
||||
f1.value='f1rev3mine'
|
||||
f1.value = 'f1rev3mine'
|
||||
|
||||
# Only dialects with a sane rowcount can detect the
|
||||
# StaleDataError
|
||||
if testing.db.dialect.supports_sane_rowcount:
|
||||
assert_raises_message(sa.orm.exc.StaleDataError,
|
||||
r"UPDATE statement on table 'version_table' expected "
|
||||
r"to update 1 row\(s\); 0 were matched.",
|
||||
s1.commit),
|
||||
assert_raises_message(
|
||||
sa.orm.exc.StaleDataError,
|
||||
r"UPDATE statement on table 'version_table' expected "
|
||||
r"to update 1 row\(s\); 0 were matched.", s1.commit),
|
||||
s1.rollback()
|
||||
else:
|
||||
s1.commit()
|
||||
@@ -94,7 +95,7 @@ class VersioningTest(fixtures.MappedTest):
|
||||
f1 = s1.query(Foo).get(f1.id)
|
||||
f2 = s1.query(Foo).get(f2.id)
|
||||
|
||||
f1_s.value='f1rev4'
|
||||
f1_s.value = 'f1rev4'
|
||||
s2.commit()
|
||||
|
||||
s1.delete(f1)
|
||||
@@ -109,7 +110,8 @@ class VersioningTest(fixtures.MappedTest):
|
||||
else:
|
||||
s1.commit()
|
||||
|
||||
@testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount')
|
||||
@testing.emits_warning_on(
|
||||
'+zxjdbc', r'.*does not support (update|delete)d rowcount')
|
||||
def test_bump_version(self):
|
||||
"""test that version number can be bumped.
|
||||
|
||||
@@ -145,11 +147,11 @@ class VersioningTest(fixtures.MappedTest):
|
||||
@testing.emits_warning(r'.*does not support updated rowcount')
|
||||
@engines.close_open_connections
|
||||
def test_versioncheck(self):
|
||||
"""query.with_lockmode performs a 'version check' on an already loaded instance"""
|
||||
"""query.with_lockmode performs a 'version check' on an already loaded
|
||||
instance"""
|
||||
|
||||
Foo = self.classes.Foo
|
||||
|
||||
|
||||
s1 = self._fixture()
|
||||
f1s1 = Foo(value='f1 value')
|
||||
s1.add(f1s1)
|
||||
@@ -157,16 +159,16 @@ class VersioningTest(fixtures.MappedTest):
|
||||
|
||||
s2 = create_session(autocommit=False)
|
||||
f1s2 = s2.query(Foo).get(f1s1.id)
|
||||
f1s2.value='f1 new value'
|
||||
f1s2.value = 'f1 new value'
|
||||
s2.commit()
|
||||
|
||||
# load, version is wrong
|
||||
assert_raises_message(
|
||||
sa.orm.exc.StaleDataError,
|
||||
r"Instance .* has version id '\d+' which does not "
|
||||
r"match database-loaded version id '\d+'",
|
||||
s1.query(Foo).with_lockmode('read').get, f1s1.id
|
||||
)
|
||||
sa.orm.exc.StaleDataError,
|
||||
r"Instance .* has version id '\d+' which does not "
|
||||
r"match database-loaded version id '\d+'",
|
||||
s1.query(Foo).with_lockmode('read').get, f1s1.id
|
||||
)
|
||||
|
||||
# reload it - this expires the old version first
|
||||
s1.refresh(f1s1, lockmode='read')
|
||||
@@ -178,16 +180,15 @@ class VersioningTest(fixtures.MappedTest):
|
||||
s1.close()
|
||||
s1.query(Foo).with_lockmode('read').get(f1s1.id)
|
||||
|
||||
|
||||
@testing.emits_warning(r'.*does not support updated rowcount')
|
||||
@engines.close_open_connections
|
||||
@testing.requires.update_nowait
|
||||
def test_versioncheck_for_update(self):
|
||||
"""query.with_lockmode performs a 'version check' on an already loaded instance"""
|
||||
"""query.with_lockmode performs a 'version check' on an already loaded
|
||||
instance"""
|
||||
|
||||
Foo = self.classes.Foo
|
||||
|
||||
|
||||
s1 = self._fixture()
|
||||
f1s1 = Foo(value='f1 value')
|
||||
s1.add(f1s1)
|
||||
@@ -196,7 +197,7 @@ class VersioningTest(fixtures.MappedTest):
|
||||
s2 = create_session(autocommit=False)
|
||||
f1s2 = s2.query(Foo).get(f1s1.id)
|
||||
s2.refresh(f1s2, lockmode='update')
|
||||
f1s2.value='f1 new value'
|
||||
f1s2.value = 'f1 new value'
|
||||
|
||||
assert_raises(
|
||||
exc.DBAPIError,
|
||||
@@ -211,7 +212,8 @@ class VersioningTest(fixtures.MappedTest):
|
||||
@testing.emits_warning(r'.*does not support updated rowcount')
|
||||
@engines.close_open_connections
|
||||
def test_noversioncheck(self):
|
||||
"""test query.with_lockmode works when the mapper has no version id col"""
|
||||
"""test query.with_lockmode works when the mapper has no version id
|
||||
col"""
|
||||
|
||||
Foo, version_table = self.classes.Foo, self.tables.version_table
|
||||
|
||||
@@ -226,7 +228,8 @@ class VersioningTest(fixtures.MappedTest):
|
||||
assert f1s2.id == f1s1.id
|
||||
assert f1s2.value == f1s1.value
|
||||
|
||||
@testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount')
|
||||
@testing.emits_warning_on(
|
||||
'+zxjdbc', r'.*does not support updated rowcount')
|
||||
def test_merge_no_version(self):
|
||||
Foo = self.classes.Foo
|
||||
|
||||
@@ -244,7 +247,8 @@ class VersioningTest(fixtures.MappedTest):
|
||||
s1.commit()
|
||||
eq_(f3.version_id, 3)
|
||||
|
||||
@testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount')
|
||||
@testing.emits_warning_on(
|
||||
'+zxjdbc', r'.*does not support updated rowcount')
|
||||
def test_merge_correct_version(self):
|
||||
Foo = self.classes.Foo
|
||||
|
||||
@@ -262,7 +266,8 @@ class VersioningTest(fixtures.MappedTest):
|
||||
s1.commit()
|
||||
eq_(f3.version_id, 3)
|
||||
|
||||
@testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount')
|
||||
@testing.emits_warning_on(
|
||||
'+zxjdbc', r'.*does not support updated rowcount')
|
||||
def test_merge_incorrect_version(self):
|
||||
Foo = self.classes.Foo
|
||||
|
||||
@@ -284,7 +289,8 @@ class VersioningTest(fixtures.MappedTest):
|
||||
s1.merge, f2
|
||||
)
|
||||
|
||||
@testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount')
|
||||
@testing.emits_warning_on(
|
||||
'+zxjdbc', r'.*does not support updated rowcount')
|
||||
def test_merge_incorrect_version_not_in_session(self):
|
||||
Foo = self.classes.Foo
|
||||
|
||||
@@ -308,6 +314,7 @@ class VersioningTest(fixtures.MappedTest):
|
||||
s1.merge, f2
|
||||
)
|
||||
|
||||
|
||||
class ColumnTypeTest(fixtures.MappedTest):
|
||||
__backend__ = True
|
||||
|
||||
@@ -315,6 +322,7 @@ class ColumnTypeTest(fixtures.MappedTest):
|
||||
def define_tables(cls, metadata):
|
||||
class SpecialType(TypeDecorator):
|
||||
impl = Date
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
assert isinstance(value, datetime.date)
|
||||
return value
|
||||
@@ -332,8 +340,7 @@ class ColumnTypeTest(fixtures.MappedTest):
|
||||
def _fixture(self):
|
||||
Foo, version_table = self.classes.Foo, self.tables.version_table
|
||||
|
||||
mapper(Foo, version_table,
|
||||
version_id_col=version_table.c.version_id)
|
||||
mapper(Foo, version_table, version_id_col=version_table.c.version_id)
|
||||
s1 = Session()
|
||||
return s1
|
||||
|
||||
@@ -346,19 +353,23 @@ class ColumnTypeTest(fixtures.MappedTest):
|
||||
s1.add(f1)
|
||||
s1.commit()
|
||||
|
||||
f1.value='f1rev2'
|
||||
f1.value = 'f1rev2'
|
||||
s1.commit()
|
||||
|
||||
|
||||
class RowSwitchTest(fixtures.MappedTest):
|
||||
__backend__ = True
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table('p', metadata,
|
||||
Table(
|
||||
'p', metadata,
|
||||
Column('id', String(10), primary_key=True),
|
||||
Column('version_id', Integer, default=1, nullable=False),
|
||||
Column('data', String(50))
|
||||
)
|
||||
Table('c', metadata,
|
||||
Table(
|
||||
'c', metadata,
|
||||
Column('id', String(10), ForeignKey('p.id'), primary_key=True),
|
||||
Column('version_id', Integer, default=1, nullable=False),
|
||||
Column('data', String(50))
|
||||
@@ -366,25 +377,25 @@ class RowSwitchTest(fixtures.MappedTest):
|
||||
|
||||
@classmethod
|
||||
def setup_classes(cls):
|
||||
|
||||
class P(cls.Basic):
|
||||
pass
|
||||
|
||||
class C(cls.Basic):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def setup_mappers(cls):
|
||||
p, c, C, P = (cls.tables.p,
|
||||
cls.tables.c,
|
||||
cls.classes.C,
|
||||
cls.classes.P)
|
||||
p, c, C, P = cls.tables.p, cls.tables.c, cls.classes.C, cls.classes.P
|
||||
|
||||
mapper(P, p, version_id_col=p.c.version_id,
|
||||
properties={
|
||||
'c':relationship(C, uselist=False, cascade='all, delete-orphan')
|
||||
})
|
||||
mapper(
|
||||
P, p, version_id_col=p.c.version_id, properties={
|
||||
'c': relationship(
|
||||
C, uselist=False, cascade='all, delete-orphan')})
|
||||
mapper(C, c, version_id_col=c.c.version_id)
|
||||
|
||||
@testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount')
|
||||
@testing.emits_warning_on(
|
||||
'+zxjdbc', r'.*does not support updated rowcount')
|
||||
def test_row_switch(self):
|
||||
P = self.classes.P
|
||||
|
||||
@@ -398,7 +409,8 @@ class RowSwitchTest(fixtures.MappedTest):
|
||||
session.add(P(id='P1', data="really a row-switch"))
|
||||
session.commit()
|
||||
|
||||
@testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount')
|
||||
@testing.emits_warning_on(
|
||||
'+zxjdbc', r'.*does not support updated rowcount')
|
||||
def test_child_row_switch(self):
|
||||
P, C = self.classes.P, self.classes.C
|
||||
|
||||
@@ -417,16 +429,20 @@ class RowSwitchTest(fixtures.MappedTest):
|
||||
p.c = C(data='child row-switch')
|
||||
session.commit()
|
||||
|
||||
|
||||
class AlternateGeneratorTest(fixtures.MappedTest):
|
||||
__backend__ = True
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table('p', metadata,
|
||||
Table(
|
||||
'p', metadata,
|
||||
Column('id', String(10), primary_key=True),
|
||||
Column('version_id', String(32), nullable=False),
|
||||
Column('data', String(50))
|
||||
)
|
||||
Table('c', metadata,
|
||||
Table(
|
||||
'c', metadata,
|
||||
Column('id', String(10), ForeignKey('p.id'), primary_key=True),
|
||||
Column('version_id', String(32), nullable=False),
|
||||
Column('data', String(50))
|
||||
@@ -434,28 +450,31 @@ class AlternateGeneratorTest(fixtures.MappedTest):
|
||||
|
||||
@classmethod
|
||||
def setup_classes(cls):
|
||||
|
||||
class P(cls.Basic):
|
||||
pass
|
||||
|
||||
class C(cls.Basic):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def setup_mappers(cls):
|
||||
p, c, C, P = (cls.tables.p,
|
||||
cls.tables.c,
|
||||
cls.classes.C,
|
||||
cls.classes.P)
|
||||
p, c, C, P = cls.tables.p, cls.tables.c, cls.classes.C, cls.classes.P
|
||||
|
||||
mapper(P, p, version_id_col=p.c.version_id,
|
||||
mapper(
|
||||
P, p, version_id_col=p.c.version_id,
|
||||
version_id_generator=lambda x: make_uuid(),
|
||||
properties={
|
||||
'c': relationship(C, uselist=False, cascade='all, delete-orphan')
|
||||
})
|
||||
mapper(C, c, version_id_col=c.c.version_id,
|
||||
version_id_generator=lambda x: make_uuid(),
|
||||
'c': relationship(
|
||||
C, uselist=False, cascade='all, delete-orphan')
|
||||
})
|
||||
mapper(
|
||||
C, c, version_id_col=c.c.version_id,
|
||||
version_id_generator=lambda x: make_uuid(),
|
||||
)
|
||||
|
||||
@testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount')
|
||||
@testing.emits_warning_on(
|
||||
'+zxjdbc', r'.*does not support updated rowcount')
|
||||
def test_row_switch(self):
|
||||
P = self.classes.P
|
||||
|
||||
@@ -469,7 +488,8 @@ class AlternateGeneratorTest(fixtures.MappedTest):
|
||||
session.add(P(id='P1', data="really a row-switch"))
|
||||
session.commit()
|
||||
|
||||
@testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount')
|
||||
@testing.emits_warning_on(
|
||||
'+zxjdbc', r'.*does not support (update|delete)d rowcount')
|
||||
def test_child_row_switch_one(self):
|
||||
P, C = self.classes.P, self.classes.C
|
||||
|
||||
@@ -488,7 +508,8 @@ class AlternateGeneratorTest(fixtures.MappedTest):
|
||||
p.c = C(data='child row-switch')
|
||||
session.commit()
|
||||
|
||||
@testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount')
|
||||
@testing.emits_warning_on(
|
||||
'+zxjdbc', r'.*does not support (update|delete)d rowcount')
|
||||
def test_child_row_switch_two(self):
|
||||
P = self.classes.P
|
||||
|
||||
@@ -525,20 +546,26 @@ class AlternateGeneratorTest(fixtures.MappedTest):
|
||||
else:
|
||||
sess2.commit
|
||||
|
||||
|
||||
class InheritanceTwoVersionIdsTest(fixtures.MappedTest):
|
||||
"""Test versioning where both parent/child table have a
|
||||
versioning column.
|
||||
|
||||
"""
|
||||
__backend__ = True
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table('base', metadata,
|
||||
Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
|
||||
Table(
|
||||
'base', metadata,
|
||||
Column(
|
||||
'id', Integer, primary_key=True,
|
||||
test_needs_autoincrement=True),
|
||||
Column('version_id', Integer, nullable=True),
|
||||
Column('data', String(50))
|
||||
)
|
||||
Table('sub', metadata,
|
||||
Table(
|
||||
'sub', metadata,
|
||||
Column('id', Integer, ForeignKey('base.id'), primary_key=True),
|
||||
Column('version_id', Integer, nullable=False),
|
||||
Column('sub_data', String(50))
|
||||
@@ -546,19 +573,19 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest):
|
||||
|
||||
@classmethod
|
||||
def setup_classes(cls):
|
||||
|
||||
class Base(cls.Basic):
|
||||
pass
|
||||
|
||||
class Sub(Base):
|
||||
pass
|
||||
|
||||
def test_base_both(self):
|
||||
Base, sub, base, Sub = (self.classes.Base,
|
||||
self.tables.sub,
|
||||
self.tables.base,
|
||||
self.classes.Sub)
|
||||
Base, sub, base, Sub = (
|
||||
self.classes.Base, self.tables.sub, self.tables.base,
|
||||
self.classes.Sub)
|
||||
|
||||
mapper(Base, base,
|
||||
version_id_col=base.c.version_id)
|
||||
mapper(Base, base, version_id_col=base.c.version_id)
|
||||
mapper(Sub, sub, inherits=Base)
|
||||
|
||||
session = Session()
|
||||
@@ -570,13 +597,11 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest):
|
||||
eq_(select([base.c.version_id]).scalar(), 1)
|
||||
|
||||
def test_sub_both(self):
|
||||
Base, sub, base, Sub = (self.classes.Base,
|
||||
self.tables.sub,
|
||||
self.tables.base,
|
||||
self.classes.Sub)
|
||||
Base, sub, base, Sub = (
|
||||
self.classes.Base, self.tables.sub, self.tables.base,
|
||||
self.classes.Sub)
|
||||
|
||||
mapper(Base, base,
|
||||
version_id_col=base.c.version_id)
|
||||
mapper(Base, base, version_id_col=base.c.version_id)
|
||||
mapper(Sub, sub, inherits=Base)
|
||||
|
||||
session = Session()
|
||||
@@ -591,14 +616,12 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest):
|
||||
eq_(select([base.c.version_id]).scalar(), 1)
|
||||
|
||||
def test_sub_only(self):
|
||||
Base, sub, base, Sub = (self.classes.Base,
|
||||
self.tables.sub,
|
||||
self.tables.base,
|
||||
self.classes.Sub)
|
||||
Base, sub, base, Sub = (
|
||||
self.classes.Base, self.tables.sub, self.tables.base,
|
||||
self.classes.Sub)
|
||||
|
||||
mapper(Base, base)
|
||||
mapper(Sub, sub, inherits=Base,
|
||||
version_id_col=sub.c.version_id)
|
||||
mapper(Sub, sub, inherits=Base, version_id_col=sub.c.version_id)
|
||||
|
||||
session = Session()
|
||||
s1 = Sub(data='s1', sub_data='s1')
|
||||
@@ -612,13 +635,11 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest):
|
||||
eq_(select([base.c.version_id]).scalar(), None)
|
||||
|
||||
def test_mismatch_version_col_warning(self):
|
||||
Base, sub, base, Sub = (self.classes.Base,
|
||||
self.tables.sub,
|
||||
self.tables.base,
|
||||
self.classes.Sub)
|
||||
Base, sub, base, Sub = (
|
||||
self.classes.Base, self.tables.sub, self.tables.base,
|
||||
self.classes.Sub)
|
||||
|
||||
mapper(Base, base,
|
||||
version_id_col=base.c.version_id)
|
||||
mapper(Base, base, version_id_col=base.c.version_id)
|
||||
|
||||
assert_raises_message(
|
||||
exc.SAWarning,
|
||||
@@ -627,9 +648,8 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest):
|
||||
"automatically populate the inherited versioning column. "
|
||||
"version_id_col should only be specified on "
|
||||
"the base-most mapper that includes versioning.",
|
||||
mapper,
|
||||
Sub, sub, inherits=Base,
|
||||
version_id_col=sub.c.version_id)
|
||||
mapper, Sub, sub, inherits=Base,
|
||||
version_id_col=sub.c.version_id)
|
||||
|
||||
|
||||
class ServerVersioningTest(fixtures.MappedTest):
|
||||
@@ -659,27 +679,32 @@ class ServerVersioningTest(fixtures.MappedTest):
|
||||
stmt._counter = str(next(counter))
|
||||
return stmt._counter
|
||||
|
||||
Table('version_table', metadata,
|
||||
Column('id', Integer, primary_key=True,
|
||||
test_needs_autoincrement=True),
|
||||
Column('version_id', Integer, nullable=False,
|
||||
default=IncDefault(), onupdate=IncDefault()),
|
||||
Column('value', String(40), nullable=False))
|
||||
Table(
|
||||
'version_table', metadata,
|
||||
Column(
|
||||
'id', Integer, primary_key=True,
|
||||
test_needs_autoincrement=True),
|
||||
Column(
|
||||
'version_id', Integer, nullable=False,
|
||||
default=IncDefault(), onupdate=IncDefault()),
|
||||
Column('value', String(40), nullable=False))
|
||||
|
||||
@classmethod
|
||||
def setup_classes(cls):
|
||||
|
||||
class Foo(cls.Basic):
|
||||
pass
|
||||
|
||||
class Bar(cls.Basic):
|
||||
pass
|
||||
|
||||
def _fixture(self, expire_on_commit=True):
|
||||
Foo, version_table = self.classes.Foo, self.tables.version_table
|
||||
|
||||
mapper(Foo, version_table,
|
||||
version_id_col=version_table.c.version_id,
|
||||
version_id_generator=False,
|
||||
)
|
||||
mapper(
|
||||
Foo, version_table, version_id_col=version_table.c.version_id,
|
||||
version_id_generator=False,
|
||||
)
|
||||
|
||||
s1 = Session(expire_on_commit=expire_on_commit)
|
||||
return s1
|
||||
@@ -691,21 +716,22 @@ class ServerVersioningTest(fixtures.MappedTest):
|
||||
sess.add(f1)
|
||||
|
||||
statements = [
|
||||
# note that the assertsql tests the rule against
|
||||
# "default" - on a "returning" backend, the statement
|
||||
# includes "RETURNING"
|
||||
CompiledSQL(
|
||||
"INSERT INTO version_table (version_id, value) "
|
||||
"VALUES (1, :value)",
|
||||
lambda ctx: [{'value': 'f1'}]
|
||||
)
|
||||
# note that the assertsql tests the rule against
|
||||
# "default" - on a "returning" backend, the statement
|
||||
# includes "RETURNING"
|
||||
CompiledSQL(
|
||||
"INSERT INTO version_table (version_id, value) "
|
||||
"VALUES (1, :value)",
|
||||
lambda ctx: [{'value': 'f1'}]
|
||||
)
|
||||
]
|
||||
if not testing.db.dialect.implicit_returning:
|
||||
# DBs without implicit returning, we must immediately
|
||||
# SELECT for the new version id
|
||||
statements.append(
|
||||
CompiledSQL(
|
||||
"SELECT version_table.version_id AS version_table_version_id "
|
||||
"SELECT version_table.version_id "
|
||||
" AS version_table_version_id "
|
||||
"FROM version_table WHERE version_table.id = :param_1",
|
||||
lambda ctx: [{"param_1": 1}]
|
||||
)
|
||||
@@ -722,30 +748,32 @@ class ServerVersioningTest(fixtures.MappedTest):
|
||||
f1.value = 'f2'
|
||||
|
||||
statements = [
|
||||
# note that the assertsql tests the rule against
|
||||
# "default" - on a "returning" backend, the statement
|
||||
# includes "RETURNING"
|
||||
CompiledSQL(
|
||||
"UPDATE version_table SET version_id=2, value=:value "
|
||||
"WHERE version_table.id = :version_table_id AND "
|
||||
"version_table.version_id = :version_table_version_id",
|
||||
lambda ctx: [{"version_table_id": 1,
|
||||
"version_table_version_id": 1, "value": "f2"}]
|
||||
)
|
||||
# note that the assertsql tests the rule against
|
||||
# "default" - on a "returning" backend, the statement
|
||||
# includes "RETURNING"
|
||||
CompiledSQL(
|
||||
"UPDATE version_table SET version_id=2, value=:value "
|
||||
"WHERE version_table.id = :version_table_id AND "
|
||||
"version_table.version_id = :version_table_version_id",
|
||||
lambda ctx: [
|
||||
{
|
||||
"version_table_id": 1,
|
||||
"version_table_version_id": 1, "value": "f2"}]
|
||||
)
|
||||
]
|
||||
if not testing.db.dialect.implicit_returning:
|
||||
# DBs without implicit returning, we must immediately
|
||||
# SELECT for the new version id
|
||||
statements.append(
|
||||
CompiledSQL(
|
||||
"SELECT version_table.version_id AS version_table_version_id "
|
||||
"SELECT version_table.version_id "
|
||||
" AS version_table_version_id "
|
||||
"FROM version_table WHERE version_table.id = :param_1",
|
||||
lambda ctx: [{"param_1": 1}]
|
||||
)
|
||||
)
|
||||
self.assert_sql_execution(testing.db, sess.flush, *statements)
|
||||
|
||||
|
||||
def test_delete_col(self):
|
||||
sess = self._fixture()
|
||||
|
||||
@@ -756,15 +784,15 @@ class ServerVersioningTest(fixtures.MappedTest):
|
||||
sess.delete(f1)
|
||||
|
||||
statements = [
|
||||
# note that the assertsql tests the rule against
|
||||
# "default" - on a "returning" backend, the statement
|
||||
# includes "RETURNING"
|
||||
CompiledSQL(
|
||||
"DELETE FROM version_table "
|
||||
"WHERE version_table.id = :id AND "
|
||||
"version_table.version_id = :version_id",
|
||||
lambda ctx: [{"id": 1, "version_id": 1}]
|
||||
)
|
||||
# note that the assertsql tests the rule against
|
||||
# "default" - on a "returning" backend, the statement
|
||||
# includes "RETURNING"
|
||||
CompiledSQL(
|
||||
"DELETE FROM version_table "
|
||||
"WHERE version_table.id = :id AND "
|
||||
"version_table.version_id = :version_id",
|
||||
lambda ctx: [{"id": 1, "version_id": 1}]
|
||||
)
|
||||
]
|
||||
self.assert_sql_execution(testing.db, sess.flush, *statements)
|
||||
|
||||
@@ -817,29 +845,32 @@ class ServerVersioningTest(fixtures.MappedTest):
|
||||
sess.commit
|
||||
)
|
||||
|
||||
|
||||
class ManualVersionTest(fixtures.MappedTest):
|
||||
run_define_tables = 'each'
|
||||
__backend__ = True
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table("a", metadata,
|
||||
Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
|
||||
Column('data', String(30)),
|
||||
Column('vid', Integer)
|
||||
)
|
||||
Table(
|
||||
"a", metadata,
|
||||
Column(
|
||||
'id', Integer, primary_key=True,
|
||||
test_needs_autoincrement=True),
|
||||
Column('data', String(30)),
|
||||
Column('vid', Integer)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def setup_classes(cls):
|
||||
class A(cls.Basic):
|
||||
pass
|
||||
|
||||
|
||||
@classmethod
|
||||
def setup_mappers(cls):
|
||||
mapper(cls.classes.A, cls.tables.a,
|
||||
version_id_col=cls.tables.a.c.vid,
|
||||
version_id_generator=False)
|
||||
mapper(
|
||||
cls.classes.A, cls.tables.a, version_id_col=cls.tables.a.c.vid,
|
||||
version_id_generator=False)
|
||||
|
||||
def test_insert(self):
|
||||
sess = Session()
|
||||
@@ -904,4 +935,4 @@ class ManualVersionTest(fixtures.MappedTest):
|
||||
a1.vid = 2
|
||||
sess.commit()
|
||||
|
||||
eq_(a1.vid, 2)
|
||||
eq_(a1.vid, 2)
|
||||
|
||||
Reference in New Issue
Block a user