mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-06 17:01:07 -04:00
2907dededf
Closes: #10889
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/10889
Pull-request-sha: 2ddeeb1906
Change-Id: I9a0d6e2776d8b6756af4a3c54668bdcd1a1f40f8
501 lines
14 KiB
Python
501 lines
14 KiB
Python
from sqlalchemy import ForeignKey
|
|
from sqlalchemy import Integer
|
|
from sqlalchemy import MetaData
|
|
from sqlalchemy import String
|
|
from sqlalchemy import testing
|
|
from sqlalchemy.ext.orderinglist import ordering_list
|
|
from sqlalchemy.orm import clear_mappers
|
|
from sqlalchemy.orm import relationship
|
|
from sqlalchemy.testing import eq_
|
|
from sqlalchemy.testing import fixtures
|
|
from sqlalchemy.testing.fixtures import fixture_session
|
|
from sqlalchemy.testing.schema import Column
|
|
from sqlalchemy.testing.schema import Table
|
|
from sqlalchemy.testing.util import picklers
|
|
|
|
|
|
metadata = None
|
|
|
|
|
|
def step_numbering(step):
|
|
"""order in whole steps"""
|
|
|
|
def f(index, collection):
|
|
return step * index
|
|
|
|
return f
|
|
|
|
|
|
def fibonacci_numbering(order_col):
|
|
"""
|
|
almost fibonacci- skip the first 2 steps
|
|
e.g. 1, 2, 3, 5, 8, ... instead of 0, 1, 1, 2, 3, ...
|
|
otherwise ordering of the elements at '1' is undefined... ;)
|
|
"""
|
|
|
|
def f(index, collection):
|
|
if index == 0:
|
|
return 1
|
|
elif index == 1:
|
|
return 2
|
|
else:
|
|
return getattr(collection[index - 1], order_col) + getattr(
|
|
collection[index - 2], order_col
|
|
)
|
|
|
|
return f
|
|
|
|
|
|
def alpha_ordering(index, collection):
|
|
"""
|
|
0 -> A, 1 -> B, ... 25 -> Z, 26 -> AA, 27 -> AB, ...
|
|
"""
|
|
s = ""
|
|
while index > 25:
|
|
d = index / 26
|
|
s += chr((d % 26) + 64)
|
|
index -= d * 26
|
|
s += chr(index + 65)
|
|
return s
|
|
|
|
|
|
class OrderingListTest(fixtures.MappedTest):
|
|
def setup_test(self):
|
|
global metadata, slides_table, bullets_table, Slide, Bullet
|
|
slides_table, bullets_table = None, None
|
|
Slide, Bullet = None, None
|
|
metadata = MetaData()
|
|
|
|
def _setup(self, test_collection_class):
|
|
"""Build a relationship situation using the given
|
|
test_collection_class factory"""
|
|
|
|
global slides_table, bullets_table, Slide, Bullet
|
|
|
|
slides_table = Table(
|
|
"test_Slides",
|
|
metadata,
|
|
Column(
|
|
"id", Integer, primary_key=True, test_needs_autoincrement=True
|
|
),
|
|
Column("name", String(128)),
|
|
)
|
|
bullets_table = Table(
|
|
"test_Bullets",
|
|
metadata,
|
|
Column(
|
|
"id", Integer, primary_key=True, test_needs_autoincrement=True
|
|
),
|
|
Column("slide_id", Integer, ForeignKey("test_Slides.id")),
|
|
Column("position", Integer),
|
|
Column("text", String(128)),
|
|
)
|
|
|
|
class Slide:
|
|
def __init__(self, name):
|
|
self.name = name
|
|
|
|
def __repr__(self):
|
|
return '<Slide "%s">' % self.name
|
|
|
|
class Bullet:
|
|
def __init__(self, text):
|
|
self.text = text
|
|
|
|
def __repr__(self):
|
|
return '<Bullet "%s" pos %s>' % (self.text, self.position)
|
|
|
|
clear_mappers()
|
|
self.mapper_registry.map_imperatively(
|
|
Slide,
|
|
slides_table,
|
|
properties={
|
|
"bullets": relationship(
|
|
Bullet,
|
|
lazy="joined",
|
|
collection_class=test_collection_class,
|
|
backref="slide",
|
|
order_by=[bullets_table.c.position],
|
|
)
|
|
},
|
|
)
|
|
self.mapper_registry.map_imperatively(Bullet, bullets_table)
|
|
|
|
metadata.create_all(testing.db)
|
|
|
|
def teardown_test(self):
|
|
metadata.drop_all(testing.db)
|
|
|
|
def test_append_no_reorder(self):
|
|
self._setup(
|
|
ordering_list("position", count_from=1, reorder_on_append=False)
|
|
)
|
|
|
|
s1 = Slide("Slide #1")
|
|
|
|
self.assert_(not s1.bullets)
|
|
self.assert_(len(s1.bullets) == 0)
|
|
|
|
s1.bullets.append(Bullet("s1/b1"))
|
|
|
|
self.assert_(s1.bullets)
|
|
self.assert_(len(s1.bullets) == 1)
|
|
self.assert_(s1.bullets[0].position == 1)
|
|
|
|
s1.bullets.append(Bullet("s1/b2"))
|
|
|
|
self.assert_(len(s1.bullets) == 2)
|
|
self.assert_(s1.bullets[0].position == 1)
|
|
self.assert_(s1.bullets[1].position == 2)
|
|
|
|
bul = Bullet("s1/b100")
|
|
bul.position = 100
|
|
s1.bullets.append(bul)
|
|
|
|
self.assert_(s1.bullets[0].position == 1)
|
|
self.assert_(s1.bullets[1].position == 2)
|
|
self.assert_(s1.bullets[2].position == 100)
|
|
|
|
s1.bullets.append(Bullet("s1/b4"))
|
|
self.assert_(s1.bullets[0].position == 1)
|
|
self.assert_(s1.bullets[1].position == 2)
|
|
self.assert_(s1.bullets[2].position == 100)
|
|
self.assert_(s1.bullets[3].position == 4)
|
|
|
|
s1.bullets._reorder()
|
|
self.assert_(s1.bullets[0].position == 1)
|
|
self.assert_(s1.bullets[1].position == 2)
|
|
self.assert_(s1.bullets[2].position == 3)
|
|
self.assert_(s1.bullets[3].position == 4)
|
|
|
|
session = fixture_session()
|
|
session.add(s1)
|
|
session.flush()
|
|
|
|
id_ = s1.id
|
|
session.expunge_all()
|
|
del s1
|
|
|
|
srt = session.get(Slide, id_)
|
|
|
|
self.assert_(srt.bullets)
|
|
self.assert_(len(srt.bullets) == 4)
|
|
|
|
titles = ["s1/b1", "s1/b2", "s1/b100", "s1/b4"]
|
|
found = [b.text for b in srt.bullets]
|
|
|
|
self.assert_(titles == found)
|
|
|
|
def test_append_reorder(self):
|
|
self._setup(
|
|
ordering_list("position", count_from=1, reorder_on_append=True)
|
|
)
|
|
|
|
s1 = Slide("Slide #1")
|
|
|
|
self.assert_(not s1.bullets)
|
|
self.assert_(len(s1.bullets) == 0)
|
|
|
|
s1.bullets.append(Bullet("s1/b1"))
|
|
|
|
self.assert_(s1.bullets)
|
|
self.assert_(len(s1.bullets) == 1)
|
|
self.assert_(s1.bullets[0].position == 1)
|
|
|
|
s1.bullets.append(Bullet("s1/b2"))
|
|
|
|
self.assert_(len(s1.bullets) == 2)
|
|
self.assert_(s1.bullets[0].position == 1)
|
|
self.assert_(s1.bullets[1].position == 2)
|
|
|
|
bul = Bullet("s1/b100")
|
|
bul.position = 100
|
|
s1.bullets.append(bul)
|
|
|
|
self.assert_(s1.bullets[0].position == 1)
|
|
self.assert_(s1.bullets[1].position == 2)
|
|
self.assert_(s1.bullets[2].position == 3)
|
|
|
|
s1.bullets.append(Bullet("s1/b4"))
|
|
self.assert_(s1.bullets[0].position == 1)
|
|
self.assert_(s1.bullets[1].position == 2)
|
|
self.assert_(s1.bullets[2].position == 3)
|
|
self.assert_(s1.bullets[3].position == 4)
|
|
|
|
s1.bullets._reorder()
|
|
self.assert_(s1.bullets[0].position == 1)
|
|
self.assert_(s1.bullets[1].position == 2)
|
|
self.assert_(s1.bullets[2].position == 3)
|
|
self.assert_(s1.bullets[3].position == 4)
|
|
|
|
s1.bullets._raw_append(Bullet("raw"))
|
|
self.assert_(s1.bullets[4].position is None)
|
|
|
|
s1.bullets._reorder()
|
|
self.assert_(s1.bullets[4].position == 5)
|
|
session = fixture_session()
|
|
session.add(s1)
|
|
session.flush()
|
|
|
|
id_ = s1.id
|
|
session.expunge_all()
|
|
del s1
|
|
|
|
srt = session.get(Slide, id_)
|
|
|
|
self.assert_(srt.bullets)
|
|
self.assert_(len(srt.bullets) == 5)
|
|
|
|
titles = ["s1/b1", "s1/b2", "s1/b100", "s1/b4", "raw"]
|
|
found = [b.text for b in srt.bullets]
|
|
eq_(titles, found)
|
|
|
|
srt.bullets._raw_append(Bullet("raw2"))
|
|
srt.bullets[-1].position = 6
|
|
session.flush()
|
|
session.expunge_all()
|
|
|
|
srt = session.get(Slide, id_)
|
|
titles = ["s1/b1", "s1/b2", "s1/b100", "s1/b4", "raw", "raw2"]
|
|
found = [b.text for b in srt.bullets]
|
|
eq_(titles, found)
|
|
|
|
def test_insert(self):
|
|
self._setup(ordering_list("position"))
|
|
|
|
s1 = Slide("Slide #1")
|
|
s1.bullets.append(Bullet("1"))
|
|
s1.bullets.append(Bullet("2"))
|
|
s1.bullets.append(Bullet("3"))
|
|
s1.bullets.append(Bullet("4"))
|
|
|
|
self.assert_(s1.bullets[0].position == 0)
|
|
self.assert_(s1.bullets[1].position == 1)
|
|
self.assert_(s1.bullets[2].position == 2)
|
|
self.assert_(s1.bullets[3].position == 3)
|
|
|
|
s1.bullets.insert(2, Bullet("insert_at_2"))
|
|
self.assert_(s1.bullets[0].position == 0)
|
|
self.assert_(s1.bullets[1].position == 1)
|
|
self.assert_(s1.bullets[2].position == 2)
|
|
self.assert_(s1.bullets[3].position == 3)
|
|
self.assert_(s1.bullets[4].position == 4)
|
|
|
|
self.assert_(s1.bullets[1].text == "2")
|
|
self.assert_(s1.bullets[2].text == "insert_at_2")
|
|
self.assert_(s1.bullets[3].text == "3")
|
|
|
|
s1.bullets.insert(999, Bullet("999"))
|
|
|
|
self.assert_(len(s1.bullets) == 6)
|
|
self.assert_(s1.bullets[5].position == 5)
|
|
|
|
session = fixture_session()
|
|
session.add(s1)
|
|
session.flush()
|
|
|
|
id_ = s1.id
|
|
session.expunge_all()
|
|
del s1
|
|
|
|
srt = session.get(Slide, id_)
|
|
|
|
self.assert_(srt.bullets)
|
|
self.assert_(len(srt.bullets) == 6)
|
|
|
|
texts = ["1", "2", "insert_at_2", "3", "4", "999"]
|
|
found = [b.text for b in srt.bullets]
|
|
|
|
self.assert_(texts == found)
|
|
|
|
def test_slice(self):
|
|
self._setup(ordering_list("position"))
|
|
|
|
b = [
|
|
Bullet("1"),
|
|
Bullet("2"),
|
|
Bullet("3"),
|
|
Bullet("4"),
|
|
Bullet("5"),
|
|
Bullet("6"),
|
|
]
|
|
s1 = Slide("Slide #1")
|
|
|
|
# 1, 2, 3
|
|
s1.bullets[0:3] = iter(b[0:3])
|
|
for i in 0, 1, 2:
|
|
self.assert_(s1.bullets[i].position == i)
|
|
self.assert_(s1.bullets[i] == b[i])
|
|
|
|
# 1, 4, 5, 6, 3
|
|
s1.bullets[1:2] = b[3:6]
|
|
for li, bi in (0, 0), (1, 3), (2, 4), (3, 5), (4, 2):
|
|
self.assert_(s1.bullets[li].position == li)
|
|
self.assert_(s1.bullets[li] == b[bi])
|
|
|
|
# 1, 6, 3
|
|
del s1.bullets[1:3]
|
|
for li, bi in (0, 0), (1, 5), (2, 2):
|
|
self.assert_(s1.bullets[li].position == li)
|
|
self.assert_(s1.bullets[li] == b[bi])
|
|
|
|
session = fixture_session()
|
|
session.add(s1)
|
|
session.flush()
|
|
|
|
id_ = s1.id
|
|
session.expunge_all()
|
|
del s1
|
|
|
|
srt = session.get(Slide, id_)
|
|
|
|
self.assert_(srt.bullets)
|
|
self.assert_(len(srt.bullets) == 3)
|
|
|
|
texts = ["1", "6", "3"]
|
|
for i, text in enumerate(texts):
|
|
self.assert_(srt.bullets[i].position == i)
|
|
self.assert_(srt.bullets[i].text == text)
|
|
|
|
def test_replace(self):
|
|
self._setup(ordering_list("position"))
|
|
|
|
s1 = Slide("Slide #1")
|
|
s1.bullets = [Bullet("1"), Bullet("2"), Bullet("3")]
|
|
|
|
self.assert_(len(s1.bullets) == 3)
|
|
self.assert_(s1.bullets[2].position == 2)
|
|
|
|
session = fixture_session()
|
|
session.add(s1)
|
|
session.flush()
|
|
|
|
new_bullet = Bullet("new 2")
|
|
self.assert_(new_bullet.position is None)
|
|
|
|
# mark existing bullet as db-deleted before replacement.
|
|
# session.delete(s1.bullets[1])
|
|
s1.bullets[1] = new_bullet
|
|
|
|
self.assert_(new_bullet.position == 1)
|
|
self.assert_(len(s1.bullets) == 3)
|
|
|
|
id_ = s1.id
|
|
|
|
session.flush()
|
|
session.expunge_all()
|
|
|
|
srt = session.get(Slide, id_)
|
|
|
|
self.assert_(srt.bullets)
|
|
self.assert_(len(srt.bullets) == 3)
|
|
|
|
self.assert_(srt.bullets[1].text == "new 2")
|
|
self.assert_(srt.bullets[2].text == "3")
|
|
|
|
def test_replace_two(self):
|
|
"""test #3191"""
|
|
|
|
self._setup(ordering_list("position", reorder_on_append=True))
|
|
|
|
s1 = Slide("Slide #1")
|
|
|
|
b1, b2, b3, b4 = Bullet("1"), Bullet("2"), Bullet("3"), Bullet("4")
|
|
s1.bullets = [b1, b2, b3]
|
|
|
|
eq_([b.position for b in s1.bullets], [0, 1, 2])
|
|
|
|
s1.bullets = [b4, b2, b1]
|
|
eq_([b.position for b in s1.bullets], [0, 1, 2])
|
|
|
|
def test_funky_ordering(self):
|
|
class Pos:
|
|
def __init__(self):
|
|
self.position = None
|
|
|
|
step_factory = ordering_list(
|
|
"position", ordering_func=step_numbering(2)
|
|
)
|
|
|
|
stepped = step_factory()
|
|
stepped.append(Pos())
|
|
stepped.append(Pos())
|
|
stepped.append(Pos())
|
|
stepped.append(Pos())
|
|
|
|
for li, pos in (0, 0), (1, 2), (2, 4), (3, 6):
|
|
self.assert_(stepped[li].position == pos)
|
|
|
|
fib_factory = ordering_list(
|
|
"position", ordering_func=fibonacci_numbering("position")
|
|
)
|
|
|
|
fibbed = fib_factory()
|
|
fibbed.append(Pos())
|
|
fibbed.append(Pos())
|
|
fibbed.append(Pos())
|
|
fibbed.append(Pos())
|
|
fibbed.append(Pos())
|
|
|
|
for li, pos in (0, 1), (1, 2), (2, 3), (3, 5), (4, 8):
|
|
self.assert_(fibbed[li].position == pos)
|
|
|
|
fibbed.insert(2, Pos())
|
|
fibbed.insert(4, Pos())
|
|
fibbed.insert(6, Pos())
|
|
|
|
for li, pos in (
|
|
(0, 1),
|
|
(1, 2),
|
|
(2, 3),
|
|
(3, 5),
|
|
(4, 8),
|
|
(5, 13),
|
|
(6, 21),
|
|
(7, 34),
|
|
):
|
|
self.assert_(fibbed[li].position == pos)
|
|
|
|
alpha_factory = ordering_list("position", ordering_func=alpha_ordering)
|
|
alpha = alpha_factory()
|
|
alpha.append(Pos())
|
|
alpha.append(Pos())
|
|
alpha.append(Pos())
|
|
|
|
alpha.insert(1, Pos())
|
|
|
|
for li, pos in (0, "A"), (1, "B"), (2, "C"), (3, "D"):
|
|
self.assert_(alpha[li].position == pos)
|
|
|
|
def test_picklability(self):
|
|
from sqlalchemy.ext.orderinglist import OrderingList
|
|
|
|
olist = OrderingList("order", reorder_on_append=True)
|
|
olist.append(DummyItem())
|
|
|
|
for loads, dumps in picklers():
|
|
pck = dumps(olist)
|
|
copy = loads(pck)
|
|
|
|
self.assert_(copy == olist)
|
|
self.assert_(copy.__dict__ == olist.__dict__)
|
|
|
|
|
|
class DummyItem:
|
|
def __init__(self, order=None):
|
|
self.order = order
|
|
|
|
def __eq__(self, other):
|
|
return self.order == other.order
|
|
|
|
def __ne__(self, other):
|
|
return not (self == other)
|
|
|
|
|
|
class MockIndex:
|
|
def __init__(self, value):
|
|
self.value = value
|
|
|
|
def __index__(self):
|
|
return self.value
|