Merge "Send deterministic ordering into unit of work topological"

This commit is contained in:
mike bayer
2020-12-12 02:53:44 +00:00
committed by Gerrit Code Review
9 changed files with 80 additions and 54 deletions
+12
View File
@@ -0,0 +1,12 @@
.. change::
:tags: bug, orm, unitofwork
:tickets: 5735
Improved the unit of work topological sorting system such that the
toplogical sort is now deterministic based on the sorting of the input set,
which itself is now sorted at the level of mappers, so that the same inputs
of affected mappers should produce the same output every time, among
mappers / tables that don't have any dependency on each other. This further
reduces the chance of deadlocks as can be observed in a flush that UPDATEs
among multiple, unrelated tables such that row locks are generated.
+1
View File
@@ -43,6 +43,7 @@ class DependencyProcessor(object):
else:
self._passive_update_flag = attributes.PASSIVE_OFF
self.sort_key = "%s_%s" % (self.parent._sort_key, prop.key)
self.key = prop.key
if not self.prop.synchronize_pairs:
raise sa_exc.ArgumentError(
+4
View File
@@ -574,6 +574,10 @@ class Mapper(
"""
self.class_ = util.assert_arg_type(class_, type, "class_")
self._sort_key = "%s.%s" % (
self.class_.__module__,
self.class_.__name__,
)
self.class_manager = None
+24 -8
View File
@@ -422,6 +422,10 @@ class UOWTransaction(object):
def execute(self):
postsort_actions = self._generate_actions()
postsort_actions = sorted(
postsort_actions,
key=lambda item: item.sort_key,
)
# sort = topological.sort(self.dependencies, postsort_actions)
# print "--------------"
# print "\ndependencies:", self.dependencies
@@ -431,9 +435,10 @@ class UOWTransaction(object):
# execute
if self.cycles:
for set_ in topological.sort_as_subsets(
for subset in topological.sort_as_subsets(
self.dependencies, postsort_actions
):
set_ = set(subset)
while set_:
n = set_.pop()
n.execute_aggregate(self, set_)
@@ -542,10 +547,15 @@ class PostSortRec(object):
class ProcessAll(IterateMappersMixin, PostSortRec):
__slots__ = "dependency_processor", "isdelete", "fromparent"
__slots__ = "dependency_processor", "isdelete", "fromparent", "sort_key"
def __init__(self, uow, dependency_processor, isdelete, fromparent):
self.dependency_processor = dependency_processor
self.sort_key = (
"ProcessAll",
self.dependency_processor.sort_key,
isdelete,
)
self.isdelete = isdelete
self.fromparent = fromparent
uow.deps[dependency_processor.parent.base_mapper].add(
@@ -582,11 +592,12 @@ class ProcessAll(IterateMappersMixin, PostSortRec):
class PostUpdateAll(PostSortRec):
__slots__ = "mapper", "isdelete"
__slots__ = "mapper", "isdelete", "sort_key"
def __init__(self, uow, mapper, isdelete):
self.mapper = mapper
self.isdelete = isdelete
self.sort_key = ("PostUpdateAll", mapper._sort_key, isdelete)
@util.preload_module("sqlalchemy.orm.persistence")
def execute(self, uow):
@@ -598,10 +609,11 @@ class PostUpdateAll(PostSortRec):
class SaveUpdateAll(PostSortRec):
__slots__ = ("mapper",)
__slots__ = ("mapper", "sort_key")
def __init__(self, uow, mapper):
self.mapper = mapper
self.sort_key = ("SaveUpdateAll", mapper._sort_key)
assert mapper is mapper.base_mapper
@util.preload_module("sqlalchemy.orm.persistence")
@@ -634,10 +646,11 @@ class SaveUpdateAll(PostSortRec):
class DeleteAll(PostSortRec):
__slots__ = ("mapper",)
__slots__ = ("mapper", "sort_key")
def __init__(self, uow, mapper):
self.mapper = mapper
self.sort_key = ("DeleteAll", mapper._sort_key)
assert mapper is mapper.base_mapper
@util.preload_module("sqlalchemy.orm.persistence")
@@ -670,10 +683,11 @@ class DeleteAll(PostSortRec):
class ProcessState(PostSortRec):
__slots__ = "dependency_processor", "isdelete", "state"
__slots__ = "dependency_processor", "isdelete", "state", "sort_key"
def __init__(self, uow, dependency_processor, isdelete, state):
self.dependency_processor = dependency_processor
self.sort_key = ("ProcessState", dependency_processor.sort_key)
self.isdelete = isdelete
self.state = state
@@ -705,11 +719,12 @@ class ProcessState(PostSortRec):
class SaveUpdateState(PostSortRec):
__slots__ = "state", "mapper"
__slots__ = "state", "mapper", "sort_key"
def __init__(self, uow, state):
self.state = state
self.mapper = state.mapper.base_mapper
self.sort_key = ("ProcessState", self.mapper._sort_key)
@util.preload_module("sqlalchemy.orm.persistence")
def execute_aggregate(self, uow, recs):
@@ -732,11 +747,12 @@ class SaveUpdateState(PostSortRec):
class DeleteState(PostSortRec):
__slots__ = "state", "mapper"
__slots__ = "state", "mapper", "sort_key"
def __init__(self, uow, state):
self.state = state
self.mapper = state.mapper.base_mapper
self.sort_key = ("DeleteState", self.mapper._sort_key)
@util.preload_module("sqlalchemy.orm.persistence")
def execute_aggregate(self, uow, recs):
-2
View File
@@ -1171,7 +1171,6 @@ def sort_tables_and_constraints(
topological.sort(
fixed_dependencies.union(mutable_dependencies),
tables,
deterministic_order=True,
)
)
except exc.CircularDependencyError as err:
@@ -1203,7 +1202,6 @@ def sort_tables_and_constraints(
topological.sort(
fixed_dependencies.union(mutable_dependencies),
tables,
deterministic_order=True,
)
)
+16 -13
View File
@@ -10,25 +10,23 @@
from .. import util
from ..exc import CircularDependencyError
__all__ = ["sort", "sort_as_subsets", "find_cycles"]
def sort_as_subsets(tuples, allitems, deterministic_order=False):
def sort_as_subsets(tuples, allitems):
edges = util.defaultdict(set)
for parent, child in tuples:
edges[child].add(parent)
Set = util.OrderedSet if deterministic_order else set
todo = list(allitems)
todo_set = set(allitems)
todo = Set(allitems)
while todo:
output = Set()
while todo_set:
output = []
for node in todo:
if todo.isdisjoint(edges[node]):
output.add(node)
if todo_set.isdisjoint(edges[node]):
output.append(node)
if not output:
raise CircularDependencyError(
@@ -37,18 +35,23 @@ def sort_as_subsets(tuples, allitems, deterministic_order=False):
_gen_edges(edges),
)
todo.difference_update(output)
todo_set.difference_update(output)
todo = [t for t in todo if t in todo_set]
yield output
def sort(tuples, allitems, deterministic_order=False):
def sort(tuples, allitems, deterministic_order=True):
"""sort the given list of items by dependency.
'tuples' is a list of tuples representing a partial ordering.
'deterministic_order' keeps items within a dependency tier in list order.
deterministic_order is no longer used, the order is now always
deterministic given the order of "allitems". the flag is there
for backwards compatibility with Alembic.
"""
for set_ in sort_as_subsets(tuples, allitems, deterministic_order):
for set_ in sort_as_subsets(tuples, allitems):
for s in set_:
yield s
+1 -3
View File
@@ -16,9 +16,7 @@ class DependencySortTest(fixtures.TestBase):
assert conforms_partial_ordering(tuples, result)
def assert_sort_deterministic(self, tuples, allitems, expected):
result = list(
topological.sort(tuples, allitems, deterministic_order=True)
)
result = list(topological.sort(tuples, allitems))
assert conforms_partial_ordering(tuples, result)
assert result == expected
+8 -10
View File
@@ -2531,16 +2531,14 @@ class ManyToManyTest(_fixtures.FixtureTest):
self.assert_sql_execution(
testing.db,
session.flush,
AllOf(
CompiledSQL(
"UPDATE items SET description=:description "
"WHERE items.id = :items_id",
{"description": "item4updated", "items_id": objects[4].id},
),
CompiledSQL(
"INSERT INTO keywords (name) " "VALUES (:name)",
{"name": "yellow"},
),
CompiledSQL(
"UPDATE items SET description=:description "
"WHERE items.id = :items_id",
{"description": "item4updated", "items_id": objects[4].id},
),
CompiledSQL(
"INSERT INTO keywords (name) " "VALUES (:name)",
{"name": "yellow"},
),
CompiledSQL(
"INSERT INTO item_keywords (item_id, keyword_id) "
+14 -18
View File
@@ -681,15 +681,13 @@ class RudimentaryFlushTest(UOWTest):
self.assert_sql_execution(
testing.db,
sess.flush,
AllOf(
CompiledSQL(
"INSERT INTO keywords (name) VALUES (:name)",
{"name": "k1"},
),
CompiledSQL(
"INSERT INTO items (description) VALUES (:description)",
{"description": "i1"},
),
CompiledSQL(
"INSERT INTO items (description) VALUES (:description)",
{"description": "i1"},
),
CompiledSQL(
"INSERT INTO keywords (name) VALUES (:name)",
{"name": "k1"},
),
CompiledSQL(
"INSERT INTO item_keywords (item_id, keyword_id) "
@@ -874,15 +872,13 @@ class SingleCycleTest(UOWTest):
self.assert_sql_execution(
testing.db,
sess.flush,
AllOf(
CompiledSQL(
"UPDATE nodes SET parent_id=:parent_id "
"WHERE nodes.id = :nodes_id",
lambda ctx: [
{"nodes_id": n3.id, "parent_id": None},
{"nodes_id": n2.id, "parent_id": None},
],
)
CompiledSQL(
"UPDATE nodes SET parent_id=:parent_id "
"WHERE nodes.id = :nodes_id",
lambda ctx: [
{"nodes_id": n3.id, "parent_id": None},
{"nodes_id": n2.id, "parent_id": None},
],
),
CompiledSQL(
"DELETE FROM nodes WHERE nodes.id = :id",