- Fixed ORM bug where changing the primary key of an object, then marking

it for DELETE would fail to target the correct row for DELETE.
Then to compound matters, basic "number of rows matched" checks were
not being performed.  Both issues are fixed, however note that the
"rows matched" check requires so-called "sane multi-row count"
functionality; the DBAPI's executemany() method must count up the
rows matched by individual statements and SQLAlchemy's dialect must
mark this feature as supported, currently applies to some mysql dialects,
psycopg2, sqlite only. fixes #3006
- Enabled "sane multi-row count" checking for the psycopg2 DBAPI, as
this seems to be supported as of psycopg2 2.0.9.
This commit is contained in:
Mike Bayer
2014-03-28 16:32:11 -04:00
parent 9cdbed37f8
commit c01558ae7f
10 changed files with 183 additions and 37 deletions
+22
View File
@@ -11,6 +11,28 @@
.. changelog::
:version: 0.8.6
.. change::
:tags: bug, orm
:tickets: 3006
:versions: 0.9.4
Fixed ORM bug where changing the primary key of an object, then marking
it for DELETE would fail to target the correct row for DELETE.
Then to compound matters, basic "number of rows matched" checks were
not being performed. Both issues are fixed, however note that the
"rows matched" check requires so-called "sane multi-row count"
functionality; the DBAPI's executemany() method must count up the
rows matched by individual statements and SQLAlchemy's dialect must
mark this feature as supported, currently applies to some mysql dialects,
psycopg2, sqlite only.
.. change::
:tags: feature, postgresql
:versions: 0.9.4
Enabled "sane multi-row count" checking for the psycopg2 DBAPI, as
this seems to be supported as of psycopg2 2.0.9.
.. change::
:tags: bug, postgresql
:tickets: 3000
@@ -347,7 +347,7 @@ class PGDialect_psycopg2(PGDialect):
supports_unicode_statements = False
default_paramstyle = 'pyformat'
supports_sane_multi_rowcount = False
supports_sane_multi_rowcount = False # set to true based on psycopg2 version
execution_ctx_cls = PGExecutionContext_psycopg2
statement_compiler = PGCompiler_psycopg2
preparer = PGIdentifierPreparer_psycopg2
@@ -393,6 +393,9 @@ class PGDialect_psycopg2(PGDialect):
is not None
self._has_native_json = self.psycopg2_version >= (2, 5)
# http://initd.org/psycopg/docs/news.html#what-s-new-in-psycopg-2-0-9
self.supports_sane_multi_rowcount = self.psycopg2_version >= (2, 0, 9)
@classmethod
def dbapi(cls):
import psycopg2
+2
View File
@@ -514,6 +514,8 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
self.compiled = compiled
if not compiled.can_execute:
import pdb
pdb.set_trace()
raise exc.ArgumentError("Not an executable clause")
self.execution_options = compiled.statement._execution_options
+21 -2
View File
@@ -91,8 +91,27 @@ attribute, so that the ordering is correct when first loaded.
.. warning::
:class:`.OrderingList` only provides limited functionality when a primary
key column or unique column is the target of the sort. Since changing the
order of entries often means that two rows must trade values, this is not
key column or unique column is the target of the sort. The two operations
that are unsupported or are problematic are:
* two entries must trade values. This is not supported directly in the
case of a primary key or unique constraint because it means at least
one row would need to be temporarily removed first, or changed to
a third, neutral value while the switch occurs.
* an entry must be deleted in order to make room for a new entry. SQLAlchemy's
unit of work performs all INSERTs before DELETEs within a single flush
by default. A future feature will allow this to be configurable for
specific sets of columns on mappings.
Additional issues when using primary keys as ordering keys are that UPDATE
or DELETE statements on target rows may fail to fire correctly as orderinglist
may change their primary key value
Since changing the
order of entries often means that either rows must trade values,
or rows must be deleted to make way for new inserts, this is not
possible when the value is constrained by a primary key or unique
constraint, since one of the rows would temporarily have to point to a
third available value so that the other row could take its old
+15 -11
View File
@@ -450,7 +450,7 @@ def _collect_delete_commands(base_mapper, uowtransaction, table,
for col in mapper._pks_by_table[table]:
params[col.key] = \
value = \
mapper._get_state_attr_by_column(
mapper._get_committed_state_attr_by_column(
state, state_dict, col)
if value is None:
raise orm_exc.FlushError(
@@ -679,21 +679,19 @@ def _emit_delete_statements(base_mapper, uowtransaction, cached_connections,
connection = cached_connections[connection]
if need_version_id:
# TODO: need test coverage for this [ticket:1761]
expected = len(del_objects)
rows_matched = -1
if connection.dialect.supports_sane_multi_rowcount:
c = connection.execute(statement, del_objects)
rows_matched = c.rowcount
elif need_version_id:
if connection.dialect.supports_sane_rowcount:
rows = 0
rows_matched = 0
# execute deletes individually so that versioned
# rows can be verified
for params in del_objects:
c = connection.execute(statement, params)
rows += c.rowcount
if rows != len(del_objects):
raise orm_exc.StaleDataError(
"DELETE statement on table '%s' expected to "
"delete %d row(s); %d were matched." %
(table.description, len(del_objects), c.rowcount)
)
rows_matched += c.rowcount
else:
util.warn(
"Dialect %s does not support deleted rowcount "
@@ -704,6 +702,12 @@ def _emit_delete_statements(base_mapper, uowtransaction, cached_connections,
else:
connection.execute(statement, del_objects)
if rows_matched > -1 and expected != rows_matched:
raise orm_exc.StaleDataError(
"DELETE statement on table '%s' expected to "
"delete %d row(s); %d were matched." %
(table.description, expected, rows_matched)
)
def _finalize_insert_update_commands(base_mapper, uowtransaction,
states_to_insert, states_to_update):
+3
View File
@@ -74,6 +74,9 @@ class skip_if(object):
self._fails_on = skip_if(other, reason)
return self
def fails_on_everything_except(self, *dbs):
self._fails_on = skip_if(fails_on_everything_except(*dbs))
return self
class fails_if(skip_if):
def __call__(self, fn):
+6
View File
@@ -47,6 +47,12 @@ class SuiteRequirements(Requirements):
return exclusions.open()
@property
def non_updating_cascade(self):
"""target database must *not* support ON UPDATE..CASCADE behavior in
foreign keys."""
return exclusions.closed()
@property
def deferrable_fks(self):
return exclusions.closed()
+5 -11
View File
@@ -389,10 +389,7 @@ class NaturalPKTest(fixtures.MappedTest):
def test_manytomany_passive(self):
self._test_manytomany(True)
# mysqldb executemany() of the association table fails to
# report the correct row count
@testing.fails_if(lambda: testing.against('mysql')
and not (testing.against('+zxjdbc') or testing.against('+cymysql')))
@testing.requires.non_updating_cascade
def test_manytomany_nonpassive(self):
self._test_manytomany(False)
@@ -798,8 +795,7 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
def test_onetomany_passive(self):
self._test_onetomany(True)
# PG etc. need passive=True to allow PK->PK cascade
@testing.fails_on_everything_except('sqlite', 'oracle', '+zxjdbc')
@testing.requires.non_updating_cascade
def test_onetomany_nonpassive(self):
self._test_onetomany(False)
@@ -876,7 +872,7 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
def test_change_m2o_passive(self):
self._test_change_m2o(True)
@testing.fails_on_everything_except('sqlite', 'oracle', '+zxjdbc')
@testing.requires.non_updating_cascade
def test_change_m2o_nonpassive(self):
self._test_change_m2o(False)
@@ -1063,8 +1059,7 @@ class JoinedInheritanceTest(fixtures.MappedTest):
def test_pk_passive(self):
self._test_pk(True)
# PG etc. need passive=True to allow PK->PK cascade
@testing.fails_on_everything_except('sqlite', 'oracle', '+zxjdbc')
@testing.requires.non_updating_cascade
def test_pk_nonpassive(self):
self._test_pk(False)
@@ -1073,8 +1068,7 @@ class JoinedInheritanceTest(fixtures.MappedTest):
self._test_fk(True)
# PG etc. need passive=True to allow PK->PK cascade
@testing.fails_on_everything_except('sqlite', 'mysql+zxjdbc', 'oracle',
'postgresql+zxjdbc')
@testing.requires.non_updating_cascade
def test_fk_nonpassive(self):
self._test_fk(False)
+97 -11
View File
@@ -1203,20 +1203,20 @@ class RowswitchAccountingTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('parent', metadata,
Column('id', Integer, primary_key=True)
Column('id', Integer, primary_key=True),
Column('data', Integer)
)
Table('child', metadata,
Column('id', Integer, ForeignKey('parent.id'), primary_key=True)
Column('id', Integer, ForeignKey('parent.id'), primary_key=True),
Column('data', Integer)
)
def test_accounting_for_rowswitch(self):
def _fixture(self):
parent, child = self.tables.parent, self.tables.child
class Parent(object):
def __init__(self, id):
self.id = id
self.child = Child()
class Child(object):
class Parent(fixtures.BasicEntity):
pass
class Child(fixtures.BasicEntity):
pass
mapper(Parent, parent, properties={
@@ -1225,15 +1225,19 @@ class RowswitchAccountingTest(fixtures.MappedTest):
backref="parent")
})
mapper(Child, child)
return Parent, Child
def test_switch_on_update(self):
Parent, Child = self._fixture()
sess = create_session(autocommit=False)
p1 = Parent(1)
p1 = Parent(id=1, child=Child())
sess.add(p1)
sess.commit()
sess.close()
p2 = Parent(1)
p2 = Parent(id=1, child=Child())
p3 = sess.merge(p2)
old = attributes.get_history(p3, 'child')[2][0]
@@ -1244,7 +1248,7 @@ class RowswitchAccountingTest(fixtures.MappedTest):
assert p3.child._sa_instance_state.session_id == sess.hash_key
assert p3.child in sess
p4 = Parent(1)
p4 = Parent(id=1, child=Child())
p5 = sess.merge(p4)
old = attributes.get_history(p5, 'child')[2][0]
@@ -1252,6 +1256,88 @@ class RowswitchAccountingTest(fixtures.MappedTest):
sess.flush()
def test_switch_on_delete(self):
Parent, Child = self._fixture()
sess = Session()
p1 = Parent(id=1, data=2, child=None)
sess.add(p1)
sess.flush()
p1.id = 5
sess.delete(p1)
eq_(p1.id, 5)
sess.flush()
eq_(sess.scalar(self.tables.parent.count()), 0)
class BasicStaleChecksTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('parent', metadata,
Column('id', Integer, primary_key=True),
Column('data', Integer)
)
Table('child', metadata,
Column('id', Integer, ForeignKey('parent.id'), primary_key=True),
Column('data', Integer)
)
def _fixture(self):
parent, child = self.tables.parent, self.tables.child
class Parent(fixtures.BasicEntity):
pass
class Child(fixtures.BasicEntity):
pass
mapper(Parent, parent, properties={
'child':relationship(Child, uselist=False,
cascade="all, delete-orphan",
backref="parent")
})
mapper(Child, child)
return Parent, Child
def test_update_single_missing(self):
Parent, Child = self._fixture()
sess = Session()
p1 = Parent(id=1, data=2)
sess.add(p1)
sess.flush()
sess.execute(self.tables.parent.delete())
p1.data = 3
assert_raises_message(
orm_exc.StaleDataError,
"UPDATE statement on table 'parent' expected to "
"update 1 row\(s\); 0 were matched.",
sess.flush
)
@testing.requires.sane_multi_rowcount
def test_delete_multi_missing(self):
Parent, Child = self._fixture()
sess = Session()
p1 = Parent(id=1, data=2, child=None)
p2 = Parent(id=2, data=3, child=None)
sess.add_all([p1, p2])
sess.flush()
sess.execute(self.tables.parent.delete())
sess.delete(p1)
sess.delete(p2)
assert_raises_message(
orm_exc.StaleDataError,
"DELETE statement on table 'parent' expected to "
"delete 2 row\(s\); 0 were matched.",
sess.flush
)
class BatchInsertsTest(fixtures.MappedTest, testing.AssertsExecutionResults):
@classmethod
def define_tables(cls, metadata):
+8 -1
View File
@@ -63,6 +63,13 @@ class DefaultRequirements(SuiteRequirements):
'target backend does not support ON UPDATE CASCADE'
)
@property
def non_updating_cascade(self):
"""target database must *not* support ON UPDATE..CASCADE behavior in
foreign keys."""
return fails_on_everything_except('sqlite', 'oracle', '+zxjdbc') + skip_if('mssql')
@property
def deferrable_fks(self):
"""target database must support deferrable fks"""
@@ -438,7 +445,7 @@ class DefaultRequirements(SuiteRequirements):
@property
def sane_multi_rowcount(self):
return skip_if(
return fails_if(
lambda config: not config.db.dialect.supports_sane_multi_rowcount,
"driver doesn't support 'sane' multi row count"
)