- starting to groom the branch for its inclusion

- one-to-many relationships now maintain a list of positive
parent-child associations within the flush, preventing
previous parents marked as deleted from cascading a
delete or NULL foreign key set on those child objects,
despite the end-user not removing the child from the old
association. [ticket:1764]
- re-established Preprocess as unique on their arguments,
as they were definitely duped in inheritance scenarios
- added a "memo" feature to UOWTransaction which represents the usual
pattern of using the .attributes collection
- added the test case from [ticket:1081] into perf/
This commit is contained in:
Mike Bayer
2010-04-10 19:21:54 -04:00
parent a6c0057b74
commit 44c67fef8c
8 changed files with 330 additions and 53 deletions
+23
View File
@@ -4,6 +4,29 @@
CHANGES
=======
0.6uow_refactor
===============
- orm
- Unit of work internals have been rewritten. Units of work
with large numbers of objects interdependent objects
can now be flushed without recursion overflows
as there is no longer reliance upon recursive calls
[ticket:1081]. The number of internal structures now stays
constant for a particular session state, regardless of
how many relationships are present on mappings. The flow
of events now corresponds to a linear list of steps,
generated by the mappers and relationships based on actual
work to be done, filtered through a single topological sort
for correct ordering. Flush actions are assembled using
far fewer steps and less memory. [ticket:1742]
- one-to-many relationships now maintain a list of positive
parent-child associations within the flush, preventing
previous parents marked as deleted from cascading a
delete or NULL foreign key set on those child objects,
despite the end-user not removing the child from the old
association. [ticket:1764]
0.6.0
=====
+1 -1
View File
@@ -114,6 +114,6 @@ from sqlalchemy.engine import create_engine, engine_from_config
__all__ = sorted(name for name, obj in locals().items()
if not (name.startswith('_') or inspect.ismodule(obj)))
__version__ = '0.6beta3'
__version__ = '0.6uow_refactor'
del inspect, sys
+29 -24
View File
@@ -10,7 +10,7 @@
from sqlalchemy import sql, util
import sqlalchemy.exceptions as sa_exc
from sqlalchemy.orm import attributes, exc, sync, unitofwork
from sqlalchemy.orm import attributes, exc, sync, unitofwork, util as mapperutil
from sqlalchemy.orm.interfaces import ONETOMANY, MANYTOONE, MANYTOMANY
@@ -42,21 +42,13 @@ class DependencyProcessor(object):
"child are present" %
self.prop)
def _get_instrumented_attribute(self):
"""Return the ``InstrumentedAttribute`` handled by this
``DependencyProecssor``.
"""
return self.parent.class_manager.get_impl(self.key)
def hasparent(self, state):
"""return True if the given object instance has a parent,
according to the ``InstrumentedAttribute`` handled by this
``DependencyProcessor``.
"""
# TODO: use correct API for this
return self._get_instrumented_attribute().hasparent(state)
return self.parent.class_manager.get_impl(self.key).hasparent(state)
def per_property_preprocessors(self, uow):
"""establish actions and dependencies related to a flush.
@@ -326,7 +318,7 @@ class OneToManyDP(DependencyProcessor):
(parent_saves, after_save),
(after_save, child_saves),
(after_save, child_deletes),
(child_saves, parent_deletes),
(child_deletes, parent_deletes),
@@ -397,6 +389,7 @@ class OneToManyDP(DependencyProcessor):
uowcommit.register_object(child, isdelete=True)
else:
uowcommit.register_object(child)
if should_null_fks:
for child in history.unchanged:
if child is not None:
@@ -404,15 +397,23 @@ class OneToManyDP(DependencyProcessor):
def presort_saves(self, uowcommit, states):
children_added = uowcommit.memo(('children_added', self), set)
for state in states:
pks_changed = self._pks_changed(uowcommit, state)
history = uowcommit.get_attribute_history(
state,
self.key,
passive=True)
passive=not pks_changed
or self.passive_updates)
if history:
for child in history.added:
if child is not None:
uowcommit.register_object(child)
uowcommit.register_object(child, cancel_delete=True)
children_added.update(history.added)
for child in history.deleted:
if not self.cascade.delete_orphan:
uowcommit.register_object(child, isdelete=False)
@@ -422,11 +423,8 @@ class OneToManyDP(DependencyProcessor):
uowcommit.register_object(
attributes.instance_state(c),
isdelete=True)
if self._pks_changed(uowcommit, state):
if not history:
history = uowcommit.get_attribute_history(
state, self.key,
passive=self.passive_updates)
if pks_changed:
if history:
for child in history.unchanged:
if child is not None:
@@ -442,6 +440,8 @@ class OneToManyDP(DependencyProcessor):
# safely for any cascade but is unnecessary if delete cascade
# is on.
if self.post_update or not self.passive_deletes == 'all':
children_added = uowcommit.memo(('children_added', self), set)
for state in states:
history = uowcommit.get_attribute_history(
state,
@@ -461,7 +461,8 @@ class OneToManyDP(DependencyProcessor):
uowcommit,
[state])
if self.post_update or not self.cascade.delete:
for child in history.unchanged:
for child in set(history.unchanged).\
difference(children_added):
if child is not None:
self._synchronize(
state,
@@ -472,7 +473,11 @@ class OneToManyDP(DependencyProcessor):
child,
uowcommit,
[state])
# technically, we can even remove each child from the
# collection here too. but this would be a somewhat
# inconsistent behavior since it wouldn't happen if the old
# parent wasn't deleted but child was moved.
def process_saves(self, uowcommit, states):
for state in states:
history = uowcommit.get_attribute_history(state, self.key, passive=True)
@@ -731,10 +736,10 @@ class DetectKeySwitch(DependencyProcessor):
self._process_key_switches(states, uowcommit)
def _key_switchers(self, uow, states):
if ('pk_switchers', self) in uow.attributes:
switched, notswitched = uow.attributes[('pk_switchers', self)]
else:
uow.attributes[('pk_switchers', self)] = (switched, notswitched) = (set(), set())
switched, notswitched = uow.memo(
('pk_switchers', self),
lambda: (set(), set())
)
allstates = switched.union(notswitched)
for s in states:
+1 -2
View File
@@ -75,8 +75,7 @@ def source_modified(uowcommit, source, source_mapper, synchronize_pairs):
except exc.UnmappedColumnError:
_raise_col_to_prop(False, source_mapper, l, None, r)
history = uowcommit.get_attribute_history(source, prop.key, passive=True)
if len(history.deleted):
return True
return bool(history.deleted)
else:
return False
+38 -18
View File
@@ -12,13 +12,12 @@ organizes them in order of dependency, and executes.
"""
from sqlalchemy import util, log, topological
from sqlalchemy import util, topological
from sqlalchemy.orm import attributes, interfaces
from sqlalchemy.orm import util as mapperutil
from sqlalchemy.orm.util import _state_mapper
# Load lazily
object_session = None
_state_session = None
class UOWEventHandler(interfaces.AttributeExtension):
@@ -90,10 +89,10 @@ class UOWTransaction(object):
# as a parent.
self.mappers = util.defaultdict(set)
# a set of Preprocess objects, which gather
# a dictionary of Preprocess objects, which gather
# additional states impacted by the flush
# and determine if a flush action is needed
self.presort_actions = set()
self.presort_actions = {}
# dictionary of PostSortRec objects, each
# one issues work during the flush within
@@ -121,6 +120,13 @@ class UOWTransaction(object):
return state in self.states and self.states[state][0]
def memo(self, key, callable_):
if key in self.attributes:
return self.attributes[key]
else:
self.attributes[key] = ret = callable_()
return ret
def remove_state_actions(self, state):
"""remove pending actions for a state from the uowtransaction."""
@@ -139,10 +145,10 @@ class UOWTransaction(object):
# if the cached lookup was "passive" and now we want non-passive, do a non-passive
# lookup and re-cache
if cached_passive and not passive:
history = attributes.get_state_history(state, key, passive=False)
history = state.get_history(key, passive=False)
self.attributes[hashkey] = (history, passive)
else:
history = attributes.get_state_history(state, key, passive=passive)
history = state.get_history(key, passive=passive)
self.attributes[hashkey] = (history, passive)
if not history or not state.get_impl(key).uses_objects:
@@ -151,9 +157,12 @@ class UOWTransaction(object):
return history.as_state()
def register_preprocessor(self, processor, fromparent):
self.presort_actions.add(Preprocess(processor, fromparent))
key = (processor, fromparent)
if key not in self.presort_actions:
self.presort_actions[key] = Preprocess(processor, fromparent)
def register_object(self, state, isdelete=False, listonly=False):
def register_object(self, state, isdelete=False,
listonly=False, cancel_delete=False):
if not self.session._contains_state(state):
return
@@ -166,11 +175,8 @@ class UOWTransaction(object):
self.mappers[mapper].add(state)
self.states[state] = (isdelete, listonly)
else:
existing_isdelete, existing_listonly = self.states[state]
self.states[state] = (
existing_isdelete or isdelete,
existing_listonly and listonly
)
if not listonly and (isdelete or cancel_delete):
self.states[state] = (isdelete, False)
def issue_post_update(self, state, post_update_cols):
mapper = state.manager.mapper.base_mapper
@@ -180,11 +186,23 @@ class UOWTransaction(object):
@util.memoized_property
def _mapper_for_dep(self):
"""return a dynamic mapping of (Mapper, DependencyProcessor) to
True or False, indicating if the DependencyProcessor operates
on objects of that Mapper.
The result is stored in the dictionary persistently once
calculated.
"""
return util.PopulateDict(
lambda tup:tup[0]._props.get(tup[1].key) is tup[1].prop
)
def filter_states_for_dep(self, dep, states):
"""Filter the given list of InstanceStates to those relevant to the
given DependencyProcessor.
"""
mapper_for_dep = self._mapper_for_dep
return [s for s in states if mapper_for_dep[(s.manager.mapper, dep)]]
@@ -196,12 +214,16 @@ class UOWTransaction(object):
yield state
def _generate_actions(self):
"""Generate the full, unsorted collection of PostSortRecs as
well as dependency pairs for this UOWTransaction.
"""
# execute presort_actions, until all states
# have been processed. a presort_action might
# add new states to the uow.
while True:
ret = False
for action in list(self.presort_actions):
for action in list(self.presort_actions.values()):
if action.execute(self):
ret = True
if not ret:
@@ -211,7 +233,7 @@ class UOWTransaction(object):
self.cycles = cycles = topological.find_cycles(
self.dependencies,
self.postsort_actions.values())
if cycles:
# if yes, break the per-mapper actions into
# per-state actions
@@ -248,7 +270,7 @@ class UOWTransaction(object):
#sort = topological.sort(self.dependencies, postsort_actions)
#print "--------------"
#print self.dependencies
#print postsort_actions
#print list(sort)
#print "COUNT OF POSTSORT ACTIONS", len(postsort_actions)
# execute
@@ -281,8 +303,6 @@ class UOWTransaction(object):
# debug... would like to see how many do this
self.session._register_newly_persistent(state)
log.class_logger(UOWTransaction)
class IterateMappersMixin(object):
def _mappers(self, uow):
if self.fromparent:
+145 -1
View File
@@ -1156,7 +1156,6 @@ class UnsavedOrphansTest3(_base.MappedTest):
assert c not in s, "Should expunge customer when both parents are gone"
class DoubleParentOrphanTest(_base.MappedTest):
"""test orphan detection for an entity with two parent relationships"""
@@ -1276,6 +1275,151 @@ class CollectionAssignmentOrphanTest(_base.MappedTest):
eq_(sess.query(A).get(a1.id),
A(name='a1', bs=[B(name='b1'), B(name='b2'), B(name='b3')]))
class O2MConflictTest(_base.MappedTest):
"""test that O2M dependency detects a change in parent, does the
right thing, and even updates the collection/attribute.
"""
@classmethod
def define_tables(cls, metadata):
Table("parent", metadata,
Column("id", Integer, primary_key=True, test_needs_autoincrement=True)
)
Table("child", metadata,
Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
Column('parent_id', Integer, ForeignKey('parent.id'), nullable=False)
)
@classmethod
def setup_classes(cls):
class Parent(_base.ComparableEntity):
pass
class Child(_base.ComparableEntity):
pass
@testing.resolve_artifact_names
def _do_delete_old_test(self):
sess = create_session()
p1, p2, c1 = Parent(), Parent(), Child()
if Parent.child.property.uselist:
p1.child.append(c1)
else:
p1.child = c1
sess.add_all([p1, c1])
sess.flush()
sess.delete(p1)
if Parent.child.property.uselist:
p2.child.append(c1)
else:
p2.child = c1
sess.add(p2)
sess.flush()
eq_(sess.query(Child).filter(Child.parent_id==p2.id).all(), [c1])
@testing.resolve_artifact_names
def _do_move_test(self):
sess = create_session()
p1, p2, c1 = Parent(), Parent(), Child()
if Parent.child.property.uselist:
p1.child.append(c1)
else:
p1.child = c1
sess.add_all([p1, c1])
sess.flush()
if Parent.child.property.uselist:
p2.child.append(c1)
else:
p2.child = c1
sess.add(p2)
sess.flush()
eq_(sess.query(Child).filter(Child.parent_id==p2.id).all(), [c1])
@testing.resolve_artifact_names
def test_o2o_delete_old(self):
mapper(Parent, parent, properties={
'child':relationship(Child, uselist=False)
})
mapper(Child, child)
self._do_delete_old_test()
self._do_move_test()
@testing.resolve_artifact_names
def test_o2m_delete_old(self):
mapper(Parent, parent, properties={
'child':relationship(Child, uselist=True)
})
mapper(Child, child)
self._do_delete_old_test()
self._do_move_test()
@testing.resolve_artifact_names
def test_o2o_backref_delete_old(self):
mapper(Parent, parent, properties={
'child':relationship(Child, uselist=False, backref='parent')
})
mapper(Child, child)
self._do_delete_old_test()
self._do_move_test()
@testing.resolve_artifact_names
def test_o2o_delcascade_delete_old(self):
mapper(Parent, parent, properties={
'child':relationship(Child, uselist=False, cascade="all, delete")
})
mapper(Child, child)
self._do_delete_old_test()
self._do_move_test()
@testing.resolve_artifact_names
def test_o2o_delorphan_delete_old(self):
mapper(Parent, parent, properties={
'child':relationship(Child, uselist=False, cascade="all, delete, delete-orphan")
})
mapper(Child, child)
self._do_delete_old_test()
self._do_move_test()
@testing.resolve_artifact_names
def test_o2o_delorphan_backref_delete_old(self):
mapper(Parent, parent, properties={
'child':relationship(Child, uselist=False,
cascade="all, delete, delete-orphan",
backref='parent')
})
mapper(Child, child)
self._do_delete_old_test()
self._do_move_test()
@testing.resolve_artifact_names
def test_o2o_backref_delorphan_delete_old(self):
mapper(Parent, parent)
mapper(Child, child, properties = {
'parent' : relationship(Parent, uselist=False, single_parent=True,
backref=backref('child', uselist=False),
cascade="all,delete,delete-orphan")
})
self._do_delete_old_test()
self._do_move_test()
@testing.resolve_artifact_names
def test_o2m_backref_delorphan_delete_old(self):
mapper(Parent, parent)
mapper(Child, child, properties = {
'parent' : relationship(Parent, uselist=False, single_parent=True,
backref=backref('child', uselist=True),
cascade="all,delete,delete-orphan")
})
self._do_delete_old_test()
self._do_move_test()
class PartialFlushTest(_base.MappedTest):
"""test cascade behavior as it relates to object lists passed to flush().
+9 -7
View File
@@ -14,10 +14,7 @@ from test.orm._fixtures import keywords, addresses, Base, Keyword, \
composite_pk_table, CompositePk
class AssertsUOW(object):
def _assert_uow_size(self,
session,
expected
):
def _get_test_uow(self, session):
uow = unitofwork.UOWTransaction(session)
deleted = set(session._deleted)
new = set(session._new)
@@ -26,6 +23,13 @@ class AssertsUOW(object):
uow.register_object(s)
for d in deleted:
uow.register_object(d, isdelete=True)
return uow
def _assert_uow_size(self,
session,
expected
):
uow = self._get_test_uow(session)
postsort_actions = uow._generate_actions()
print postsort_actions
eq_(len(postsort_actions), expected, postsort_actions)
@@ -33,8 +37,6 @@ class AssertsUOW(object):
class UOWTest(_fixtures.FixtureTest, testing.AssertsExecutionResults, AssertsUOW):
run_inserts = None
class RudimentaryFlushTest(UOWTest):
def test_one_to_many_save(self):
@@ -215,7 +217,7 @@ class RudimentaryFlushTest(UOWTest):
{'id':u1.id}
),
)
def test_m2o_flush_size(self):
mapper(User, users)
mapper(Address, addresses, properties={
+84
View File
@@ -0,0 +1,84 @@
import sqlalchemy as sa
from sqlalchemy import create_engine, MetaData, orm
from sqlalchemy import Column, ForeignKey
from sqlalchemy import Integer, String
from sqlalchemy.orm import mapper
from sqlalchemy.test import profiling
class Object(object):
pass
class Q(Object):
pass
class A(Object):
pass
class C(Object):
pass
class WC(C):
pass
engine = create_engine('sqlite:///:memory:', echo=True)
sm = orm.sessionmaker(bind=engine)
SA_Session = orm.scoped_session(sm)
SA_Metadata = MetaData()
object_table = sa.Table('Object',
SA_Metadata,
Column('ObjectID', Integer,primary_key=True),
Column('Type', String(1), nullable=False))
q_table = sa.Table('Q',
SA_Metadata,
Column('QID', Integer, ForeignKey('Object.ObjectID'),primary_key=True))
c_table = sa.Table('C',
SA_Metadata,
Column('CID', Integer, ForeignKey('Object.ObjectID'),primary_key=True))
wc_table = sa.Table('WC',
SA_Metadata,
Column('WCID', Integer, ForeignKey('C.CID'), primary_key=True))
a_table = sa.Table('A',
SA_Metadata,
Column('AID', Integer, ForeignKey('Object.ObjectID'),primary_key=True),
Column('QID', Integer, ForeignKey('Q.QID')),
Column('CID', Integer, ForeignKey('C.CID')))
mapper(Object, object_table, polymorphic_on=object_table.c.Type, polymorphic_identity='O')
mapper(Q, q_table, inherits=Object, polymorphic_identity='Q')
mapper(C, c_table, inherits=Object, polymorphic_identity='C')
mapper(WC, wc_table, inherits=C, polymorphic_identity='W')
mapper(A, a_table, inherits=Object, polymorphic_identity='A',
properties = {
'Q' : orm.relation(Q,primaryjoin=a_table.c.QID==q_table.c.QID,
backref='As'
),
'C' : orm.relation(C,primaryjoin=a_table.c.CID==c_table.c.CID,
backref='A',
uselist=False)
}
)
SA_Metadata.create_all(engine)
@profiling.profiled('large_flush', always=True, sort=['file'])
def generate_error():
q = Q()
for j in range(100): #at 306 the error does not pop out (depending on recursion depth)
a = A()
a.Q = q
a.C = WC()
SA_Session.add(q)
SA_Session.commit() #here the error pops out
generate_error()