- The "auto-attach" feature of constraints such as :class:.UniqueConstraint

and :class:`.CheckConstraint` has been further enhanced such that
when the constraint is associated with non-table-bound :class:`.Column`
objects, the constraint will set up event listeners with the
columns themselves such that the constraint auto attaches at the
same time the columns are associated with the table.  This in particular
helps in some edge cases in declarative but is also of general use.
fixes #3341
This commit is contained in:
Mike Bayer
2015-03-24 10:55:29 -04:00
parent 04545727d4
commit bdcaa0f6ca
4 changed files with 181 additions and 17 deletions
+12
View File
@@ -18,6 +18,18 @@
.. changelog::
:version: 1.0.0b4
.. change::
:tags: feature, schema
:tickets: 3341
The "auto-attach" feature of constraints such as :class:`.UniqueConstraint`
and :class:`.CheckConstraint` has been further enhanced such that
when the constraint is associated with non-table-bound :class:`.Column`
objects, the constraint will set up event listeners with the
columns themselves such that the constraint auto attaches at the
same time the columns are associated with the table. This in particular
helps in some edge cases in declarative but is also of general use.
.. change::
:tags: bug, sql
:tickets: 3340
+37 -17
View File
@@ -2390,24 +2390,44 @@ class ColumnCollectionMixin(object):
self._pending_colargs = [_to_schema_column_or_string(c)
for c in columns]
if _autoattach and self._pending_colargs:
columns = [
c for c in self._pending_colargs
if isinstance(c, Column) and
isinstance(c.table, Table)
]
self._check_attach()
tables = set([c.table for c in columns])
if len(tables) == 1:
self._set_parent_with_dispatch(tables.pop())
elif len(tables) > 1 and not self._allow_multiple_tables:
table = columns[0].table
others = [c for c in columns[1:] if c.table is not table]
if others:
raise exc.ArgumentError(
"Column(s) %s are not part of table '%s'." %
(", ".join("'%s'" % c for c in others),
table.description)
)
def _check_attach(self, evt=False):
col_objs = [
c for c in self._pending_colargs
if isinstance(c, Column)
]
cols_w_table = [
c for c in col_objs if isinstance(c.table, Table)
]
cols_wo_table = set(col_objs).difference(cols_w_table)
if cols_wo_table:
assert not evt, "Should not reach here on event call"
def _col_attached(column, table):
cols_wo_table.discard(column)
if not cols_wo_table:
self._check_attach(evt=True)
self._cols_wo_table = cols_wo_table
for col in cols_wo_table:
col._on_table_attach(_col_attached)
return
columns = cols_w_table
tables = set([c.table for c in columns])
if len(tables) == 1:
self._set_parent_with_dispatch(tables.pop())
elif len(tables) > 1 and not self._allow_multiple_tables:
table = columns[0].table
others = [c for c in columns[1:] if c.table is not table]
if others:
raise exc.ArgumentError(
"Column(s) %s are not part of table '%s'." %
(", ".join("'%s'" % c for c in others),
table.description)
)
def _set_parent(self, table):
for col in self._pending_colargs:
+35
View File
@@ -485,6 +485,41 @@ class DeclarativeInheritanceTest(DeclarativeTestBase):
).one(),
Engineer(name='vlad', primary_language='cobol'))
def test_single_constraint_on_sub(self):
"""test the somewhat unusual case of [ticket:3341]"""
class Person(Base, fixtures.ComparableEntity):
__tablename__ = 'people'
id = Column(Integer, primary_key=True,
test_needs_autoincrement=True)
name = Column(String(50))
discriminator = Column('type', String(50))
__mapper_args__ = {'polymorphic_on': discriminator}
class Engineer(Person):
__mapper_args__ = {'polymorphic_identity': 'engineer'}
primary_language = Column(String(50))
__hack_args_one__ = sa.UniqueConstraint(
Person.name, primary_language)
__hack_args_two__ = sa.CheckConstraint(
Person.name != primary_language)
uq = [c for c in Person.__table__.constraints
if isinstance(c, sa.UniqueConstraint)][0]
ck = [c for c in Person.__table__.constraints
if isinstance(c, sa.CheckConstraint)][0]
eq_(
list(uq.columns),
[Person.__table__.c.name, Person.__table__.c.primary_language]
)
eq_(
list(ck.columns),
[Person.__table__.c.name, Person.__table__.c.primary_language]
)
@testing.skip_if(lambda: testing.against('oracle'),
"Test has an empty insert in it at the moment")
def test_columns_single_inheritance_conflict_resolution(self):
+97
View File
@@ -1052,6 +1052,103 @@ class ConstraintAPITest(fixtures.TestBase):
assert c not in t.constraints
assert c not in t2.constraints
def test_auto_append_ck_on_col_attach_one(self):
m = MetaData()
a = Column('a', Integer)
b = Column('b', Integer)
ck = CheckConstraint(a > b)
t = Table('tbl', m, a, b)
assert ck in t.constraints
def test_auto_append_ck_on_col_attach_two(self):
m = MetaData()
a = Column('a', Integer)
b = Column('b', Integer)
c = Column('c', Integer)
ck = CheckConstraint(a > b + c)
t = Table('tbl', m, a)
assert ck not in t.constraints
t.append_column(b)
assert ck not in t.constraints
t.append_column(c)
assert ck in t.constraints
def test_auto_append_ck_on_col_attach_three(self):
m = MetaData()
a = Column('a', Integer)
b = Column('b', Integer)
c = Column('c', Integer)
ck = CheckConstraint(a > b + c)
t = Table('tbl', m, a)
assert ck not in t.constraints
t.append_column(b)
assert ck not in t.constraints
t2 = Table('t2', m)
t2.append_column(c)
# two different tables, so CheckConstraint does nothing.
assert ck not in t.constraints
def test_auto_append_uq_on_col_attach_one(self):
m = MetaData()
a = Column('a', Integer)
b = Column('b', Integer)
uq = UniqueConstraint(a, b)
t = Table('tbl', m, a, b)
assert uq in t.constraints
def test_auto_append_uq_on_col_attach_two(self):
m = MetaData()
a = Column('a', Integer)
b = Column('b', Integer)
c = Column('c', Integer)
uq = UniqueConstraint(a, b, c)
t = Table('tbl', m, a)
assert uq not in t.constraints
t.append_column(b)
assert uq not in t.constraints
t.append_column(c)
assert uq in t.constraints
def test_auto_append_uq_on_col_attach_three(self):
m = MetaData()
a = Column('a', Integer)
b = Column('b', Integer)
c = Column('c', Integer)
uq = UniqueConstraint(a, b, c)
t = Table('tbl', m, a)
assert uq not in t.constraints
t.append_column(b)
assert uq not in t.constraints
t2 = Table('t2', m)
# two different tables, so UniqueConstraint raises
assert_raises_message(
exc.ArgumentError,
r"Column\(s\) 't2\.c' are not part of table 'tbl'\.",
t2.append_column, c
)
def test_index_asserts_cols_standalone(self):
metadata = MetaData()