Files
sqlalchemy/test/orm/test_utils.py
T
2025-07-02 23:20:47 +02:00

1331 lines
41 KiB
Python

import re
from sqlalchemy import Column
from sqlalchemy import event
from sqlalchemy import ForeignKey
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy.engine import result
from sqlalchemy.ext.hybrid import hybrid_method
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import aliased
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.orm import synonym
from sqlalchemy.orm import util as orm_util
from sqlalchemy.orm import with_polymorphic
from sqlalchemy.orm.path_registry import PathRegistry
from sqlalchemy.orm.path_registry import PathToken
from sqlalchemy.orm.path_registry import RootRegistry
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing.assertions import is_true
from sqlalchemy.testing.fixtures import fixture_session
from test.orm import _fixtures
from .inheritance import _poly_fixtures
class ContextualWarningsTest(fixtures.TestBase):
"""
Test for #7305
"""
@testing.fixture
def plain_fixture(cls, decl_base):
class Foo(decl_base):
__tablename__ = "foo"
id = Column(Integer, primary_key=True)
decl_base.metadata.create_all(testing.db)
return Foo
@testing.fixture
def overlap_fixture(cls, decl_base):
class Foo(decl_base):
__tablename__ = "foo"
id = Column(Integer, primary_key=True)
bars = relationship(
"Bar",
primaryjoin="Foo.id==Bar.foo_id",
)
class Bar(decl_base):
__tablename__ = "bar"
id = Column(Integer, primary_key=True)
foo_id = Column(Integer, ForeignKey("foo.id"))
foos = relationship(
"Foo",
primaryjoin="Bar.foo_id==Foo.id",
)
return Foo, Bar
def test_configure_mappers_explicit(self, overlap_fixture, decl_base):
with expect_warnings(
re.escape(
"relationship 'Bar.foos' will copy column foo.id to column "
"bar.foo_id, which conflicts with relationship(s): 'Foo.bars' "
"(copies foo.id to bar.foo_id). "
),
):
decl_base.registry.configure()
def test_configure_mappers_implicit_aliased(self, overlap_fixture):
Foo, Bar = overlap_fixture
with expect_warnings(
re.escape(
"relationship 'Bar.foos' will copy column foo.id "
"to column bar.foo_id, which conflicts with"
)
+ ".*"
+ re.escape(
"(This warning originated from the `configure_mappers()` "
"process, which was "
"invoked automatically in response to a user-initiated "
"operation.)"
),
):
FooAlias = aliased(Foo)
assert hasattr(FooAlias, "bars")
def test_configure_mappers_implicit_instantiate(self, overlap_fixture):
Foo, Bar = overlap_fixture
with expect_warnings(
re.escape(
"relationship 'Bar.foos' will copy column foo.id "
"to column bar.foo_id, which conflicts with"
)
+ ".*"
+ re.escape(
"(This warning originated from the `configure_mappers()` "
"process, which was "
"invoked automatically in response to a user-initiated "
"operation.)"
),
):
foo = Foo()
assert hasattr(foo, "bars")
def test_autoflush_implicit(self, plain_fixture):
Foo = plain_fixture
sess = fixture_session()
@event.listens_for(Foo, "before_insert")
def emit_a_warning(mapper, connection, state):
sess.add(Foo())
sess.add(Foo())
with expect_warnings(
re.escape(
"Usage of the 'Session.add()' operation is not "
"currently supported within the execution stage of the flush "
"process. Results may not be consistent. Consider using "
"alternative event listeners or connection-level operations "
"instead."
)
+ ".*"
+ re.escape(
"(This warning originated from the Session 'autoflush' "
"process, which was invoked automatically in response to a "
"user-initiated operation. Consider using ``no_autoflush`` "
"context manager if this warning happened while "
"initializing objects.)"
),
):
sess.execute(select(Foo))
class AliasedClassTest(fixtures.MappedTest, AssertsCompiledSQL):
__dialect__ = "default"
def _fixture(self, cls, properties={}):
table = Table(
"point",
MetaData(),
Column("id", Integer(), primary_key=True),
Column("x", Integer),
Column("y", Integer),
)
clear_mappers()
self.mapper_registry.map_imperatively(
cls, table, properties=properties
)
return table
def test_simple(self):
class Point:
pass
table = self._fixture(Point)
alias = aliased(Point)
assert alias.id
assert alias.x
assert alias.y
assert Point.id.__clause_element__().table is table
assert alias.id.__clause_element__().table is not table
def test_named_entity(self):
class Point:
pass
self._fixture(Point)
alias = aliased(Point, name="pp")
self.assert_compile(
select(alias), "SELECT pp.id, pp.x, pp.y FROM point AS pp"
)
def test_named_selectable(self):
class Point:
pass
table = self._fixture(Point)
alias = aliased(table, name="pp")
self.assert_compile(
select(alias), "SELECT pp.id, pp.x, pp.y FROM point AS pp"
)
def test_not_instantiatable(self):
class Point:
pass
self._fixture(Point)
alias = aliased(Point)
assert_raises(TypeError, alias)
def test_instancemethod(self):
class Point:
def zero(self):
self.x, self.y = 0, 0
self._fixture(Point)
alias = aliased(Point)
assert Point.zero
assert getattr(alias, "zero")
def test_classmethod(self):
class Point:
@classmethod
def max_x(cls):
return 100
self._fixture(Point)
alias = aliased(Point)
assert Point.max_x
assert alias.max_x
assert Point.max_x() == alias.max_x() == 100
def test_simple_property(self):
class Point:
@property
def max_x(self):
return 100
self._fixture(Point)
alias = aliased(Point)
assert Point.max_x
assert Point.max_x != 100
assert alias.max_x
assert Point.max_x is alias.max_x
def test_descriptors(self):
class descriptor:
def __init__(self, fn):
self.fn = fn
def __get__(self, obj, owner):
if obj is not None:
return self.fn(obj, obj)
else:
return self
def method(self):
return "method"
class Point:
center = (0, 0)
@descriptor
def thing(self, arg):
return arg.center
self._fixture(Point)
alias = aliased(Point)
assert Point.thing != (0, 0)
assert Point().thing == (0, 0)
assert Point.thing.method() == "method"
assert alias.thing != (0, 0)
assert alias.thing.method() == "method"
def _assert_has_table(self, expr, table):
from sqlalchemy import Column # override testlib's override
for child in expr.get_children():
if isinstance(child, Column):
assert child.table is table
def test_hybrid_descriptor_one(self):
class Point:
def __init__(self, x, y):
self.x, self.y = x, y
@hybrid_method
def left_of(self, other):
return self.x < other.x
self._fixture(Point)
alias = aliased(Point)
sess = fixture_session()
self.assert_compile(
sess.query(alias).filter(alias.left_of(Point)),
"SELECT point_1.id AS point_1_id, point_1.x AS point_1_x, "
"point_1.y AS point_1_y FROM point AS point_1, point "
"WHERE point_1.x < point.x",
)
def test_hybrid_descriptor_two(self):
class Point:
def __init__(self, x, y):
self.x, self.y = x, y
@hybrid_property
def double_x(self):
return self.x * 2
self._fixture(Point)
alias = aliased(Point)
eq_(str(Point.double_x), "Point.double_x")
eq_(str(alias.double_x), "aliased(Point).double_x")
eq_(str(Point.double_x.__clause_element__()), "point.x * :x_1")
eq_(str(alias.double_x.__clause_element__()), "point_1.x * :x_1")
sess = fixture_session()
self.assert_compile(
sess.query(alias).filter(alias.double_x > Point.x),
"SELECT point_1.id AS point_1_id, point_1.x AS point_1_x, "
"point_1.y AS point_1_y FROM point AS point_1, point "
"WHERE point_1.x * :x_1 > point.x",
)
def test_hybrid_descriptor_three(self):
class Point:
def __init__(self, x, y):
self.x, self.y = x, y
@hybrid_property
def x_alone(self):
return self.x
self._fixture(Point)
alias = aliased(Point)
eq_(str(Point.x_alone), "Point.x_alone")
eq_(str(alias.x_alone), "aliased(Point).x_alone")
# from __clause_element__() perspective, Point.x_alone
# and Point.x return the same thing, so that's good
eq_(str(Point.x.__clause_element__()), "point.x")
eq_(str(Point.x_alone.__clause_element__()), "point.x")
# same for the alias
eq_(str(alias.x + 1), "point_1.x + :x_1")
eq_(str(alias.x_alone + 1), "point_1.x + :x_1")
point_mapper = inspect(Point)
eq_(
Point.x_alone._annotations,
{
"entity_namespace": point_mapper,
"parententity": point_mapper,
"parentmapper": point_mapper,
"proxy_key": "x_alone",
"proxy_owner": point_mapper,
},
)
eq_(
Point.x._annotations,
{
"entity_namespace": point_mapper,
"parententity": point_mapper,
"parentmapper": point_mapper,
"proxy_key": "x",
"proxy_owner": point_mapper,
},
)
eq_(str(alias.x_alone == alias.x), "point_1.x = point_1.x")
a2 = aliased(Point)
eq_(str(a2.x_alone == alias.x), "point_1.x = point_2.x")
eq_(
a2.x._annotations,
{
"entity_namespace": inspect(a2),
"parententity": inspect(a2),
"parentmapper": point_mapper,
"proxy_key": "x",
"proxy_owner": inspect(a2),
},
)
sess = fixture_session()
self.assert_compile(
sess.query(alias).filter(alias.x_alone > Point.x),
"SELECT point_1.id AS point_1_id, point_1.x AS point_1_x, "
"point_1.y AS point_1_y FROM point AS point_1, point "
"WHERE point_1.x > point.x",
)
def test_proxy_descriptor_one(self):
class Point:
def __init__(self, x, y):
self.x, self.y = x, y
self._fixture(Point, properties={"x_syn": synonym("x")})
alias = aliased(Point)
eq_(str(Point.x_syn), "Point.x_syn")
eq_(str(alias.x_syn), "aliased(Point).x_syn")
sess = fixture_session()
self.assert_compile(
sess.query(alias.x_syn).filter(alias.x_syn > Point.x_syn),
"SELECT point_1.x AS point_1_x FROM point AS point_1, point "
"WHERE point_1.x > point.x",
)
def test_meta_getattr_one(self):
class MetaPoint(type):
def __getattr__(cls, key):
if key == "x_syn":
return cls.x
raise AttributeError(key)
class Point(metaclass=MetaPoint):
pass
self._fixture(Point)
alias = aliased(Point)
eq_(str(Point.x_syn), "Point.x")
eq_(str(alias.x_syn), "aliased(Point).x")
# from __clause_element__() perspective, Point.x_syn
# and Point.x return the same thing, so that's good
eq_(str(Point.x.__clause_element__()), "point.x")
eq_(str(Point.x_syn.__clause_element__()), "point.x")
# same for the alias
eq_(str(alias.x + 1), "point_1.x + :x_1")
eq_(str(alias.x_syn + 1), "point_1.x + :x_1")
is_(Point.x_syn.__clause_element__(), Point.x.__clause_element__())
eq_(str(alias.x_syn == alias.x), "point_1.x = point_1.x")
a2 = aliased(Point)
eq_(str(a2.x_syn == alias.x), "point_1.x = point_2.x")
sess = fixture_session()
self.assert_compile(
sess.query(alias).filter(alias.x_syn > Point.x),
"SELECT point_1.id AS point_1_id, point_1.x AS point_1_x, "
"point_1.y AS point_1_y FROM point AS point_1, point "
"WHERE point_1.x > point.x",
)
def test_meta_getattr_two(self):
class MetaPoint(type):
def __getattr__(cls, key):
if key == "double_x":
return cls._impl_double_x
raise AttributeError(key)
class Point(metaclass=MetaPoint):
@hybrid_property
def _impl_double_x(self):
return self.x * 2
self._fixture(Point)
alias = aliased(Point)
eq_(str(Point.double_x), "Point._impl_double_x")
eq_(str(alias.double_x), "aliased(Point)._impl_double_x")
eq_(str(Point.double_x.__clause_element__()), "point.x * :x_1")
eq_(str(alias.double_x.__clause_element__()), "point_1.x * :x_1")
sess = fixture_session()
self.assert_compile(
sess.query(alias).filter(alias.double_x > Point.x),
"SELECT point_1.id AS point_1_id, point_1.x AS point_1_x, "
"point_1.y AS point_1_y FROM point AS point_1, point "
"WHERE point_1.x * :x_1 > point.x",
)
def test_meta_getattr_three(self):
class MetaPoint(type):
def __getattr__(cls, key):
@hybrid_property
def double_x(me):
return me.x * 2
if key == "double_x":
return double_x.__get__(None, cls)
raise AttributeError(key)
class Point(metaclass=MetaPoint):
pass
self._fixture(Point)
alias = aliased(Point)
eq_(str(Point.double_x.__clause_element__()), "point.x * :x_1")
eq_(str(alias.double_x.__clause_element__()), "point_1.x * :x_1")
sess = fixture_session()
self.assert_compile(
sess.query(alias).filter(alias.double_x > Point.x),
"SELECT point_1.id AS point_1_id, point_1.x AS point_1_x, "
"point_1.y AS point_1_y FROM point AS point_1, point "
"WHERE point_1.x * :x_1 > point.x",
)
def test_parententity_vs_parentmapper(self):
class Point:
pass
self._fixture(Point, properties={"x_syn": synonym("x")})
pa = aliased(Point)
is_(Point.x_syn._parententity, inspect(Point))
is_(Point.x._parententity, inspect(Point))
is_(Point.x_syn._parentmapper, inspect(Point))
is_(Point.x._parentmapper, inspect(Point))
is_(
Point.x_syn.__clause_element__()._annotations["parententity"],
inspect(Point),
)
is_(
Point.x.__clause_element__()._annotations["parententity"],
inspect(Point),
)
is_(
Point.x_syn.__clause_element__()._annotations["parentmapper"],
inspect(Point),
)
is_(
Point.x.__clause_element__()._annotations["parentmapper"],
inspect(Point),
)
pa = aliased(Point)
is_(pa.x_syn._parententity, inspect(pa))
is_(pa.x._parententity, inspect(pa))
is_(pa.x_syn._parentmapper, inspect(Point))
is_(pa.x._parentmapper, inspect(Point))
is_(
pa.x_syn.__clause_element__()._annotations["parententity"],
inspect(pa),
)
is_(
pa.x.__clause_element__()._annotations["parententity"], inspect(pa)
)
is_(
pa.x_syn.__clause_element__()._annotations["parentmapper"],
inspect(Point),
)
is_(
pa.x.__clause_element__()._annotations["parentmapper"],
inspect(Point),
)
class IdentityKeyTest(_fixtures.FixtureTest):
run_inserts = None
def _cases():
return testing.combinations(
(orm_util,), (Session,), argnames="ormutil"
)
@_cases()
def test_identity_key_1(self, ormutil):
User, users = self.classes.User, self.tables.users
self.mapper_registry.map_imperatively(User, users)
key = ormutil.identity_key(User, [1])
eq_(key, (User, (1,), None))
key = ormutil.identity_key(User, ident=[1])
eq_(key, (User, (1,), None))
@_cases()
def test_identity_key_scalar(self, ormutil):
User, users = self.classes.User, self.tables.users
self.mapper_registry.map_imperatively(User, users)
key = ormutil.identity_key(User, 1)
eq_(key, (User, (1,), None))
key = ormutil.identity_key(User, ident=1)
eq_(key, (User, (1,), None))
@_cases()
def test_identity_key_2(self, ormutil):
users, User = self.tables.users, self.classes.User
self.mapper_registry.map_imperatively(User, users)
s = fixture_session()
u = User(name="u1")
s.add(u)
s.flush()
key = ormutil.identity_key(instance=u)
eq_(key, (User, (u.id,), None))
@_cases()
@testing.combinations("dict", "row", "mapping", argnames="rowtype")
def test_identity_key_3(self, ormutil, rowtype):
"""test a real Row works with identity_key.
this was broken w/ 1.4 future mode as we are assuming a mapping
here. to prevent regressions, identity_key now accepts any of
dict, RowMapping, Row for the "row".
found_during_type_annotation
"""
User, users = self.classes.User, self.tables.users
self.mapper_registry.map_imperatively(User, users)
if rowtype == "dict":
row = {users.c.id: 1, users.c.name: "Frank"}
elif rowtype in ("mapping", "row"):
row = result.result_tuple([users.c.id, users.c.name])((1, "Frank"))
if rowtype == "mapping":
row = row._mapping
key = ormutil.identity_key(User, row=row)
eq_(key, (User, (1,), None))
def test_identity_key_token(self):
User, users = self.classes.User, self.tables.users
self.mapper_registry.map_imperatively(User, users)
key = orm_util.identity_key(User, [1], identity_token="token")
eq_(key, (User, (1,), "token"))
key = orm_util.identity_key(User, ident=[1], identity_token="token")
eq_(key, (User, (1,), "token"))
class PathRegistryTest(_fixtures.FixtureTest):
run_setup_mappers = "once"
run_inserts = None
run_deletes = None
@classmethod
def setup_mappers(cls):
cls._setup_stock_mapping()
def test_root_registry(self):
umapper = inspect(self.classes.User)
is_(RootRegistry()[umapper], umapper._path_registry)
eq_(RootRegistry()[umapper], PathRegistry.coerce((umapper,)))
def test_expand(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
path = PathRegistry.coerce((umapper,))
eq_(
path[umapper.attrs.addresses][amapper][
amapper.attrs.email_address
],
PathRegistry.coerce(
(
umapper,
umapper.attrs.addresses,
amapper,
amapper.attrs.email_address,
)
),
)
def test_entity_boolean(self):
umapper = inspect(self.classes.User)
path = PathRegistry.coerce((umapper,))
is_(bool(path), True)
def test_key_boolean(self):
umapper = inspect(self.classes.User)
path = PathRegistry.coerce((umapper, umapper.attrs.addresses))
is_(bool(path), True)
def test_aliased_class(self):
User = self.classes.User
ua = aliased(User)
ua_insp = inspect(ua)
path = PathRegistry.coerce((ua_insp, ua_insp.mapper.attrs.addresses))
assert path.parent.is_aliased_class
def test_indexed_entity(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
path = PathRegistry.coerce(
(
umapper,
umapper.attrs.addresses,
amapper,
amapper.attrs.email_address,
)
)
is_(path[0], umapper)
is_(path[2], amapper)
def test_indexed_key_token(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
path = PathRegistry.coerce(
(
umapper,
umapper.attrs.addresses,
amapper,
PathToken.intern(":*"),
)
)
is_true(path.is_token)
eq_(path[1], umapper.attrs.addresses)
eq_(path[3], ":*")
with expect_raises(IndexError):
path[amapper]
def test_slice_token(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
path = PathRegistry.coerce(
(
umapper,
umapper.attrs.addresses,
amapper,
PathToken.intern(":*"),
)
)
is_true(path.is_token)
eq_(path[1:3], (umapper.attrs.addresses, amapper))
def test_indexed_key(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
path = PathRegistry.coerce(
(
umapper,
umapper.attrs.addresses,
amapper,
amapper.attrs.email_address,
)
)
eq_(path[1], umapper.attrs.addresses)
eq_(path[3], amapper.attrs.email_address)
def test_slice(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
path = PathRegistry.coerce(
(
umapper,
umapper.attrs.addresses,
amapper,
amapper.attrs.email_address,
)
)
eq_(path[1:3], (umapper.attrs.addresses, amapper))
def test_addition(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
p2 = PathRegistry.coerce((amapper, amapper.attrs.email_address))
eq_(
p1 + p2,
PathRegistry.coerce(
(
umapper,
umapper.attrs.addresses,
amapper,
amapper.attrs.email_address,
)
),
)
def test_length(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
pneg1 = PathRegistry.coerce(())
p0 = PathRegistry.coerce((umapper,))
p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
p3 = PathRegistry.coerce(
(
umapper,
umapper.attrs.addresses,
amapper,
amapper.attrs.email_address,
)
)
eq_(len(pneg1), 0)
eq_(len(p0), 1)
eq_(len(p1), 2)
eq_(len(p2), 3)
eq_(len(p3), 4)
eq_(pneg1.length, 0)
eq_(p0.length, 1)
eq_(p1.length, 2)
eq_(p2.length, 3)
eq_(p3.length, 4)
def test_eq(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
u_alias = inspect(aliased(self.classes.User))
p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
p3 = PathRegistry.coerce((umapper, umapper.attrs.name))
p4 = PathRegistry.coerce((u_alias, umapper.attrs.addresses))
p5 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
p6 = PathRegistry.coerce(
(amapper, amapper.attrs.user, umapper, umapper.attrs.addresses)
)
p7 = PathRegistry.coerce(
(
amapper,
amapper.attrs.user,
umapper,
umapper.attrs.addresses,
amapper,
amapper.attrs.email_address,
)
)
is_(p1 == p2, True)
is_(p1 == p3, False)
is_(p1 == p4, False)
is_(p1 == p5, False)
is_(p6 == p7, False)
is_(p6 == p7.parent.parent, True)
is_(p1 != p2, False)
is_(p1 != p3, True)
is_(p1 != p4, True)
is_(p1 != p5, True)
def test_eq_non_path(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
u_alias = inspect(aliased(self.classes.User))
p1 = PathRegistry.coerce((umapper,))
p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
p3 = PathRegistry.coerce((u_alias, umapper.attrs.addresses))
p4 = PathRegistry.coerce((u_alias, umapper.attrs.addresses, amapper))
p5 = PathRegistry.coerce((u_alias,)).token(":*")
non_object = 54.1432
for obj in [p1, p2, p3, p4, p5]:
with expect_warnings(
"Comparison of PathRegistry to "
"<.* 'float'> is not supported"
):
is_(obj == non_object, False)
with expect_warnings(
"Comparison of PathRegistry to "
"<.* 'float'> is not supported"
):
is_(obj != non_object, True)
def test_contains_mapper(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
assert p1.contains_mapper(umapper)
assert not p1.contains_mapper(amapper)
def test_path(self):
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
p3 = PathRegistry.coerce((amapper, amapper.attrs.email_address))
eq_(p1.path, (umapper, umapper.attrs.addresses))
eq_(p2.path, (umapper, umapper.attrs.addresses, amapper))
eq_(p3.path, (amapper, amapper.attrs.email_address))
def test_registry_set(self):
reg = {}
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
p3 = PathRegistry.coerce((amapper, amapper.attrs.email_address))
p1.set(reg, "p1key", "p1value")
p2.set(reg, "p2key", "p2value")
p3.set(reg, "p3key", "p3value")
eq_(
reg,
{
("p1key", p1.path): "p1value",
("p2key", p2.path): "p2value",
("p3key", p3.path): "p3value",
},
)
def test_registry_get(self):
reg = {}
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
p3 = PathRegistry.coerce((amapper, amapper.attrs.email_address))
reg.update(
{
("p1key", p1.path): "p1value",
("p2key", p2.path): "p2value",
("p3key", p3.path): "p3value",
}
)
eq_(p1.get(reg, "p1key"), "p1value")
eq_(p2.get(reg, "p2key"), "p2value")
eq_(p2.get(reg, "p1key"), None)
eq_(p3.get(reg, "p3key"), "p3value")
eq_(p3.get(reg, "p1key"), None)
def test_registry_contains(self):
reg = {}
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
p3 = PathRegistry.coerce((amapper, amapper.attrs.email_address))
reg.update(
{
("p1key", p1.path): "p1value",
("p2key", p2.path): "p2value",
("p3key", p3.path): "p3value",
}
)
assert p1.contains(reg, "p1key")
assert not p1.contains(reg, "p2key")
assert p3.contains(reg, "p3key")
assert not p2.contains(reg, "fake")
def test_registry_setdefault(self):
reg = {}
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
p1 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
reg.update({("p1key", p1.path): "p1value"})
p1.setdefault(reg, "p1key", "p1newvalue_a")
p1.setdefault(reg, "p1key_new", "p1newvalue_b")
p2.setdefault(reg, "p2key", "p2newvalue")
eq_(
reg,
{
("p1key", p1.path): "p1value",
("p1key_new", p1.path): "p1newvalue_b",
("p2key", p2.path): "p2newvalue",
},
)
def test_serialize(self):
User = self.classes.User
Address = self.classes.Address
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
p1 = PathRegistry.coerce(
(
umapper,
umapper.attrs.addresses,
amapper,
amapper.attrs.email_address,
)
)
p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
p3 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
eq_(p1.serialize(), [(User, "addresses"), (Address, "email_address")])
eq_(p2.serialize(), [(User, "addresses"), (Address, None)])
eq_(p3.serialize(), [(User, "addresses")])
def test_deseralize(self):
User = self.classes.User
Address = self.classes.Address
umapper = inspect(self.classes.User)
amapper = inspect(self.classes.Address)
p1 = PathRegistry.coerce(
(
umapper,
umapper.attrs.addresses,
amapper,
amapper.attrs.email_address,
)
)
p2 = PathRegistry.coerce((umapper, umapper.attrs.addresses, amapper))
p3 = PathRegistry.coerce((umapper, umapper.attrs.addresses))
eq_(
PathRegistry.deserialize(
[(User, "addresses"), (Address, "email_address")]
),
p1,
)
eq_(
PathRegistry.deserialize([(User, "addresses"), (Address, None)]),
p2,
)
eq_(PathRegistry.deserialize([(User, "addresses")]), p3)
class PathRegistryInhTest(_poly_fixtures._Polymorphic):
run_setup_mappers = "once"
run_inserts = None
run_deletes = None
def test_plain(self):
Person = _poly_fixtures.Person
Engineer = _poly_fixtures.Engineer
pmapper = inspect(Person)
emapper = inspect(Engineer)
p1 = PathRegistry.coerce((pmapper, emapper.attrs.machines))
# given a mapper and an attribute on a subclass,
# the path converts what you get to be against that subclass
eq_(p1.path, (emapper, emapper.attrs.machines))
def test_plain_compound(self):
Company = _poly_fixtures.Company
Person = _poly_fixtures.Person
Engineer = _poly_fixtures.Engineer
cmapper = inspect(Company)
pmapper = inspect(Person)
emapper = inspect(Engineer)
p1 = PathRegistry.coerce(
(cmapper, cmapper.attrs.employees, pmapper, emapper.attrs.machines)
)
# given a mapper and an attribute on a subclass,
# the path converts what you get to be against that subclass
eq_(
p1.path,
(
cmapper,
cmapper.attrs.employees,
emapper,
emapper.attrs.machines,
),
)
def test_plain_aliased(self):
Person = _poly_fixtures.Person
Engineer = _poly_fixtures.Engineer
emapper = inspect(Engineer)
p_alias = aliased(Person)
p_alias = inspect(p_alias)
p1 = PathRegistry.coerce((p_alias, emapper.attrs.machines))
# plain AliasedClass - the path keeps that AliasedClass directly
# as is in the path
eq_(p1.path, (p_alias, emapper.attrs.machines))
def test_plain_aliased_compound(self):
Company = _poly_fixtures.Company
Person = _poly_fixtures.Person
Engineer = _poly_fixtures.Engineer
cmapper = inspect(Company)
emapper = inspect(Engineer)
c_alias = aliased(Company)
p_alias = aliased(Person)
c_alias = inspect(c_alias)
p_alias = inspect(p_alias)
p1 = PathRegistry.coerce(
(c_alias, cmapper.attrs.employees, p_alias, emapper.attrs.machines)
)
# plain AliasedClass - the path keeps that AliasedClass directly
# as is in the path
eq_(
p1.path,
(
c_alias,
cmapper.attrs.employees,
p_alias,
emapper.attrs.machines,
),
)
def test_with_poly_sub(self):
Company = _poly_fixtures.Company
Person = _poly_fixtures.Person
Engineer = _poly_fixtures.Engineer
emapper = inspect(Engineer)
cmapper = inspect(Company)
p_poly = with_polymorphic(Person, [Engineer])
e_poly_insp = inspect(p_poly.Engineer) # noqa - used by comment below
p_poly_insp = inspect(p_poly)
p1 = PathRegistry.coerce((p_poly_insp, emapper.attrs.machines))
# changes as of #5082: when a with_polymorphic is in the middle
# of a path, the natural path makes sure it uses the base mappers,
# however when it's at the root, the with_polymorphic stays in
# the natural path
# this behavior is the same as pre #5082, it was temporarily changed
# but this proved to be incorrect. The path starts on a
# with_polymorphic(), so a Query will "naturally" construct a path
# that comes from that wp.
eq_(p1.path, (e_poly_insp, emapper.attrs.machines))
eq_(p1.natural_path, (e_poly_insp, emapper.attrs.machines))
# this behavior is new as of the final version of #5082.
# the path starts on a normal entity and has a with_polymorphic
# in the middle, for this to match what Query will generate it needs
# to use the non aliased mappers in the natural path.
p2 = PathRegistry.coerce(
(
cmapper,
cmapper.attrs.employees,
p_poly_insp,
emapper.attrs.machines,
)
)
eq_(
p2.path,
(
cmapper,
cmapper.attrs.employees,
e_poly_insp,
emapper.attrs.machines,
),
)
eq_(
p2.natural_path,
(
cmapper,
cmapper.attrs.employees,
emapper,
emapper.attrs.machines,
),
)
def test_with_poly_base_two(self):
Company = _poly_fixtures.Company
Person = _poly_fixtures.Person
Engineer = _poly_fixtures.Engineer
cmapper = inspect(Company)
pmapper = inspect(Person)
p_poly = with_polymorphic(Person, [Engineer])
e_poly_insp = inspect(p_poly.Engineer) # noqa - used by comment below
p_poly_insp = inspect(p_poly)
p1 = PathRegistry.coerce(
(
cmapper,
cmapper.attrs.employees,
p_poly_insp,
pmapper.attrs.paperwork,
)
)
eq_(
p1.path,
(
cmapper,
cmapper.attrs.employees,
p_poly_insp,
pmapper.attrs.paperwork,
),
)
eq_(
p1.natural_path,
(
cmapper,
cmapper.attrs.employees,
pmapper,
pmapper.attrs.paperwork,
),
)
def test_nonpoly_oftype_aliased_subclass_onroot(self):
Engineer = _poly_fixtures.Engineer
eng_alias = aliased(Engineer)
ea_insp = inspect(eng_alias)
p1 = PathRegistry.coerce((ea_insp, ea_insp.mapper.attrs.paperwork))
eq_(p1.path, (ea_insp, ea_insp.mapper.attrs.paperwork))
eq_(p1.natural_path, (ea_insp, ea_insp.mapper.attrs.paperwork))
def test_nonpoly_oftype_aliased_subclass(self):
Company = _poly_fixtures.Company
Person = _poly_fixtures.Person
Engineer = _poly_fixtures.Engineer
cmapper = inspect(Company)
pmapper = inspect(Person)
eng_alias = aliased(Engineer)
ea_insp = inspect(eng_alias)
p1 = PathRegistry.coerce(
(
cmapper,
cmapper.attrs.employees,
ea_insp,
ea_insp.mapper.attrs.paperwork,
)
)
eq_(
p1.path,
(
cmapper,
cmapper.attrs.employees,
ea_insp,
ea_insp.mapper.attrs.paperwork,
),
)
eq_(
p1.natural_path,
(
cmapper,
cmapper.attrs.employees,
pmapper,
pmapper.attrs.paperwork,
),
)
def test_nonpoly_oftype_subclass(self):
Company = _poly_fixtures.Company
Person = _poly_fixtures.Person
Engineer = _poly_fixtures.Engineer
emapper = inspect(Engineer)
cmapper = inspect(Company)
pmapper = inspect(Person)
p1 = PathRegistry.coerce(
(
cmapper,
cmapper.attrs.employees,
emapper,
emapper.attrs.paperwork,
)
)
eq_(
p1.path,
(
cmapper,
cmapper.attrs.employees,
pmapper,
pmapper.attrs.paperwork,
),
)
eq_(
p1.natural_path,
(
cmapper,
cmapper.attrs.employees,
pmapper,
pmapper.attrs.paperwork,
),
)
def test_with_poly_base_one(self):
Person = _poly_fixtures.Person
Engineer = _poly_fixtures.Engineer
pmapper = inspect(Person)
emapper = inspect(Engineer)
p_poly = with_polymorphic(Person, [Engineer])
p_poly = inspect(p_poly)
# "name" is actually on Person, not Engineer
p1 = PathRegistry.coerce((p_poly, emapper.attrs.name))
# polymorphic AliasedClass - because "name" is on Person,
# we get Person, not Engineer
eq_(p1.path, (p_poly, pmapper.attrs.name))
def test_with_poly_use_mapper(self):
Person = _poly_fixtures.Person
Engineer = _poly_fixtures.Engineer
emapper = inspect(Engineer)
p_poly = with_polymorphic(Person, [Engineer], _use_mapper_path=True)
p_poly = inspect(p_poly)
p1 = PathRegistry.coerce((p_poly, emapper.attrs.machines))
# polymorphic AliasedClass with the "use_mapper_path" flag -
# the AliasedClass acts just like the base mapper
eq_(p1.path, (emapper, emapper.attrs.machines))