Files
sqlalchemy/test/ext/test_associationproxy.py
Fardin Alizadeh fb408b8e9d fix typos (#13038)
* fix typos

* fix typos

---------

Co-authored-by: fardyn <fa.alizadeh@pm.me>
2025-12-18 21:18:42 +01:00

4157 lines
121 KiB
Python

from __future__ import annotations
from collections import abc
import copy
import dataclasses
import pickle
from typing import List
from unittest.mock import call
from unittest.mock import Mock
from sqlalchemy import cast
from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy.engine import default
from sqlalchemy.ext.associationproxy import _AssociationList
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.associationproxy import AssociationProxy
from sqlalchemy.orm import aliased
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import collections
from sqlalchemy.orm import composite
from sqlalchemy.orm import configure_mappers
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import declared_attr
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.orm.collections import attribute_keyed_dict
from sqlalchemy.orm.collections import collection
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_none
from sqlalchemy.testing import is_not_none
from sqlalchemy.testing.assertions import expect_raises_message
from sqlalchemy.testing.entities import ComparableEntity # noqa
from sqlalchemy.testing.entities import ComparableMixin # noqa
from sqlalchemy.testing.fixtures import fixture_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy.testing.util import gc_collect
class DictCollection(dict):
    """dict subclass used as a custom collection, keyed by each member's
    ``foo`` attribute."""

    @collection.appender
    def append(self, obj):
        """Store *obj* under its ``foo`` key."""
        self.__setitem__(obj.foo, obj)

    @collection.remover
    def remove(self, obj):
        """Discard the entry stored under *obj*'s ``foo`` key."""
        self.pop(obj.foo)
class SetCollection(set):
    # Plain ``set`` subclass used as a custom ``collection_class`` fixture.
    pass
class ListCollection(list):
    # Plain ``list`` subclass used as a custom ``collection_class`` fixture.
    pass
class ObjectCollection:
    """An arbitrary (non-list/set/dict) collection backed by an internal
    list, instrumented via the ``collection`` decorators."""

    def __init__(self):
        # backing storage for collection members
        self.values = []

    @collection.appender
    def append(self, obj):
        """Add *obj* to the backing list."""
        self.values.append(obj)

    @collection.remover
    def remove(self, obj):
        """Remove the first occurrence of *obj* from the backing list."""
        self.values.remove(obj)

    def __iter__(self):
        """Yield members in insertion order."""
        return iter(self.values)
class AutoFlushTest(fixtures.MappedTest):
    """Tests that mutating an association proxy under an autoflushing
    Session does not flush an incomplete association row prematurely."""

    @classmethod
    def define_tables(cls, metadata):
        Table(
            "parent",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
        )
        Table(
            "association",
            metadata,
            Column("parent_id", ForeignKey("parent.id"), primary_key=True),
            Column("child_id", ForeignKey("child.id"), primary_key=True),
            Column("name", String(50)),
        )
        Table(
            "child",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("name", String(50)),
        )

    def _fixture(self, collection_class, is_dict=False):
        # Build a Parent -> Association -> Child mapping where
        # Parent.collection proxies across Association to Child.
        class Parent:
            collection = association_proxy("_collection", "child")

        class Child:
            def __init__(self, name):
                self.name = name

        class Association:
            if is_dict:
                # dict-based collections pass a key positionally; it is
                # accepted here but unused
                def __init__(self, key, child):
                    self.child = child

            else:

                def __init__(self, child):
                    self.child = child

        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.parent,
            properties={
                "_collection": relationship(
                    Association,
                    collection_class=collection_class,
                    backref="parent",
                )
            },
        )
        self.mapper_registry.map_imperatively(
            Association,
            self.tables.association,
            properties={"child": relationship(Child, backref="association")},
        )
        self.mapper_registry.map_imperatively(Child, self.tables.child)
        return Parent, Child, Association

    def _test_premature_flush(self, collection_class, fn, is_dict=False):
        # *fn* mutates the proxied collection; each mutation is followed by
        # a commit, then the flushed rows are checked for correct linkage.
        Parent, Child, Association = self._fixture(
            collection_class, is_dict=is_dict
        )
        session = Session(
            testing.db, autoflush=True, expire_on_commit=True, future=True
        )
        p1 = Parent()
        c1 = Child("c1")
        c2 = Child("c2")
        session.add(p1)
        session.add(c1)
        session.add(c2)
        fn(p1.collection, c1)
        session.commit()
        fn(p1.collection, c2)
        session.commit()
        is_(c1.association[0].parent, p1)
        is_(c2.association[0].parent, p1)
        session.close()

    def test_list_append(self):
        self._test_premature_flush(
            list, lambda collection, obj: collection.append(obj)
        )

    def test_list_extend(self):
        self._test_premature_flush(
            list, lambda collection, obj: collection.extend([obj])
        )

    def test_set_add(self):
        self._test_premature_flush(
            set, lambda collection, obj: collection.add(obj)
        )

    def test_set_extend(self):
        self._test_premature_flush(
            set, lambda collection, obj: collection.update([obj])
        )

    def test_dict_set(self):
        def set_(collection, obj):
            collection[obj.name] = obj

        self._test_premature_flush(
            collections.attribute_keyed_dict(
                "name", ignore_unpopulated_attribute=True
            ),
            set_,
            is_dict=True,
        )
class _CollectionOperations(fixtures.MappedTest):
    """Shared test base exercising proxied collection operations.

    Subclasses set ``collection_class`` to run the suite against a
    particular collection implementation; the proxy exposes each Child's
    ``name`` attribute.
    """

    @classmethod
    def define_tables(cls, metadata):
        Table(
            "Parent",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("name", String(128)),
        )
        Table(
            "Children",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("parent_id", Integer, ForeignKey("Parent.id")),
            Column("foo", String(128)),
            Column("name", String(128)),
        )

    @classmethod
    def setup_mappers(cls):
        collection_class = cls.collection_class

        class Parent(cls.Basic):
            children = association_proxy("_children", "name")

            def __init__(self, name):
                self.name = name

        class Child(cls.Basic):
            # dict-based collections construct children with a key as well
            if collection_class and issubclass(collection_class, dict):

                def __init__(self, foo, name):
                    self.foo = foo
                    self.name = name

            else:

                def __init__(self, name):
                    self.name = name

        parents_table, children_table = cls.tables("Parent", "Children")
        cls.mapper_registry.map_imperatively(
            Parent,
            parents_table,
            properties={
                "_children": relationship(
                    Child,
                    lazy="joined",
                    backref="parent",
                    collection_class=collection_class,
                )
            },
        )
        cls.mapper_registry.map_imperatively(Child, children_table)

    def test_abc(self):
        # The proxy should present the same collections.abc interface as
        # the underlying collection class implements.
        Parent = self.classes.Parent
        p1 = Parent("x")
        collection_class = self.collection_class or list
        for abc_ in (abc.Set, abc.MutableMapping, abc.MutableSequence):
            if issubclass(collection_class, abc_):
                break
        else:
            abc_ = None
        if abc_:
            p1 = Parent("x")
            assert isinstance(p1.children, abc_)

    def roundtrip(self, obj):
        # Flush *obj*, expunge everything, and reload a fresh copy by pk.
        if obj not in self.session:
            self.session.add(obj)
        self.session.flush()
        id_, type_ = obj.id, type(obj)
        self.session.expunge_all()
        return self.session.get(type_, id_)

    def _test_sequence_ops(self):
        Parent, Child = self.classes("Parent", "Child")
        self.session = fixture_session()
        p1 = Parent("P1")

        def assert_index(expected, value, *args):
            """Assert index of child value is equal to expected.

            If expected is None, assert that index raises ValueError.
            """
            try:
                index = p1.children.index(value, *args)
            except ValueError:
                self.assert_(expected is None)
            else:
                self.assert_(expected is not None)
                self.assert_(index == expected)

        self.assert_(not p1._children)
        self.assert_(not p1.children)
        ch = Child("regular")
        p1._children.append(ch)
        self.assert_(ch in p1._children)
        self.assert_(len(p1._children) == 1)
        self.assert_(p1.children)
        self.assert_(len(p1.children) == 1)
        # the proxy contains names, not Child instances
        self.assert_(ch not in p1.children)
        self.assert_("regular" in p1.children)
        assert_index(0, "regular")
        assert_index(None, "regular", 1)
        p1.children.append("proxied")
        self.assert_("proxied" in p1.children)
        self.assert_("proxied" not in p1._children)
        self.assert_(len(p1.children) == 2)
        self.assert_(len(p1._children) == 2)
        self.assert_(p1._children[0].name == "regular")
        self.assert_(p1._children[1].name == "proxied")
        assert_index(0, "regular")
        assert_index(1, "proxied")
        assert_index(1, "proxied", 1)
        assert_index(None, "proxied", 0, 1)
        del p1._children[1]
        self.assert_(len(p1._children) == 1)
        self.assert_(len(p1.children) == 1)
        self.assert_(p1._children[0] == ch)
        assert_index(None, "proxied")
        del p1.children[0]
        self.assert_(len(p1._children) == 0)
        self.assert_(len(p1.children) == 0)
        assert_index(None, "regular")
        p1.children = ["a", "b", "c"]
        self.assert_(len(p1._children) == 3)
        self.assert_(len(p1.children) == 3)
        assert_index(0, "a")
        assert_index(1, "b")
        assert_index(2, "c")
        del ch
        p1 = self.roundtrip(p1)
        self.assert_(len(p1._children) == 3)
        self.assert_(len(p1.children) == 3)
        assert_index(0, "a")
        assert_index(1, "b")
        assert_index(2, "c")
        popped = p1.children.pop()
        self.assert_(len(p1.children) == 2)
        self.assert_(popped not in p1.children)
        assert_index(None, popped)
        p1 = self.roundtrip(p1)
        self.assert_(len(p1.children) == 2)
        self.assert_(popped not in p1.children)
        assert_index(None, popped)
        p1.children[1] = "changed-in-place"
        self.assert_(p1.children[1] == "changed-in-place")
        assert_index(1, "changed-in-place")
        assert_index(None, "b")
        # in-place change should mutate the existing Child row, not
        # create a new one
        inplace_id = p1._children[1].id
        p1 = self.roundtrip(p1)
        self.assert_(p1.children[1] == "changed-in-place")
        assert p1._children[1].id == inplace_id
        p1.children.append("changed-in-place")
        self.assert_(p1.children.count("changed-in-place") == 2)
        assert_index(1, "changed-in-place")
        p1.children.remove("changed-in-place")
        self.assert_(p1.children.count("changed-in-place") == 1)
        assert_index(1, "changed-in-place")
        p1 = self.roundtrip(p1)
        self.assert_(p1.children.count("changed-in-place") == 1)
        assert_index(1, "changed-in-place")
        p1._children = []
        self.assert_(len(p1.children) == 0)
        assert_index(None, "changed-in-place")
        after = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]
        p1.children = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]
        self.assert_(len(p1.children) == 10)
        self.assert_([c.name for c in p1._children] == after)
        for i, val in enumerate(after):
            assert_index(i, val)
        # equal-length slice assignment
        p1.children[2:6] = ["x"] * 4
        after = ["a", "b", "x", "x", "x", "x", "g", "h", "i", "j"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)
        assert_index(2, "x")
        assert_index(3, "x", 3)
        assert_index(None, "x", 6)
        # shrinking slice assignment
        p1.children[2:6] = ["y"]
        after = ["a", "b", "y", "g", "h", "i", "j"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)
        assert_index(2, "y")
        assert_index(None, "y", 3)
        # expanding slice assignment
        p1.children[2:3] = ["z"] * 4
        after = ["a", "b", "z", "z", "z", "z", "g", "h", "i", "j"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)
        # extended (stepped) slice assignment
        p1.children[2::2] = ["O"] * 4
        after = ["a", "b", "O", "z", "O", "z", "O", "h", "O", "j"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)
        # proxied lists are unhashable
        assert_raises(TypeError, set, [p1.children])
        p1.children *= 0
        after = []
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)
        p1.children += ["a", "b"]
        after = ["a", "b"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)
        p1.children[:] = ["d", "e"]
        after = ["d", "e"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)
        p1.children[:] = ["a", "b"]
        p1.children += ["c"]
        after = ["a", "b", "c"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)
        p1.children *= 1
        after = ["a", "b", "c"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)
        p1.children *= 2
        after = ["a", "b", "c", "a", "b", "c"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)
        p1.children = ["a"]
        after = ["a"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)
        # non-mutating arithmetic operators return plain lists
        self.assert_((p1.children * 2) == ["a", "a"])
        self.assert_((2 * p1.children) == ["a", "a"])
        self.assert_((p1.children * 0) == [])
        self.assert_((0 * p1.children) == [])
        self.assert_((p1.children + ["b"]) == ["a", "b"])
        self.assert_((["b"] + p1.children) == ["b", "a"])
        # adding a non-list raises TypeError, matching list semantics
        try:
            p1.children + 123
            assert False
        except TypeError:
            assert True
class DefaultTest(_CollectionOperations):
    # No explicit collection_class: the relationship defaults to a list.
    collection_class = None

    def test_sequence_ops(self):
        self._test_sequence_ops()
class ListTest(_CollectionOperations):
    # Runs the shared sequence suite against a plain list collection.
    collection_class = list

    def test_sequence_ops(self):
        self._test_sequence_ops()
class CustomDictTest(_CollectionOperations):
    """Runs mapping-oriented proxy tests against ``DictCollection``."""

    collection_class = DictCollection

    def test_mapping_ops(self):
        Parent, Child = self.classes("Parent", "Child")
        self.session = fixture_session()
        p1 = Parent("P1")
        self.assert_(not p1._children)
        self.assert_(not p1.children)
        ch = Child("a", "regular")
        p1._children.append(ch)
        self.assert_(ch in list(p1._children.values()))
        self.assert_(len(p1._children) == 1)
        self.assert_(p1.children)
        self.assert_(len(p1.children) == 1)
        # the proxy maps keys to names, not to Child instances
        self.assert_(ch not in p1.children)
        self.assert_("a" in p1.children)
        self.assert_(p1.children["a"] == "regular")
        self.assert_(p1._children["a"] == ch)
        p1.children["b"] = "proxied"
        eq_(list(p1.children.keys()), ["a", "b"])
        eq_(list(p1.children.items()), [("a", "regular"), ("b", "proxied")])
        eq_(list(p1.children.values()), ["regular", "proxied"])
        self.assert_("proxied" in list(p1.children.values()))
        self.assert_("b" in p1.children)
        self.assert_("proxied" not in p1._children)
        self.assert_(len(p1.children) == 2)
        self.assert_(len(p1._children) == 2)
        self.assert_(p1._children["a"].name == "regular")
        self.assert_(p1._children["b"].name == "proxied")
        del p1._children["b"]
        self.assert_(len(p1._children) == 1)
        self.assert_(len(p1.children) == 1)
        self.assert_(p1._children["a"] == ch)
        del p1.children["a"]
        self.assert_(len(p1._children) == 0)
        self.assert_(len(p1.children) == 0)
        p1.children = {"d": "v d", "e": "v e", "f": "v f"}
        self.assert_(len(p1._children) == 3)
        self.assert_(len(p1.children) == 3)
        self.assert_(set(p1.children) == {"d", "e", "f"})
        del ch
        p1 = self.roundtrip(p1)
        self.assert_(len(p1._children) == 3)
        self.assert_(len(p1.children) == 3)
        # in-place value change should mutate the existing Child row
        p1.children["e"] = "changed-in-place"
        self.assert_(p1.children["e"] == "changed-in-place")
        inplace_id = p1._children["e"].id
        p1 = self.roundtrip(p1)
        self.assert_(p1.children["e"] == "changed-in-place")
        self.assert_(p1._children["e"].id == inplace_id)
        p1._children = {}
        self.assert_(len(p1.children) == 0)
        # assigning a non-dict to a dict-based collection is rejected
        try:
            p1._children = []
            self.assert_(False)
        except TypeError:
            self.assert_(True)
        try:
            p1._children = None
            self.assert_(False)
        except TypeError:
            self.assert_(True)
        # proxied dicts are unhashable
        assert_raises(TypeError, set, [p1.children])

    def test_bulk_replace(self):
        # bulk-assigning a new dict keeps association objects whose keys
        # survive the replacement and discards the removed ones
        Parent = self.classes.Parent
        p1 = Parent("foo")
        p1.children = {"a": "v a", "b": "v b", "c": "v c"}
        assocs = set(p1._children.values())
        keep_assocs = {a for a in assocs if a.foo in ("a", "c")}
        eq_(len(keep_assocs), 2)
        remove_assocs = {a for a in assocs if a.foo == "b"}
        p1.children = {"a": "v a", "d": "v d", "c": "v c"}
        eq_(
            {a for a in p1._children.values() if a.foo in ("a", "c")},
            keep_assocs,
        )
        assert not remove_assocs.intersection(p1._children.values())
        eq_(p1.children, {"a": "v a", "d": "v d", "c": "v c"})
class SetTest(_CollectionOperations):
    """Runs set-oriented proxy tests against a plain ``set`` collection."""

    collection_class = set

    def test_set_operations(self):
        Parent, Child = self.classes.Parent, self.classes.Child
        self.session = fixture_session()
        p1 = Parent("P1")
        self.assert_(not p1._children)
        self.assert_(not p1.children)
        ch1 = Child("regular")
        p1._children.add(ch1)
        self.assert_(ch1 in p1._children)
        self.assert_(len(p1._children) == 1)
        self.assert_(p1.children)
        self.assert_(len(p1.children) == 1)
        # the proxy contains names, not Child instances
        self.assert_(ch1 not in p1.children)
        self.assert_("regular" in p1.children)
        p1.children.add("proxied")
        self.assert_("proxied" in p1.children)
        self.assert_("proxied" not in p1._children)
        self.assert_(len(p1.children) == 2)
        self.assert_(len(p1._children) == 2)
        self.assert_({o.name for o in p1._children} == {"regular", "proxied"})
        ch2 = None
        for o in p1._children:
            if o.name == "proxied":
                ch2 = o
                break
        p1._children.remove(ch2)
        self.assert_(len(p1._children) == 1)
        self.assert_(len(p1.children) == 1)
        self.assert_(p1._children == {ch1})
        p1.children.remove("regular")
        self.assert_(len(p1._children) == 0)
        self.assert_(len(p1.children) == 0)
        p1.children = ["a", "b", "c"]
        self.assert_(len(p1._children) == 3)
        self.assert_(len(p1.children) == 3)
        del ch1
        p1 = self.roundtrip(p1)
        self.assert_(len(p1._children) == 3)
        self.assert_(len(p1.children) == 3)
        self.assert_("a" in p1.children)
        self.assert_("b" in p1.children)
        self.assert_("d" not in p1.children)
        self.assert_(p1.children == {"a", "b", "c"})
        # remove() of a missing member raises; discard() does not
        assert_raises(KeyError, p1.children.remove, "d")
        self.assert_(len(p1.children) == 3)
        p1.children.discard("d")
        self.assert_(len(p1.children) == 3)
        p1 = self.roundtrip(p1)
        self.assert_(len(p1.children) == 3)
        popped = p1.children.pop()
        self.assert_(len(p1.children) == 2)
        self.assert_(popped not in p1.children)
        p1 = self.roundtrip(p1)
        self.assert_(len(p1.children) == 2)
        self.assert_(popped not in p1.children)
        p1.children = ["a", "b", "c"]
        p1 = self.roundtrip(p1)
        self.assert_(p1.children == {"a", "b", "c"})
        p1.children.discard("b")
        p1 = self.roundtrip(p1)
        self.assert_(p1.children == {"a", "c"})
        p1.children.remove("a")
        p1 = self.roundtrip(p1)
        self.assert_(p1.children == {"c"})
        p1._children = set()
        self.assert_(len(p1.children) == 0)
        # assigning a non-set to a set-based collection is rejected
        try:
            p1._children = []
            self.assert_(False)
        except TypeError:
            self.assert_(True)
        try:
            p1._children = None
            self.assert_(False)
        except TypeError:
            self.assert_(True)
        # proxied sets are unhashable
        assert_raises(TypeError, set, [p1.children])

    def test_special_binops_checks(self):
        """test for #11349: set binary operators with non-set operands
        raise TypeError just as a plain ``set`` does."""
        Parent = self.classes.Parent
        p1 = Parent("P1")
        p1.children = ["a", "b", "c"]
        control = {"a", "b", "c"}
        with expect_raises(TypeError):
            control | ["c", "d"]
        with expect_raises(TypeError):
            p1.children | ["c", "d"]
        with expect_raises(TypeError):
            control |= ["c", "d"]
        with expect_raises(TypeError):
            p1.children |= ["c", "d"]
        with expect_raises(TypeError):
            control & ["c", "d"]
        with expect_raises(TypeError):
            p1.children & ["c", "d"]
        with expect_raises(TypeError):
            control &= ["c", "d"]
        with expect_raises(TypeError):
            p1.children &= ["c", "d"]
        with expect_raises(TypeError):
            control ^ ["c", "d"]
        with expect_raises(TypeError):
            p1.children ^ ["c", "d"]
        with expect_raises(TypeError):
            control ^= ["c", "d"]
        with expect_raises(TypeError):
            p1.children ^= ["c", "d"]
        with expect_raises(TypeError):
            control - ["c", "d"]
        with expect_raises(TypeError):
            p1.children - ["c", "d"]
        with expect_raises(TypeError):
            control -= ["c", "d"]
        with expect_raises(TypeError):
            p1.children -= ["c", "d"]

    def test_set_comparisons(self):
        # proxied set comparisons must match plain-set results against a
        # spread of other sets (equal, superset, subset, disjoint, empty)
        Parent = self.classes.Parent
        p1 = Parent("P1")
        p1.children = ["a", "b", "c"]
        control = {"a", "b", "c"}
        for other in (
            {"a", "b", "c"},
            {"a", "b", "c", "d"},
            {"a"},
            {"a", "b"},
            {"c", "d"},
            {"e", "f", "g"},
            set(),
        ):
            eq_(p1.children.union(other), control.union(other))
            eq_(p1.children.difference(other), control.difference(other))
            eq_((p1.children - other), (control - other))
            eq_(p1.children.intersection(other), control.intersection(other))
            eq_(
                p1.children.symmetric_difference(other),
                control.symmetric_difference(other),
            )
            eq_(p1.children.issubset(other), control.issubset(other))
            eq_(p1.children.issuperset(other), control.issuperset(other))
            self.assert_((p1.children == other) == (control == other))
            self.assert_((p1.children != other) == (control != other))
            self.assert_((p1.children < other) == (control < other))
            self.assert_((p1.children <= other) == (control <= other))
            self.assert_((p1.children > other) == (control > other))
            self.assert_((p1.children >= other) == (control >= other))

    def test_set_comparison_empty_to_empty(self):
        # test issue #3265 which was fixed in Python version 2.7.8
        Parent = self.classes.Parent
        p1 = Parent("P1")
        p1.children = []
        p2 = Parent("P2")
        p2.children = []
        set_0 = set()
        set_a = p1.children
        set_b = p2.children
        is_(set_a == set_a, True)
        is_(set_a == set_b, True)
        is_(set_a == set_0, True)
        is_(set_0 == set_a, True)
        is_(set_a != set_a, False)
        is_(set_a != set_b, False)
        is_(set_a != set_0, False)
        is_(set_0 != set_a, False)

    def test_set_mutation(self):
        # exhaustively compare every mutating set method and in-place
        # operator against a plain-set control, before and after roundtrip
        Parent = self.classes.Parent
        self.session = fixture_session()
        # mutations
        for op in (
            "update",
            "intersection_update",
            "difference_update",
            "symmetric_difference_update",
        ):
            for base in (["a", "b", "c"], []):
                for other in (
                    {"a", "b", "c"},
                    {"a", "b", "c", "d"},
                    {"a"},
                    {"a", "b"},
                    {"c", "d"},
                    {"e", "f", "g"},
                    set(),
                ):
                    p = Parent("p")
                    p.children = base[:]
                    control = set(base[:])
                    getattr(p.children, op)(other)
                    getattr(control, op)(other)
                    try:
                        self.assert_(p.children == control)
                    except Exception:
                        print("Test %s.%s(%s):" % (set(base), op, other))
                        print("want", repr(control))
                        print("got", repr(p.children))
                        raise
                    p = self.roundtrip(p)
                    try:
                        self.assert_(p.children == control)
                    except Exception:
                        print("Test %s.%s(%s):" % (base, op, other))
                        print("want", repr(control))
                        print("got", repr(p.children))
                        raise
        # in-place mutations
        for op in ("|=", "-=", "&=", "^="):
            for base in (["a", "b", "c"], []):
                for other in (
                    {"a", "b", "c"},
                    {"a", "b", "c", "d"},
                    {"a"},
                    {"a", "b"},
                    {"c", "d"},
                    {"e", "f", "g"},
                    frozenset(["e", "f", "g"]),
                    set(),
                ):
                    p = Parent("p")
                    p.children = base[:]
                    control = set(base[:])
                    # exec is used to apply the operator symbol directly
                    exec("p.children %s other" % op)
                    exec("control %s other" % op)
                    try:
                        self.assert_(p.children == control)
                    except Exception:
                        print("Test %s %s %s:" % (set(base), op, other))
                        print("want", repr(control))
                        print("got", repr(p.children))
                        raise
                    p = self.roundtrip(p)
                    try:
                        self.assert_(p.children == control)
                    except Exception:
                        print("Test %s %s %s:" % (base, op, other))
                        print("want", repr(control))
                        print("got", repr(p.children))
                        raise

    def test_bulk_replace(self):
        # bulk-assigning a new set keeps association objects whose values
        # survive the replacement and discards the removed ones
        Parent = self.classes.Parent
        p1 = Parent("foo")
        p1.children = {"a", "b", "c"}
        assocs = set(p1._children)
        keep_assocs = {a for a in assocs if a.name in ("a", "c")}
        eq_(len(keep_assocs), 2)
        remove_assocs = {a for a in assocs if a.name == "b"}
        p1.children = {"a", "c", "d"}
        eq_({a for a in p1._children if a.name in ("a", "c")}, keep_assocs)
        assert not remove_assocs.intersection(p1._children)
        eq_(p1.children, {"a", "c", "d"})
class CustomSetTest(SetTest):
    # Re-runs the full set suite against the custom ``set`` subclass.
    collection_class = SetCollection
class CustomObjectTest(_CollectionOperations):
    """Runs basic proxy tests against the arbitrary ObjectCollection."""

    collection_class = ObjectCollection

    def test_basic(self):
        Parent = self.classes.Parent
        self.session = fixture_session()
        p = Parent("p1")
        self.assert_(len(list(p.children)) == 0)
        p.children.append("child")
        self.assert_(len(list(p.children)) == 1)
        p = self.roundtrip(p)
        self.assert_(len(list(p.children)) == 1)
        # We didn't provide an alternate _AssociationList implementation
        # for our ObjectCollection, so indexing will fail.
        assert_raises(TypeError, p.children.__getitem__, 1)
class ProxyFactoryTest(ListTest):
    """Re-runs the list suite through a user-supplied ``proxy_factory``."""

    @classmethod
    def define_tables(cls, metadata):
        Table(
            "Parent",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("name", String(128)),
        )
        Table(
            "Children",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("parent_id", Integer, ForeignKey("Parent.id")),
            Column("foo", String(128)),
            Column("name", String(128)),
        )

    @classmethod
    def setup_mappers(cls):
        parents_table, children_table = cls.tables("Parent", "Children")

        # A custom proxy class wired through proxy_factory; it delegates to
        # the parent proxy's default getter/setter.
        class CustomProxy(_AssociationList):
            def __init__(self, lazy_collection, creator, value_attr, parent):
                getter, setter = parent._default_getset(lazy_collection)
                _AssociationList.__init__(
                    self, lazy_collection, creator, getter, setter, parent
                )

        class Parent(cls.Basic):
            children = association_proxy(
                "_children",
                "name",
                proxy_factory=CustomProxy,
                proxy_bulk_set=CustomProxy.extend,
            )

            def __init__(self, name):
                self.name = name

        class Child(cls.Basic):
            def __init__(self, name):
                self.name = name

        cls.mapper_registry.map_imperatively(
            Parent,
            parents_table,
            properties={
                "_children": relationship(
                    Child, lazy="joined", collection_class=list
                )
            },
        )
        cls.mapper_registry.map_imperatively(Child, children_table)

    def test_sequence_ops(self):
        self._test_sequence_ops()
class ScalarTest(fixtures.MappedTest):
    """Tests association proxies over scalar (uselist=False) relationships."""

    @testing.provide_metadata
    def test_scalar_proxy(self):
        metadata = self.metadata
        parents_table = Table(
            "Parent",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("name", String(128)),
        )
        children_table = Table(
            "Children",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("parent_id", Integer, ForeignKey("Parent.id")),
            Column("foo", String(128)),
            Column("bar", String(128)),
            Column("baz", String(128)),
        )

        class Parent:
            # "foo" has no creator; "bar"/"baz" create a Child on demand
            foo = association_proxy("child", "foo")
            bar = association_proxy(
                "child", "bar", creator=lambda v: Child(bar=v)
            )
            baz = association_proxy(
                "child", "baz", creator=lambda v: Child(baz=v)
            )

            def __init__(self, name):
                self.name = name

        class Child:
            def __init__(self, **kw):
                for attr in kw:
                    setattr(self, attr, kw[attr])

        self.mapper_registry.map_imperatively(
            Parent,
            parents_table,
            properties={
                "child": relationship(
                    Child, lazy="joined", backref="parent", uselist=False
                )
            },
        )
        self.mapper_registry.map_imperatively(Child, children_table)
        metadata.create_all(testing.db)
        session = fixture_session()

        def roundtrip(obj):
            # flush, expunge and reload a fresh copy by primary key
            if obj not in session:
                session.add(obj)
            session.flush()
            id_, type_ = obj.id, type(obj)
            session.expunge_all()
            return session.get(type_, id_)

        p = Parent("p")
        # scalar proxies read as None while no child exists
        eq_(p.child, None)
        eq_(p.foo, None)
        p.child = Child(foo="a", bar="b", baz="c")
        self.assert_(p.foo == "a")
        self.assert_(p.bar == "b")
        self.assert_(p.baz == "c")
        p.bar = "x"
        self.assert_(p.foo == "a")
        self.assert_(p.bar == "x")
        self.assert_(p.baz == "c")
        p = roundtrip(p)
        self.assert_(p.foo == "a")
        self.assert_(p.bar == "x")
        self.assert_(p.baz == "c")
        p.child = None
        eq_(p.foo, None)
        # Bogus creator for this scalar type
        assert_raises(TypeError, setattr, p, "foo", "zzz")
        p.bar = "yyy"
        self.assert_(p.foo is None)
        self.assert_(p.bar == "yyy")
        self.assert_(p.baz is None)
        del p.child
        p = roundtrip(p)
        self.assert_(p.child is None)
        p.baz = "xxx"
        self.assert_(p.foo is None)
        self.assert_(p.bar is None)
        self.assert_(p.baz == "xxx")
        p = roundtrip(p)
        self.assert_(p.foo is None)
        self.assert_(p.bar is None)
        self.assert_(p.baz == "xxx")
        # Ensure an immediate __set__ works.
        p2 = Parent("p2")
        p2.bar = "quux"

    def test_scalar_opts_exclusive(self):
        # cascade_scalar_deletes and create_on_none_assignment cannot both
        # be enabled on the same proxy
        with expect_raises_message(
            exc.ArgumentError,
            "The cascade_scalar_deletes and create_on_none_assignment "
            "parameters are mutually exclusive.",
        ):
            association_proxy(
                "a",
                "b",
                cascade_scalar_deletes=True,
                create_on_none_assignment=True,
            )

    @testing.variation("create_on_none", [True, False])
    @testing.variation("specify_creator", [True, False])
    def test_create_on_set_none(
        self, create_on_none, specify_creator, decl_base
    ):
        # create_on_none_assignment controls whether assigning None to the
        # proxy creates the intermediary B object
        class A(decl_base):
            __tablename__ = "a"
            id = mapped_column(Integer, primary_key=True)
            b_id = mapped_column(ForeignKey("b.id"))
            b = relationship("B")
            if specify_creator:
                b_data = association_proxy(
                    "b",
                    "data",
                    create_on_none_assignment=bool(create_on_none),
                    creator=lambda data: B(data=data),
                )
            else:
                b_data = association_proxy(
                    "b", "data", create_on_none_assignment=bool(create_on_none)
                )

        class B(decl_base):
            __tablename__ = "b"
            id = mapped_column(Integer, primary_key=True)
            data = mapped_column(String)

            def __init__(self, data=None):
                self.data = data

        a1 = A()
        is_none(a1.b)
        a1.b_data = None
        if create_on_none:
            is_not_none(a1.b)
        else:
            is_none(a1.b)
        # once a B exists, assigning None keeps it (sets its data to None)
        a1.b_data = "data"
        a1.b_data = None
        is_not_none(a1.b)

    @testing.provide_metadata
    def test_empty_scalars(self):
        metadata = self.metadata
        a = Table(
            "a",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("name", String(50)),
        )
        a2b = Table(
            "a2b",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("id_a", Integer, ForeignKey("a.id")),
            Column("id_b", Integer, ForeignKey("b.id")),
            Column("name", String(50)),
        )
        b = Table(
            "b",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("name", String(50)),
        )

        class A:
            a2b_name = association_proxy("a2b_single", "name")
            b_single = association_proxy("a2b_single", "b")

        class A2B:
            pass

        class B:
            pass

        self.mapper_registry.map_imperatively(
            A, a, properties=dict(a2b_single=relationship(A2B, uselist=False))
        )
        self.mapper_registry.map_imperatively(
            A2B, a2b, properties=dict(b=relationship(B))
        )
        self.mapper_registry.map_imperatively(B, b)
        a1 = A()
        # both scalar hops read as None when the intermediary is absent
        assert a1.a2b_name is None
        assert a1.b_single is None

    def test_custom_getset(self):
        # getset_factory lets the proxy delegate value access to arbitrary
        # callables; here Mocks record how they are invoked
        metadata = MetaData()
        p = Table(
            "p",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("cid", Integer, ForeignKey("c.id")),
        )
        c = Table(
            "c",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("foo", String(128)),
        )
        get = Mock()
        set_ = Mock()

        class Parent:
            foo = association_proxy(
                "child", "foo", getset_factory=lambda cc, parent: (get, set_)
            )

        class Child:
            def __init__(self, foo):
                self.foo = foo

        self.mapper_registry.map_imperatively(
            Parent, p, properties={"child": relationship(Child)}
        )
        self.mapper_registry.map_imperatively(Child, c)
        p1 = Parent()
        eq_(p1.foo, get(None))
        p1.child = child = Child(foo="x")
        eq_(p1.foo, get(child))
        p1.foo = "y"
        eq_(set_.mock_calls, [call(child, "y")])
class LazyLoadTest(fixtures.MappedTest):
    """Tests that the association proxy preserves the relationship's
    configured loader strategy (lazy vs. joined) for lists and scalars.

    Only Child is mapped at class setup; each test maps Parent with the
    loader strategy under test.
    """

    @classmethod
    def define_tables(cls, metadata):
        Table(
            "Parent",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("name", String(128)),
        )
        Table(
            "Children",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("parent_id", Integer, ForeignKey("Parent.id")),
            Column("foo", String(128)),
            Column("name", String(128)),
        )

    @classmethod
    def setup_mappers(cls):
        class Parent(cls.Basic):
            children = association_proxy("_children", "name")

            def __init__(self, name):
                self.name = name

        class Child(cls.Basic):
            def __init__(self, name):
                self.name = name

        cls.mapper_registry.map_imperatively(Child, cls.tables.Children)

    def roundtrip(self, obj):
        # flush, expunge and reload a fresh copy by primary key
        self.session.add(obj)
        self.session.flush()
        id_, type_ = obj.id, type(obj)
        self.session.expunge_all()
        return self.session.get(type_, id_)

    def test_lazy_list(self):
        Parent, Child = self.classes("Parent", "Child")
        self.session = fixture_session()
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.Parent,
            properties={
                "_children": relationship(
                    Child, lazy="select", collection_class=list
                )
            },
        )
        p = Parent("p")
        p.children = ["a", "b", "c"]
        p = self.roundtrip(p)
        # Is there a better way to ensure that the association_proxy
        # didn't convert a lazy load to an eager load? This does work though.
        self.assert_("_children" not in p.__dict__)
        self.assert_(len(p._children) == 3)
        self.assert_("_children" in p.__dict__)

    def test_eager_list(self):
        Parent, Child = self.classes("Parent", "Child")
        self.session = fixture_session()
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.Parent,
            properties={
                "_children": relationship(
                    Child, lazy="joined", collection_class=list
                )
            },
        )
        p = Parent("p")
        p.children = ["a", "b", "c"]
        p = self.roundtrip(p)
        # joined-eager loading populates the attribute up front
        self.assert_("_children" in p.__dict__)
        self.assert_(len(p._children) == 3)

    def test_slicing_list(self):
        Parent, Child = self.classes("Parent", "Child")
        self.session = fixture_session()
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.Parent,
            properties={
                "_children": relationship(
                    Child, lazy="select", collection_class=list
                )
            },
        )
        p = Parent("p")
        p.children = ["a", "b", "c"]
        p = self.roundtrip(p)
        self.assert_(len(p._children) == 3)
        # indexing and negative slicing work through the proxy
        eq_("b", p.children[1])
        eq_(["b", "c"], p.children[-2:])

    def test_lazy_scalar(self):
        Parent, Child = self.classes("Parent", "Child")
        self.session = fixture_session()
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.Parent,
            properties={
                "_children": relationship(Child, lazy="select", uselist=False)
            },
        )
        p = Parent("p")
        p.children = "value"
        p = self.roundtrip(p)
        self.assert_("_children" not in p.__dict__)
        self.assert_(p._children is not None)

    def test_eager_scalar(self):
        Parent, Child = self.classes("Parent", "Child")
        self.session = fixture_session()
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.Parent,
            properties={
                "_children": relationship(Child, lazy="joined", uselist=False)
            },
        )
        p = Parent("p")
        p.children = "value"
        p = self.roundtrip(p)
        self.assert_("_children" in p.__dict__)
        self.assert_(p._children is not None)
class Parent:
    """Plain parent fixture; proxy attributes are attached by tests."""

    def __init__(self, name):
        # only stores the display name; relationships come from mappers
        self.name = name
class Child:
    """Plain child fixture holding only a name."""

    def __init__(self, name):
        self.name = name
class KVChild:
    """Key/value child fixture used by dict-based proxy tests."""

    def __init__(self, name, value):
        self.name = name
        self.value = value
class ReconstitutionTest(fixtures.MappedTest):
    """Association proxies must survive object "reconstitution":
    weak-referencing identity maps (garbage collection between accesses),
    copy.copy(), and pickling of list-, set- and dict-based collections.
    """

    run_setup_mappers = "each"
    run_setup_classes = "each"
    @classmethod
    def define_tables(cls, metadata):
        """Simple parent/child tables with a one-to-many FK."""
        Table(
            "parents",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("name", String(30)),
        )
        Table(
            "children",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("parent_id", Integer, ForeignKey("parents.id")),
            Column("name", String(30)),
        )
    @classmethod
    def insert_data(cls, connection):
        """Seed a single parent row 'p1' that the tests load back."""
        parents = cls.tables.parents
        connection.execute(parents.insert(), dict(name="p1"))
    @classmethod
    def setup_classes(cls):
        # attach the proxy to the module-level Parent fixture class;
        # 'kids' proxies the 'name' of each object in 'children'
        Parent.kids = association_proxy("children", "name")
    def test_weak_identity_map(self):
        """Proxy keeps working after the owner is gc'ed out of the
        session's weak identity map between accesses."""
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.parents,
            properties=dict(children=relationship(Child)),
        )
        self.mapper_registry.map_imperatively(Child, self.tables.children)
        session = fixture_session()
        def add_child(parent_name, child_name):
            # re-query the parent each time so a fresh instance is used
            parent = session.query(Parent).filter_by(name=parent_name).one()
            parent.kids.append(child_name)
        add_child("p1", "c1")
        # force the first Parent instance out of the identity map
        gc_collect()
        add_child("p1", "c2")
        session.flush()
        p = session.query(Parent).filter_by(name="p1").one()
        assert set(p.kids) == {"c1", "c2"}, p.kids
    def test_copy(self):
        """copy.copy() of the owner preserves the proxied collection."""
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.parents,
            properties=dict(children=relationship(Child)),
        )
        self.mapper_registry.map_imperatively(Child, self.tables.children)
        p = Parent("p1")
        p.kids.extend(["c1", "c2"])
        p_copy = copy.copy(p)
        # drop the original to prove the copy stands on its own
        del p
        gc_collect()
        assert set(p_copy.kids) == {"c1", "c2"}, p_copy.kids
    def test_pickle_list(self):
        """Pickle round-trip of an owner with a list-based proxy."""
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.parents,
            properties=dict(children=relationship(Child)),
        )
        self.mapper_registry.map_imperatively(Child, self.tables.children)
        p = Parent("p1")
        p.kids.extend(["c1", "c2"])
        r1 = pickle.loads(pickle.dumps(p))
        assert r1.kids == ["c1", "c2"]
        # can't do this without parent having a cycle
        # r2 = pickle.loads(pickle.dumps(p.kids))
        # assert r2 == ['c1', 'c2']
    def test_pickle_set(self):
        """Pickle round-trip of an owner with a set-based proxy."""
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.parents,
            properties=dict(
                children=relationship(Child, collection_class=set)
            ),
        )
        self.mapper_registry.map_imperatively(Child, self.tables.children)
        p = Parent("p1")
        p.kids.update(["c1", "c2"])
        r1 = pickle.loads(pickle.dumps(p))
        assert r1.kids == {"c1", "c2"}
        # can't do this without parent having a cycle
        # r2 = pickle.loads(pickle.dumps(p.kids))
        # assert r2 == set(['c1', 'c2'])
    def test_pickle_dict(self):
        """Pickle round-trip of an owner with a keyfunc-dict proxy;
        the keyfunc itself (PickleKeyFunc) must be picklable."""
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.parents,
            properties=dict(
                children=relationship(
                    KVChild,
                    collection_class=collections.keyfunc_mapping(
                        PickleKeyFunc("name")
                    ),
                )
            ),
        )
        self.mapper_registry.map_imperatively(KVChild, self.tables.children)
        p = Parent("p1")
        p.kids.update({"c1": "v1", "c2": "v2"})
        # the proxy exposes KVChild.name (the key), not .value, so the
        # values read back equal the keys
        assert p.kids == {"c1": "c1", "c2": "c2"}
        r1 = pickle.loads(pickle.dumps(p))
        assert r1.kids == {"c1": "c1", "c2": "c2"}
        # can't do this without parent having a cycle
        # r2 = pickle.loads(pickle.dumps(p.kids))
        # assert r2 == {'c1': 'c1', 'c2': 'c2'}
class PickleKeyFunc:
    """Picklable key function: callable returning the named attribute
    of the object passed in (a lambda would not pickle)."""

    def __init__(self, name):
        self.name = name

    def __call__(self, obj):
        return getattr(obj, self.name)
class ComparatorTest(fixtures.MappedTest, AssertsCompiledSQL):
    """SQL-expression behavior of association proxies (any/has/contains,
    ==/!=, chained proxies).

    Test-name suffixes encode the relationship shapes along the proxy
    chain: ``ul`` = uselist (collection) hop, ``nul`` = non-uselist
    (scalar) hop.  Each test builds a query through the proxy and checks
    it compiles/evaluates the same as the equivalent hand-written query.
    """

    __dialect__ = "default"
    run_inserts = "once"
    run_deletes = None
    run_setup_mappers = "once"
    run_setup_classes = "once"
    @classmethod
    def define_tables(cls, metadata):
        """users <-> keywords through the userkeywords association
        table, plus a 'singular' table referenced by both sides."""
        Table(
            "userkeywords",
            metadata,
            Column(
                "keyword_id",
                Integer,
                ForeignKey("keywords.id"),
                primary_key=True,
            ),
            Column("user_id", Integer, ForeignKey("users.id")),
            Column("value", String(50)),
        )
        Table(
            "users",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("name", String(64)),
            Column("singular_id", Integer, ForeignKey("singular.id")),
        )
        Table(
            "keywords",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("keyword", String(64)),
            Column("singular_id", Integer, ForeignKey("singular.id")),
        )
        Table(
            "singular",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("value", String(50)),
        )
    @classmethod
    def setup_classes(cls):
        """Declare the fixture classes; each proxy below is annotated
        with the relationship shapes it traverses."""
        class User(cls.Comparable):
            def __init__(self, name):
                self.name = name
            # o2m -> m2o
            # uselist -> nonuselist
            keywords = association_proxy(
                "user_keywords",
                "keyword",
                creator=lambda k: UserKeyword(keyword=k),
            )
            # m2o -> o2m
            # nonuselist -> uselist
            singular_keywords = association_proxy("singular", "keywords")
            # m2o -> scalar
            # nonuselist
            singular_value = association_proxy("singular", "value")
            # o2m -> scalar
            singular_collection = association_proxy("user_keywords", "value")
            # uselist assoc_proxy -> assoc_proxy -> obj
            common_users = association_proxy("user_keywords", "common_users")
            # non uselist assoc_proxy -> assoc_proxy -> obj
            common_singular = association_proxy("singular", "keyword")
            # non uselist assoc_proxy -> assoc_proxy -> scalar
            singular_keyword = association_proxy("singular", "keyword")
            # uselist assoc_proxy -> assoc_proxy -> scalar
            common_keyword_name = association_proxy(
                "user_keywords", "keyword_name"
            )
        class Keyword(cls.Comparable):
            def __init__(self, keyword):
                self.keyword = keyword
            # o2o -> m2o
            # nonuselist -> nonuselist
            user = association_proxy("user_keyword", "user")
            # uselist assoc_proxy -> collection -> assoc_proxy -> scalar object
            # (o2m relationship,
            # associationproxy(m2o relationship, m2o relationship))
            singulars = association_proxy("user_keywords", "singular")
        class UserKeyword(cls.Comparable):
            def __init__(self, user=None, keyword=None):
                self.user = user
                self.keyword = keyword
            common_users = association_proxy("keyword", "user")
            keyword_name = association_proxy("keyword", "keyword")
            singular = association_proxy("user", "singular")
        class Singular(cls.Comparable):
            def __init__(self, value=None):
                self.value = value
            keyword = association_proxy("keywords", "keyword")
    @classmethod
    def setup_mappers(cls):
        """Imperatively map the four fixture classes to their tables."""
        (
            users,
            Keyword,
            UserKeyword,
            singular,
            userkeywords,
            User,
            keywords,
            Singular,
        ) = (
            cls.tables.users,
            cls.classes.Keyword,
            cls.classes.UserKeyword,
            cls.tables.singular,
            cls.tables.userkeywords,
            cls.classes.User,
            cls.tables.keywords,
            cls.classes.Singular,
        )
        cls.mapper_registry.map_imperatively(
            User, users, properties={"singular": relationship(Singular)}
        )
        cls.mapper_registry.map_imperatively(
            Keyword,
            keywords,
            properties={
                # scalar (o2o) link used by Keyword.user ...
                "user_keyword": relationship(
                    UserKeyword, uselist=False, back_populates="keyword"
                ),
                # ... and a read-only collection used by Keyword.singulars
                "user_keywords": relationship(UserKeyword, viewonly=True),
            },
        )
        cls.mapper_registry.map_imperatively(
            UserKeyword,
            userkeywords,
            properties={
                "user": relationship(User, backref="user_keywords"),
                "keyword": relationship(
                    Keyword, back_populates="user_keyword"
                ),
            },
        )
        cls.mapper_registry.map_imperatively(
            Singular, singular, properties={"keywords": relationship(Keyword)}
        )
    @classmethod
    def insert_data(cls, connection):
        """Create 16 users with keyword windows from a word list; even
        users also get a Singular.  Saves the last user/keyword on the
        class (cls.u / cls.kw) for use in the comparison tests."""
        UserKeyword, User, Keyword, Singular = (
            cls.classes.UserKeyword,
            cls.classes.User,
            cls.classes.Keyword,
            cls.classes.Singular,
        )
        session = Session(connection)
        words = ("quick", "brown", "fox", "jumped", "over", "the", "lazy")
        for ii in range(16):
            user = User("user%d" % ii)
            if ii % 2 == 0:
                user.singular = Singular(
                    value=("singular%d" % ii) if ii % 4 == 0 else None
                )
            session.add(user)
            # a rotating window of up to three words per user
            for jj in words[(ii % len(words)) : ((ii + 3) % len(words))]:
                k = Keyword(jj)
                user.keywords.append(k)
                if ii % 2 == 0:
                    user.singular.keywords.append(k)
                    user.user_keywords[-1].value = "singular%d" % ii
        # a keyword whose UserKeyword row has no user
        orphan = Keyword("orphan")
        orphan.user_keyword = UserKeyword(keyword=orphan, user=None)
        session.add(orphan)
        # and a keyword with no association row at all
        keyword_with_nothing = Keyword("kwnothing")
        session.add(keyword_with_nothing)
        session.commit()
        cls.u = user
        cls.kw = user.keywords[0]
        # TODO: this is not the correct pattern, use session per test
        cls.session = Session(testing.db)
    def _query_equivalent(self, q_proxy, q_direct):
        """Assert two Query objects render the same SQL and return the
        same rows."""
        self._equivalent(q_proxy.statement, q_direct.statement)
        eq_(q_proxy.all(), q_direct.all())
    def _equivalent(self, q_proxy, q_direct):
        """Assert two statements compile to identical SQL strings."""
        proxy_sql = q_proxy.compile(dialect=default.DefaultDialect())
        direct_sql = q_direct.compile(dialect=default.DefaultDialect())
        eq_(str(proxy_sql), str(direct_sql))
    def _statement_equivalent(self, session, q_proxy, q_direct):
        """Like _query_equivalent, for 2.0-style select() statements."""
        self._equivalent(q_proxy, q_direct)
        eq_(session.scalars(q_proxy).all(), session.scalars(q_direct).all())
    def test_no_straight_expr(self):
        """A proxy is not usable as a plain column expression."""
        User = self.classes.User
        assert_raises_message(
            NotImplementedError,
            "The association proxy can't be used as a plain column expression",
            func.foo,
            User.singular_value,
        )
        assert_raises_message(
            NotImplementedError,
            "The association proxy can't be used as a plain column expression",
            self.session.query,
            User.singular_value,
        )
    def test_filter_any_criterion_ul_scalar(self):
        UserKeyword, User = self.classes.UserKeyword, self.classes.User
        q1 = self.session.query(User).filter(
            User.singular_collection.any(UserKeyword.value == "singular8")
        )
        self.assert_compile(
            q1,
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users.singular_id AS users_singular_id "
            "FROM users "
            "WHERE EXISTS (SELECT 1 "
            "FROM userkeywords "
            "WHERE users.id = userkeywords.user_id AND "
            "userkeywords.value = :value_1)",
            checkparams={"value_1": "singular8"},
        )
        q2 = self.session.query(User).filter(
            User.user_keywords.any(UserKeyword.value == "singular8")
        )
        self._query_equivalent(q1, q2)
    def test_filter_any_kwarg_ul_nul(self):
        UserKeyword, User = self.classes.UserKeyword, self.classes.User
        self._query_equivalent(
            self.session.query(User).filter(
                User.keywords.any(keyword="jumped")
            ),
            self.session.query(User).filter(
                User.user_keywords.any(
                    UserKeyword.keyword.has(keyword="jumped")
                )
            ),
        )
    def test_filter_has_kwarg_nul_nul(self):
        UserKeyword, Keyword = self.classes.UserKeyword, self.classes.Keyword
        self._query_equivalent(
            self.session.query(Keyword).filter(Keyword.user.has(name="user2")),
            self.session.query(Keyword).filter(
                Keyword.user_keyword.has(UserKeyword.user.has(name="user2"))
            ),
        )
    def test_filter_has_kwarg_nul_ul(self):
        User, Singular = self.classes.User, self.classes.Singular
        self._query_equivalent(
            self.session.query(User).filter(
                User.singular_keywords.any(keyword="jumped")
            ),
            self.session.query(User).filter(
                User.singular.has(Singular.keywords.any(keyword="jumped"))
            ),
        )
    def test_filter_any_criterion_ul_nul(self):
        UserKeyword, User, Keyword = (
            self.classes.UserKeyword,
            self.classes.User,
            self.classes.Keyword,
        )
        self._query_equivalent(
            self.session.query(User).filter(
                User.keywords.any(Keyword.keyword == "jumped")
            ),
            self.session.query(User).filter(
                User.user_keywords.any(
                    UserKeyword.keyword.has(Keyword.keyword == "jumped")
                )
            ),
        )
    def test_filter_has_criterion_nul_nul(self):
        UserKeyword, User, Keyword = (
            self.classes.UserKeyword,
            self.classes.User,
            self.classes.Keyword,
        )
        self._query_equivalent(
            self.session.query(Keyword).filter(
                Keyword.user.has(User.name == "user2")
            ),
            self.session.query(Keyword).filter(
                Keyword.user_keyword.has(
                    UserKeyword.user.has(User.name == "user2")
                )
            ),
        )
    def test_filter_any_criterion_nul_ul(self):
        User, Keyword, Singular = (
            self.classes.User,
            self.classes.Keyword,
            self.classes.Singular,
        )
        self._query_equivalent(
            self.session.query(User).filter(
                User.singular_keywords.any(Keyword.keyword == "jumped")
            ),
            self.session.query(User).filter(
                User.singular.has(
                    Singular.keywords.any(Keyword.keyword == "jumped")
                )
            ),
        )
    def test_filter_contains_ul_nul(self):
        User = self.classes.User
        self._query_equivalent(
            self.session.query(User).filter(User.keywords.contains(self.kw)),
            self.session.query(User).filter(
                User.user_keywords.any(keyword=self.kw)
            ),
        )
    def test_filter_contains_nul_ul(self):
        User, Singular = self.classes.User, self.classes.Singular
        # self.kw's singular_id is None, which emits a warning when
        # rendered as a bind parameter
        with expect_warnings(
            "Got None for value of column keywords.singular_id;"
        ):
            self._query_equivalent(
                self.session.query(User).filter(
                    User.singular_keywords.contains(self.kw)
                ),
                self.session.query(User).filter(
                    User.singular.has(Singular.keywords.contains(self.kw))
                ),
            )
    def test_filter_eq_nul_nul(self):
        Keyword = self.classes.Keyword
        self._query_equivalent(
            self.session.query(Keyword).filter(Keyword.user == self.u),
            self.session.query(Keyword).filter(
                Keyword.user_keyword.has(user=self.u)
            ),
        )
    def test_filter_ne_nul_nul(self):
        Keyword = self.classes.Keyword
        UserKeyword = self.classes.UserKeyword
        self._query_equivalent(
            self.session.query(Keyword).filter(Keyword.user != self.u),
            self.session.query(Keyword).filter(
                Keyword.user_keyword.has(UserKeyword.user != self.u)
            ),
        )
    def test_filter_eq_null_nul_nul(self):
        """== None on a scalar object proxy also matches a missing
        association row (hence the OR in the equivalent query)."""
        UserKeyword, Keyword = self.classes.UserKeyword, self.classes.Keyword
        self._query_equivalent(
            self.session.query(Keyword).filter(Keyword.user == None),  # noqa
            self.session.query(Keyword).filter(
                or_(
                    Keyword.user_keyword.has(UserKeyword.user == None),
                    Keyword.user_keyword == None,
                )
            ),
        )
    def test_filter_ne_null_nul_nul(self):
        UserKeyword, Keyword = self.classes.UserKeyword, self.classes.Keyword
        self._query_equivalent(
            self.session.query(Keyword).filter(Keyword.user != None),  # noqa
            self.session.query(Keyword).filter(
                Keyword.user_keyword.has(UserKeyword.user != None)
            ),
        )
    def test_filter_object_eq_None_nul(self):
        UserKeyword = self.classes.UserKeyword
        User = self.classes.User
        self._query_equivalent(
            self.session.query(UserKeyword).filter(
                UserKeyword.singular == None
            ),  # noqa
            self.session.query(UserKeyword).filter(
                or_(
                    UserKeyword.user.has(User.singular == None),
                    UserKeyword.user_id == None,
                )
            ),
        )
    def test_filter_column_eq_None_nul(self):
        User = self.classes.User
        Singular = self.classes.Singular
        self._query_equivalent(
            self.session.query(User).filter(
                User.singular_value == None
            ),  # noqa
            self.session.query(User).filter(
                or_(
                    User.singular.has(Singular.value == None),
                    User.singular == None,
                )
            ),
        )
    def test_filter_object_ne_value_nul(self):
        UserKeyword = self.classes.UserKeyword
        User = self.classes.User
        Singular = self.classes.Singular
        s4 = self.session.query(Singular).filter_by(value="singular4").one()
        self._query_equivalent(
            self.session.query(UserKeyword).filter(UserKeyword.singular != s4),
            self.session.query(UserKeyword).filter(
                UserKeyword.user.has(User.singular != s4)
            ),
        )
    def test_filter_column_ne_value_nul(self):
        User = self.classes.User
        Singular = self.classes.Singular
        self._query_equivalent(
            self.session.query(User).filter(
                User.singular_value != "singular4"
            ),
            self.session.query(User).filter(
                User.singular.has(Singular.value != "singular4")
            ),
        )
    def test_filter_eq_value_nul(self):
        User = self.classes.User
        Singular = self.classes.Singular
        self._query_equivalent(
            self.session.query(User).filter(
                User.singular_value == "singular4"
            ),
            self.session.query(User).filter(
                User.singular.has(Singular.value == "singular4")
            ),
        )
    def test_filter_ne_None_nul(self):
        User = self.classes.User
        Singular = self.classes.Singular
        self._query_equivalent(
            self.session.query(User).filter(
                User.singular_value != None
            ),  # noqa
            self.session.query(User).filter(
                User.singular.has(Singular.value != None)
            ),
        )
    def test_has_nul(self):
        # a special case where we provide an empty has() on a
        # non-object-targeted association proxy.
        User = self.classes.User
        self.classes.Singular
        self._query_equivalent(
            self.session.query(User).filter(User.singular_value.has()),
            self.session.query(User).filter(User.singular.has()),
        )
    def test_nothas_nul(self):
        # a special case where we provide an empty has() on a
        # non-object-targeted association proxy.
        User = self.classes.User
        self.classes.Singular
        self._query_equivalent(
            self.session.query(User).filter(~User.singular_value.has()),
            self.session.query(User).filter(~User.singular.has()),
        )
    def test_filter_any_chained(self):
        """Proxy-of-proxy produces nested EXISTS subqueries."""
        User = self.classes.User
        UserKeyword, User = self.classes.UserKeyword, self.classes.User
        Keyword = self.classes.Keyword
        q1 = self.session.query(User).filter(
            User.common_users.any(User.name == "user7")
        )
        self.assert_compile(
            q1,
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users.singular_id AS users_singular_id "
            "FROM users "
            "WHERE EXISTS (SELECT 1 "
            "FROM userkeywords "
            "WHERE users.id = userkeywords.user_id AND (EXISTS (SELECT 1 "
            "FROM keywords "
            "WHERE keywords.id = userkeywords.keyword_id AND "
            "(EXISTS (SELECT 1 "
            "FROM userkeywords "
            "WHERE keywords.id = userkeywords.keyword_id AND "
            "(EXISTS (SELECT 1 "
            "FROM users "
            "WHERE users.id = userkeywords.user_id AND users.name = :name_1)"
            "))))))",
            checkparams={"name_1": "user7"},
        )
        q2 = self.session.query(User).filter(
            User.user_keywords.any(
                UserKeyword.keyword.has(
                    Keyword.user_keyword.has(
                        UserKeyword.user.has(User.name == "user7")
                    )
                )
            )
        )
        self._query_equivalent(q1, q2)
    def test_filter_has_chained_has_to_any(self):
        User = self.classes.User
        Singular = self.classes.Singular
        Keyword = self.classes.Keyword
        q1 = self.session.query(User).filter(
            User.common_singular.has(Keyword.keyword == "brown")
        )
        self.assert_compile(
            q1,
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users.singular_id AS users_singular_id "
            "FROM users "
            "WHERE EXISTS (SELECT 1 "
            "FROM singular "
            "WHERE singular.id = users.singular_id AND (EXISTS (SELECT 1 "
            "FROM keywords "
            "WHERE singular.id = keywords.singular_id AND "
            "keywords.keyword = :keyword_1)))",
            checkparams={"keyword_1": "brown"},
        )
        q2 = self.session.query(User).filter(
            User.singular.has(
                Singular.keywords.any(Keyword.keyword == "brown")
            )
        )
        self._query_equivalent(q1, q2)
    def test_filter_has_scalar_raises(self):
        User = self.classes.User
        assert_raises_message(
            exc.ArgumentError,
            r"Can't apply keyword arguments to column-targeted",
            User.singular_keyword.has,
            keyword="brown",
        )
    def test_filter_eq_chained_has_to_any(self):
        User = self.classes.User
        Keyword = self.classes.Keyword
        Singular = self.classes.Singular
        q1 = self.session.query(User).filter(User.singular_keyword == "brown")
        self.assert_compile(
            q1,
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users.singular_id AS users_singular_id "
            "FROM users "
            "WHERE EXISTS (SELECT 1 "
            "FROM singular "
            "WHERE singular.id = users.singular_id AND (EXISTS (SELECT 1 "
            "FROM keywords "
            "WHERE singular.id = keywords.singular_id "
            "AND keywords.keyword = :keyword_1)))",
            checkparams={"keyword_1": "brown"},
        )
        q2 = self.session.query(User).filter(
            User.singular.has(
                Singular.keywords.any(Keyword.keyword == "brown")
            )
        )
        self._query_equivalent(q1, q2)
    def test_filter_contains_chained_any_to_has(self):
        User = self.classes.User
        Keyword = self.classes.Keyword
        UserKeyword = self.classes.UserKeyword
        q1 = self.session.query(User).filter(
            User.common_keyword_name.contains("brown")
        )
        self.assert_compile(
            q1,
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users.singular_id AS users_singular_id "
            "FROM users "
            "WHERE EXISTS (SELECT 1 "
            "FROM userkeywords "
            "WHERE users.id = userkeywords.user_id AND (EXISTS (SELECT 1 "
            "FROM keywords "
            "WHERE keywords.id = userkeywords.keyword_id AND "
            "keywords.keyword = :keyword_1)))",
            checkparams={"keyword_1": "brown"},
        )
        q2 = self.session.query(User).filter(
            User.user_keywords.any(
                UserKeyword.keyword.has(Keyword.keyword == "brown")
            )
        )
        self._query_equivalent(q1, q2)
    def test_filter_contains_chained_any_to_has_to_eq(self):
        User = self.classes.User
        Keyword = self.classes.Keyword
        UserKeyword = self.classes.UserKeyword
        Singular = self.classes.Singular
        singular = self.session.query(Singular).order_by(Singular.id).first()
        q1 = self.session.query(Keyword).filter(
            Keyword.singulars.contains(singular)
        )
        self.assert_compile(
            q1,
            "SELECT keywords.id AS keywords_id, "
            "keywords.keyword AS keywords_keyword, "
            "keywords.singular_id AS keywords_singular_id "
            "FROM keywords "
            "WHERE EXISTS (SELECT 1 "
            "FROM userkeywords "
            "WHERE keywords.id = userkeywords.keyword_id AND "
            "(EXISTS (SELECT 1 "
            "FROM users "
            "WHERE users.id = userkeywords.user_id AND "
            ":param_1 = users.singular_id)))",
            checkparams={"param_1": singular.id},
        )
        q2 = self.session.query(Keyword).filter(
            Keyword.user_keywords.any(
                UserKeyword.user.has(User.singular == singular)
            )
        )
        self._query_equivalent(q1, q2)
    def test_has_criterion_nul(self):
        # but we don't allow that with any criterion...
        User = self.classes.User
        self.classes.Singular
        assert_raises_message(
            exc.ArgumentError,
            r"Non-empty has\(\) not allowed",
            User.singular_value.has,
            User.singular_value == "singular4",
        )
    def test_has_kwargs_nul(self):
        # ... or kwargs
        User = self.classes.User
        self.classes.Singular
        assert_raises_message(
            exc.ArgumentError,
            r"Can't apply keyword arguments to column-targeted",
            User.singular_value.has,
            singular_value="singular4",
        )
    def test_filter_scalar_object_contains_fails_nul_nul(self):
        Keyword = self.classes.Keyword
        assert_raises(
            exc.InvalidRequestError, lambda: Keyword.user.contains(self.u)
        )
    def test_filter_scalar_object_any_fails_nul_nul(self):
        Keyword = self.classes.Keyword
        assert_raises(
            exc.InvalidRequestError, lambda: Keyword.user.any(name="user2")
        )
    def test_filter_scalar_column_like(self):
        User = self.classes.User
        Singular = self.classes.Singular
        self._query_equivalent(
            self.session.query(User).filter(User.singular_value.like("foo")),
            self.session.query(User).filter(
                User.singular.has(Singular.value.like("foo"))
            ),
        )
    def test_filter_scalar_column_contains(self):
        User = self.classes.User
        Singular = self.classes.Singular
        self._query_equivalent(
            self.session.query(User).filter(
                User.singular_value.contains("foo")
            ),
            self.session.query(User).filter(
                User.singular.has(Singular.value.contains("foo"))
            ),
        )
    def test_filter_scalar_column_eq(self):
        User = self.classes.User
        Singular = self.classes.Singular
        self._query_equivalent(
            self.session.query(User).filter(User.singular_value == "foo"),
            self.session.query(User).filter(
                User.singular.has(Singular.value == "foo")
            ),
        )
    def test_filter_scalar_column_ne(self):
        User = self.classes.User
        Singular = self.classes.Singular
        self._query_equivalent(
            self.session.query(User).filter(User.singular_value != "foo"),
            self.session.query(User).filter(
                User.singular.has(Singular.value != "foo")
            ),
        )
    def test_filter_scalar_column_eq_nul(self):
        User = self.classes.User
        Singular = self.classes.Singular
        self._query_equivalent(
            self.session.query(User).filter(User.singular_value == None),
            self.session.query(User).filter(
                or_(
                    User.singular.has(Singular.value == None),
                    User.singular == None,
                )
            ),
        )
    def test_filter_collection_has_fails_ul_nul(self):
        User = self.classes.User
        assert_raises(
            exc.InvalidRequestError, lambda: User.keywords.has(keyword="quick")
        )
    def test_filter_collection_eq_fails_ul_nul(self):
        User = self.classes.User
        assert_raises(
            exc.InvalidRequestError, lambda: User.keywords == self.kw
        )
    def test_filter_collection_ne_fails_ul_nul(self):
        User = self.classes.User
        assert_raises(
            exc.InvalidRequestError, lambda: User.keywords != self.kw
        )
    def test_join_separate_attr(self):
        """Joining via the proxy's local_attr / remote_attr accessors."""
        User = self.classes.User
        self.assert_compile(
            self.session.query(User)
            .join(User.keywords.local_attr)
            .join(User.keywords.remote_attr),
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users.singular_id AS users_singular_id "
            "FROM users JOIN userkeywords ON users.id = "
            "userkeywords.user_id JOIN keywords ON keywords.id = "
            "userkeywords.keyword_id",
        )
    def test_join_single_attr(self):
        """Joining via the proxy's two-element .attr tuple."""
        User = self.classes.User
        self.assert_compile(
            self.session.query(User)
            .join(User.keywords.attr[0])
            .join(User.keywords.attr[1]),
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users.singular_id AS users_singular_id "
            "FROM users JOIN userkeywords ON users.id = "
            "userkeywords.user_id JOIN keywords ON keywords.id = "
            "userkeywords.keyword_id",
        )
    @testing.variation("use_aliased", [True, False])
    def test_aliased_class_one(self, use_aliased):
        """test #11622"""
        User = self.classes.User
        UserKeyword = self.classes.UserKeyword
        if use_aliased:
            second_user = aliased(
                User,
                select(User).where(User.name == "second").cte("second_user"),
            )
        else:
            second_user = User
        s1 = select(second_user).where(second_user.keywords.any())
        s2 = select(second_user).where(
            second_user.user_keywords.any(UserKeyword.keyword.has())
        )
        self._statement_equivalent(fixture_session(), s1, s2)
        if use_aliased:
            self.assert_compile(
                s1,
                "WITH second_user AS (SELECT users.id AS id, users.name AS "
                "name, users.singular_id AS singular_id FROM users "
                "WHERE users.name = :name_1) SELECT second_user.id, "
                "second_user.name, second_user.singular_id FROM second_user "
                "WHERE EXISTS (SELECT 1 FROM userkeywords "
                "WHERE second_user.id = userkeywords.user_id AND "
                "(EXISTS (SELECT 1 FROM keywords WHERE keywords.id = "
                "userkeywords.keyword_id)))",
            )
        else:
            self.assert_compile(
                s1,
                "SELECT users.id, users.name, users.singular_id FROM users "
                "WHERE EXISTS (SELECT 1 FROM userkeywords WHERE users.id = "
                "userkeywords.user_id AND (EXISTS (SELECT 1 FROM keywords "
                "WHERE keywords.id = userkeywords.keyword_id)))",
            )
class DictOfTupleUpdateTest(fixtures.MappedTest):
    """dict.update() behavior of a dict-based association proxy whose
    keys are tuples, including the error messages for bad call forms.
    """

    run_create_tables = None
    @classmethod
    def define_tables(cls, metadata):
        Table("a", metadata, Column("id", Integer, primary_key=True))
        Table(
            "b",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("aid", Integer, ForeignKey("a.id")),
            Column("elem", String),
        )
    @classmethod
    def setup_mappers(cls):
        """Map A with a 'key'-keyed dict collection of B; A.elements
        proxies B.elem, creating B(key, elem) for new entries."""
        a, b = cls.tables("a", "b")
        class B(cls.Basic):
            def __init__(self, key, elem):
                self.key = key
                self.elem = elem
        class A(cls.Basic):
            elements = association_proxy("orig", "elem", creator=B)
        cls.mapper_registry.map_imperatively(
            A,
            a,
            properties={
                "orig": relationship(
                    B, collection_class=attribute_keyed_dict("key")
                )
            },
        )
        cls.mapper_registry.map_imperatively(B, b)
    def test_update_one_elem_dict(self):
        """update() with a single-entry mapping argument."""
        a1 = self.classes.A()
        a1.elements.update({("B", 3): "elem2"})
        eq_(a1.elements, {("B", 3): "elem2"})
    def test_update_multi_elem_dict(self):
        """update() with a multi-entry mapping argument."""
        a1 = self.classes.A()
        a1.elements.update({("B", 3): "elem2", ("C", 4): "elem3"})
        eq_(a1.elements, {("B", 3): "elem2", ("C", 4): "elem3"})
    def test_update_one_elem_list(self):
        """update() with a sequence of (key, value) pairs."""
        a1 = self.classes.A()
        a1.elements.update([(("B", 3), "elem2")])
        eq_(a1.elements, {("B", 3): "elem2"})
    def test_update_multi_elem_list(self):
        a1 = self.classes.A()
        a1.elements.update([(("B", 3), "elem2"), (("C", 4), "elem3")])
        eq_(a1.elements, {("B", 3): "elem2", ("C", 4): "elem3"})
    def test_update_one_elem_varg(self):
        """A bare pair (not wrapped in a list) fails like dict.update:
        here element #1 is the 5-char string 'elem2', not a pair."""
        a1 = self.classes.A()
        assert_raises_message(
            ValueError,
            "dictionary update sequence element #1 has length 5; "
            "2 is required",
            a1.elements.update,
            (("B", 3), "elem2"),
        )
    def test_update_multi_elem_varg(self):
        """Two positional arguments fail like dict.update does."""
        a1 = self.classes.A()
        assert_raises_message(
            TypeError,
            "update expected at most 1 arguments?, got 2",
            a1.elements.update,
            (("B", 3), "elem2"),
            (("C", 4), "elem3"),
        )
class CompositeAccessTest(fixtures.DeclarativeMappedTest):
    """Association proxy targeting a composite() attribute (Point) on
    the association object (PointData)."""

    run_create_tables = None
    @classmethod
    def setup_classes(cls):
        class Point(cls.Basic):
            """Equality-only value object implementing the composite
            protocol via __composite_values__."""
            def __init__(self, x, y):
                self.x = x
                self.y = y
            def __composite_values__(self):
                return [self.x, self.y]
            # explicitly unhashable: equality is defined, hashing is not
            __hash__ = None
            def __eq__(self, other):
                return (
                    isinstance(other, Point)
                    and other.x == self.x
                    and other.y == self.y
                )
            def __ne__(self, other):
                return not isinstance(other, Point) or not self.__eq__(other)
        class Graph(cls.DeclarativeBasic):
            __tablename__ = "graph"
            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            name = Column(String(30))
            point_data = relationship("PointData")
            # Graph.points proxies PointData.point, wrapping appended
            # Point values in new PointData rows
            points = association_proxy(
                "point_data",
                "point",
                creator=lambda point: PointData(point=point),
            )
        class PointData(ComparableEntity, cls.DeclarativeBasic):
            __tablename__ = "point"
            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            graph_id = Column(ForeignKey("graph.id"))
            x1 = Column(Integer)
            y1 = Column(Integer)
            point = composite(Point, x1, y1)
        return Point, Graph, PointData
    def test_append(self):
        """Appending a Point through the proxy creates a PointData."""
        Point, Graph, PointData = self.classes("Point", "Graph", "PointData")
        g1 = Graph()
        g1.points.append(Point(3, 5))
        eq_(g1.point_data, [PointData(point=Point(3, 5))])
    def test_access(self):
        """Reading the proxy unwraps PointData rows back to Points."""
        Point, Graph, PointData = self.classes("Point", "Graph", "PointData")
        g1 = Graph()
        g1.point_data.append(PointData(point=Point(3, 5)))
        g1.point_data.append(PointData(point=Point(10, 7)))
        eq_(g1.points, [Point(3, 5), Point(10, 7)])
class AttributeAccessTest(fixtures.TestBase):
def teardown_test(self):
clear_mappers()
def test_resolve_aliased_class(self):
Base = declarative_base()
class A(Base):
__tablename__ = "a"
id = Column(Integer, primary_key=True)
value = Column(String)
class B(Base):
__tablename__ = "b"
id = Column(Integer, primary_key=True)
a_id = Column(Integer, ForeignKey(A.id))
a = relationship(A)
a_value = association_proxy("a", "value")
spec = aliased(B).a_value
is_(spec.owning_class, B)
spec = B.a_value
is_(spec.owning_class, B)
def test_resolved_w_subclass(self):
# test for issue #4185, as well as several below
Base = declarative_base()
class Mixin:
@declared_attr
def children(cls):
return association_proxy("_children", "value")
# 1. build parent, Mixin.children gets invoked, we add
# Parent.children
class Parent(Mixin, Base):
__tablename__ = "parent"
id = Column(Integer, primary_key=True)
_children = relationship("Child")
class Child(Base):
__tablename__ = "child"
parent_id = Column(
Integer, ForeignKey(Parent.id), primary_key=True
)
value = Column(String)
# 2. declarative builds up SubParent, scans through all attributes
# over all classes. Hits Mixin, hits "children", accesses "children"
# in terms of the class, e.g. SubParent.children. SubParent isn't
# mapped yet. association proxy then sets up "owning_class"
# as NoneType.
class SubParent(Parent):
__tablename__ = "subparent"
id = Column(Integer, ForeignKey(Parent.id), primary_key=True)
configure_mappers()
# 3. which would break here.
p1 = Parent()
eq_(p1.children, [])
def test_resolved_to_correct_class_one(self):
Base = declarative_base()
class Parent(Base):
__tablename__ = "parent"
id = Column(Integer, primary_key=True)
_children = relationship("Child")
children = association_proxy("_children", "value")
class Child(Base):
__tablename__ = "child"
parent_id = Column(
Integer, ForeignKey(Parent.id), primary_key=True
)
value = Column(String)
class SubParent(Parent):
__tablename__ = "subparent"
id = Column(Integer, ForeignKey(Parent.id), primary_key=True)
is_(SubParent.children.owning_class, SubParent)
is_(Parent.children.owning_class, Parent)
def test_resolved_to_correct_class_two(self):
Base = declarative_base()
class Parent(Base):
__tablename__ = "parent"
id = Column(Integer, primary_key=True)
_children = relationship("Child")
class Child(Base):
__tablename__ = "child"
parent_id = Column(
Integer, ForeignKey(Parent.id), primary_key=True
)
value = Column(String)
class SubParent(Parent):
__tablename__ = "subparent"
id = Column(Integer, ForeignKey(Parent.id), primary_key=True)
children = association_proxy("_children", "value")
is_(SubParent.children.owning_class, SubParent)
def test_resolved_to_correct_class_three(self):
Base = declarative_base()
class Parent(Base):
__tablename__ = "parent"
id = Column(Integer, primary_key=True)
_children = relationship("Child")
class Child(Base):
__tablename__ = "child"
parent_id = Column(
Integer, ForeignKey(Parent.id), primary_key=True
)
value = Column(String)
class SubParent(Parent):
__tablename__ = "subparent"
id = Column(Integer, ForeignKey(Parent.id), primary_key=True)
children = association_proxy("_children", "value")
class SubSubParent(SubParent):
__tablename__ = "subsubparent"
id = Column(Integer, ForeignKey(SubParent.id), primary_key=True)
is_(SubParent.children.owning_class, SubParent)
is_(SubSubParent.children.owning_class, SubSubParent)
def test_resolved_to_correct_class_four(self):
Base = declarative_base()
class Parent(Base):
__tablename__ = "parent"
id = Column(Integer, primary_key=True)
_children = relationship("Child")
children = association_proxy(
"_children", "value", creator=lambda value: Child(value=value)
)
class Child(Base):
__tablename__ = "child"
parent_id = Column(
Integer, ForeignKey(Parent.id), primary_key=True
)
value = Column(String)
class SubParent(Parent):
__tablename__ = "subparent"
id = Column(Integer, ForeignKey(Parent.id), primary_key=True)
sp = SubParent()
sp.children = "c"
is_(SubParent.children.owning_class, SubParent)
is_(Parent.children.owning_class, Parent)
def test_resolved_to_correct_class_five(self):
    """A proxy declared on a plain (unmapped) mixin resolves its owner
    to the mapped class that uses the mixin."""
    Base = declarative_base()

    class Mixin:
        children = association_proxy("_children", "value")

    class Parent(Mixin, Base):
        __tablename__ = "parent"
        id = Column(Integer, primary_key=True)
        _children = relationship("Child")

    class Child(Base):
        __tablename__ = "child"
        parent_id = Column(
            Integer, ForeignKey(Parent.id), primary_key=True
        )
        value = Column(String)

    # this triggers the owning routine, doesn't fail
    Mixin.children

    p1 = Parent()

    c1 = Child(value="c1")
    p1._children.append(c1)
    is_(Parent.children.owning_class, Parent)
    eq_(p1.children, ["c1"])
def _test_never_assign_nonetype(self):
    """Disabled test (leading underscore, not collected): with no mapped
    owner, ``owning_class`` stays None and instance access raises.

    NOTE(review): uses the legacy ``mapper()`` function plus ``Table`` /
    ``Column`` directly; presumably disabled pending modernization —
    confirm before re-enabling.
    """
    foo = association_proxy("x", "y")
    foo._calc_owner(None, None)
    is_(foo.owning_class, None)

    class Bat:
        foo = association_proxy("x", "y")

    # class-level access alone does not resolve an owner
    Bat.foo
    is_(Bat.foo.owning_class, None)

    b1 = Bat()
    assert_raises_message(
        exc.InvalidRequestError,
        "This association proxy has no mapped owning class; "
        "can't locate a mapped property",
        getattr,
        b1,
        "foo",
    )
    is_(Bat.foo.owning_class, None)

    # after all that, we can map it
    mapper(
        Bat,
        Table("bat", MetaData(), Column("x", Integer, primary_key=True)),
    )

    # answer is correct
    is_(Bat.foo.owning_class, Bat)
class ScalarRemoveTest:
    """Mixin exercising removal / None-assignment semantics of an
    association proxy.

    Concrete subclasses below supply the flag combination:

    * ``useobject`` - proxy target is a related object vs. a scalar column
    * ``cascade_scalar_deletes`` - setting the scalar proxy to None also
      removes the intermediary object
    * ``uselist`` - the ``ab`` relationship is a collection vs. scalar
    * ``create_on_none_assignment`` - assigning None creates an
      intermediary rather than leaving it absent
    """

    # overridden by the concrete subclasses
    useobject = None
    cascade_scalar_deletes = None
    uselist = None
    create_on_none_assignment = False

    @classmethod
    def setup_classes(cls):
        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "test_a"
            id = Column(Integer, primary_key=True)
            ab = relationship("AB", backref="a", uselist=cls.uselist)
            b = association_proxy(
                "ab",
                "b",
                creator=lambda b: AB(b=b),
                cascade_scalar_deletes=cls.cascade_scalar_deletes,
                create_on_none_assignment=cls.create_on_none_assignment,
            )

        if cls.useobject:
            # object-valued target: AB associates A with B

            class B(Base):
                __tablename__ = "test_b"
                id = Column(Integer, primary_key=True)
                ab = relationship("AB", backref="b")

            class AB(Base):
                __tablename__ = "test_ab"
                a_id = Column(Integer, ForeignKey(A.id), primary_key=True)
                b_id = Column(Integer, ForeignKey(B.id), primary_key=True)

        else:
            # scalar-valued target: AB.b is a plain column

            class AB(Base):
                __tablename__ = "test_ab"
                b = Column(Integer)
                a_id = Column(Integer, ForeignKey(A.id), primary_key=True)

    def test_set_nonnone_to_none(self):
        """Removing / None-assigning a present value; cascade flag
        controls whether the intermediary AB survives."""
        if self.useobject:
            A, AB, B = self.classes("A", "AB", "B")
        else:
            A, AB = self.classes("A", "AB")

        a1 = A()

        b1 = B() if self.useobject else 5

        if self.uselist:
            a1.b.append(b1)
        else:
            a1.b = b1

        if self.uselist:
            assert isinstance(a1.ab[0], AB)
        else:
            assert isinstance(a1.ab, AB)

        if self.uselist:
            a1.b.remove(b1)
        else:
            a1.b = None

        if self.uselist:
            eq_(a1.ab, [])
        else:
            if self.cascade_scalar_deletes:
                # cascade: intermediary removed entirely
                assert a1.ab is None
            else:
                # no cascade: intermediary remains with a None target
                assert isinstance(a1.ab, AB)
                assert a1.ab.b is None

    def test_set_none_to_none(self):
        """Assigning None when nothing is present (scalar case only)."""
        if self.uselist:
            return

        if self.useobject:
            A, AB, B = self.classes("A", "AB", "B")
        else:
            A, AB = self.classes("A", "AB")

        a1 = A()

        a1.b = None

        if self.create_on_none_assignment:
            # flag makes the creator run even for None
            assert isinstance(a1.ab, AB)
            assert a1.ab is not None
            eq_(a1.ab.b, None)
        else:
            assert a1.ab is None

    def test_del_already_nonpresent(self):
        """``del`` of the proxy attribute with no value present."""
        if self.useobject:
            A, AB, B = self.classes("A", "AB", "B")
        else:
            A, AB = self.classes("A", "AB")

        a1 = A()

        if self.uselist:
            # deleting an empty collection proxy is a no-op
            del a1.b

            eq_(a1.ab, [])

        else:

            def go():
                del a1.b

            assert_raises_message(
                AttributeError, "A.ab object does not have a value", go
            )

    def test_del(self):
        """``del`` of the proxy removes the intermediary in all modes."""
        if self.useobject:
            A, AB, B = self.classes("A", "AB", "B")
        else:
            A, AB = self.classes("A", "AB")

        b1 = B() if self.useobject else 5

        a1 = A()
        if self.uselist:
            a1.b.append(b1)
        else:
            a1.b = b1

        if self.uselist:
            assert isinstance(a1.ab[0], AB)
        else:
            assert isinstance(a1.ab, AB)

        del a1.b

        if self.uselist:
            eq_(a1.ab, [])
        else:
            assert a1.ab is None

    def test_del_no_proxy(self):
        """``del`` of the underlying collection directly (not via the
        proxy); list case only."""
        if not self.uselist:
            return

        if self.useobject:
            A, AB, B = self.classes("A", "AB", "B")
        else:
            A, AB = self.classes("A", "AB")

        b1 = B() if self.useobject else 5
        a1 = A()
        a1.b.append(b1)

        del a1.ab

        # this is what it does for now, so maintain that w/ assoc proxy
        eq_(a1.ab, [])

    def test_del_already_nonpresent_no_proxy(self):
        """``del`` of the underlying, empty collection; list case only."""
        if not self.uselist:
            return

        if self.useobject:
            A, AB, B = self.classes("A", "AB", "B")
        else:
            A, AB = self.classes("A", "AB")

        a1 = A()

        del a1.ab

        # this is what it does for now, so maintain that w/ assoc proxy
        eq_(a1.ab, [])
class ScalarRemoveListObjectCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """list collection / object target / cascade_scalar_deletes=True"""

    run_create_tables = None
    useobject = True
    cascade_scalar_deletes = True
    uselist = True
class ScalarRemoveScalarObjectCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """scalar relationship / object target / cascade_scalar_deletes=True"""

    run_create_tables = None
    useobject = True
    cascade_scalar_deletes = True
    uselist = False
class ScalarRemoveListScalarCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """list collection / scalar column target / cascade_scalar_deletes=True"""

    run_create_tables = None
    useobject = False
    cascade_scalar_deletes = True
    uselist = True
class ScalarRemoveScalarScalarCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """scalar relationship / scalar column target /
    cascade_scalar_deletes=True"""

    run_create_tables = None
    useobject = False
    cascade_scalar_deletes = True
    uselist = False
class ScalarRemoveListObjectNoCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """list collection / object target / cascade_scalar_deletes=False"""

    run_create_tables = None
    useobject = True
    cascade_scalar_deletes = False
    uselist = True
class ScalarRemoveScalarObjectNoCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """scalar relationship / object target / cascade_scalar_deletes=False"""

    run_create_tables = None
    useobject = True
    cascade_scalar_deletes = False
    uselist = False
class ScalarRemoveScalarObjectNoCascadeNoneAssign(
    ScalarRemoveScalarObjectNoCascade
):
    """scalar/object/no-cascade variant with create_on_none_assignment"""

    create_on_none_assignment = True
class ScalarRemoveListScalarNoCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """list collection / scalar column target /
    cascade_scalar_deletes=False"""

    run_create_tables = None
    useobject = False
    cascade_scalar_deletes = False
    uselist = True
class ScalarRemoveListScalarNoCascadeNoneAssign(
    ScalarRemoveScalarObjectNoCascade
):
    # NOTE(review): the class name says "ListScalar" but the base class
    # is the Scalar/Object no-cascade variant, so this re-runs the
    # scalar/object tests with create_on_none_assignment on.  Possibly a
    # copy-paste slip for ScalarRemoveListScalarNoCascade — confirm
    # intent before changing, as the flag may not apply to list proxies.
    create_on_none_assignment = True
class ScalarRemoveScalarScalarNoCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """scalar relationship / scalar column target /
    cascade_scalar_deletes=False"""

    run_create_tables = None
    useobject = False
    cascade_scalar_deletes = False
    uselist = False
class InfoTest(fixtures.TestBase):
    """Behavior of the ``.info`` dictionary on an association proxy."""

    def test_constructor(self):
        # info passed to the constructor is stored as given
        prox = association_proxy("a", "b", info={"some_assoc": "some_value"})
        eq_(prox.info, {"some_assoc": "some_value"})

    def test_empty(self):
        # with no info argument, an empty dict is produced on access
        prox = association_proxy("a", "b")
        eq_(prox.info, {})

    def test_via_cls(self):
        # class-level access yields a persistent, mutable info dict
        class Foob:
            assoc = association_proxy("a", "b")

        eq_(Foob.assoc.info, {})
        Foob.assoc.info["foo"] = "bar"
        eq_(Foob.assoc.info, {"foo": "bar"})
class OnlyRelationshipTest(fixtures.DeclarativeMappedTest):
    """An association proxy whose local attribute is not a relationship
    is rejected for instance get, instance set, and class-level get."""

    run_define_tables = None
    run_create_tables = None
    run_inserts = None
    run_deletes = None

    @classmethod
    def setup_classes(cls):
        Base = cls.DeclarativeBasic

        class Foo(Base):
            __tablename__ = "foo"
            id = Column(Integer, primary_key=True)
            foo = Column(String)  # assume some composite datatype

            bar = association_proxy("foo", "attr")

    def _assert_not_supported(self, fn, *arg):
        # every access mode fails with the same NotImplementedError
        assert_raises_message(
            NotImplementedError,
            "association proxy to a non-relationship "
            "intermediary is not supported",
            fn,
            *arg,
        )

    def test_setattr(self):
        f1 = self.classes.Foo()
        self._assert_not_supported(setattr, f1, "bar", "asdf")

    def test_getattr(self):
        f1 = self.classes.Foo()
        self._assert_not_supported(getattr, f1, "bar")

    def test_get_class_attr(self):
        self._assert_not_supported(getattr, self.classes.Foo, "bar")
class MultiOwnerTest(
    fixtures.DeclarativeMappedTest, testing.AssertsCompiledSQL
):
    """One association proxy (``D.c_data``) whose target attribute
    ``csub_only_data`` is defined differently across subclasses of the
    related class: a uselist relationship on C1, a scalar Column on C2,
    and absent on C."""

    __dialect__ = "default"
    run_define_tables = "each"
    run_create_tables = None
    run_inserts = None
    run_deletes = None
    run_setup_classes = "each"
    run_setup_mappers = "each"

    @classmethod
    def setup_classes(cls):
        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            type = Column(String(5), nullable=False)
            d_values = association_proxy("ds", "value")

            __mapper_args__ = {"polymorphic_on": type}

        class B(A):
            __tablename__ = "b"
            id = Column(ForeignKey("a.id"), primary_key=True)
            c1_id = Column(ForeignKey("c1.id"))
            ds = relationship("D", primaryjoin="D.b_id == B.id")

            __mapper_args__ = {"polymorphic_identity": "b"}

        class C(A):
            __tablename__ = "c"
            id = Column(ForeignKey("a.id"), primary_key=True)
            ds = relationship(
                "D", primaryjoin="D.c_id == C.id", back_populates="c"
            )

            __mapper_args__ = {"polymorphic_identity": "c"}

        class C1(C):
            __tablename__ = "c1"
            id = Column(ForeignKey("c.id"), primary_key=True)

            csub_only_data = relationship("B")  # uselist=True relationship

            ds = relationship(
                "D", primaryjoin="D.c1_id == C1.id", back_populates="c"
            )

            __mapper_args__ = {"polymorphic_identity": "c1"}

        class C2(C):
            __tablename__ = "c2"
            id = Column(ForeignKey("c.id"), primary_key=True)

            csub_only_data = Column(String(50))  # scalar Column

            ds = relationship(
                "D", primaryjoin="D.c2_id == C2.id", back_populates="c"
            )

            __mapper_args__ = {"polymorphic_identity": "c2"}

        class D(Base):
            __tablename__ = "d"
            id = Column(Integer, primary_key=True)
            value = Column(String(50))
            b_id = Column(ForeignKey("b.id"))
            c_id = Column(ForeignKey("c.id"))
            c1_id = Column(ForeignKey("c1.id"))
            c2_id = Column(ForeignKey("c2.id"))
            c = relationship("C", primaryjoin="D.c_id == C.id")
            c_data = association_proxy("c", "csub_only_data")

    def _assert_raises_ambiguous(self, fn, *arg, **kw):
        # D.c_data cannot produce SQL: its target differs per subclass
        assert_raises_message(
            AttributeError,
            "Association proxy D.c refers to an attribute 'csub_only_data'",
            fn,
            *arg,
            **kw,
        )

    def _assert_raises_attribute(self, message, fn, *arg, **kw):
        assert_raises_message(AttributeError, message, fn, *arg, **kw)

    def test_column_collection_expressions(self):
        """d_values works at the class level for each subclass of A."""
        B, C, C2 = self.classes("B", "C", "C2")

        self.assert_compile(
            B.d_values.contains("b1"),
            "EXISTS (SELECT 1 FROM d, b WHERE d.b_id = b.id "
            "AND (d.value LIKE '%' || :value_1 || '%'))",
        )

        self.assert_compile(
            C2.d_values.contains("c2"),
            "EXISTS (SELECT 1 FROM d, c2 WHERE d.c2_id = c2.id "
            "AND (d.value LIKE '%' || :value_1 || '%'))",
        )

        self.assert_compile(
            C.d_values.contains("c1"),
            "EXISTS (SELECT 1 FROM d, c WHERE d.c_id = c.id "
            "AND (d.value LIKE '%' || :value_1 || '%'))",
        )

    def test_subclass_only_owner_none(self):
        """With no related C at all, the proxy returns None."""
        D, C, C2 = self.classes("D", "C", "C2")

        d1 = D()
        eq_(d1.c_data, None)

    def test_subclass_only_owner_assign(self):
        D, C, C2 = self.classes("D", "C", "C2")

        d1 = D(c=C2())
        d1.c_data = "some c2"
        eq_(d1.c_data, "some c2")

    def test_subclass_only_owner_get(self):
        D, C, C2 = self.classes("D", "C", "C2")

        d1 = D(c=C2(csub_only_data="some c2"))
        eq_(d1.c_data, "some c2")

    def test_subclass_only_owner_none_raise(self):
        # NOTE(review): identical body to test_subclass_only_owner_none
        # and no raise is asserted despite the name — confirm intent.
        D, C, C2 = self.classes("D", "C", "C2")

        d1 = D()
        eq_(d1.c_data, None)

    def test_subclass_only_owner_delete(self):
        D, C, C2 = self.classes("D", "C", "C2")

        d1 = D(c=C2(csub_only_data="some c2"))
        eq_(d1.c.csub_only_data, "some c2")
        del d1.c_data
        assert not hasattr(d1.c, "csub_only_data")

    def test_subclass_only_owner_assign_passes(self):
        D, C, C2 = self.classes("D", "C", "C2")

        d1 = D(c=C())
        d1.c_data = "some c1"

        # not mapped, but we set it
        eq_(d1.c.csub_only_data, "some c1")

    def test_subclass_only_owner_get_raises(self):
        """Reading through a plain C, which has no such attribute."""
        D, C, C2 = self.classes("D", "C", "C2")

        d1 = D(c=C())
        self._assert_raises_attribute(
            "'C' object has no attribute 'csub_only_data'",
            getattr,
            d1,
            "c_data",
        )

    def test_subclass_only_owner_delete_raises(self):
        D, C, C2 = self.classes("D", "C", "C2")

        d1 = D(c=C2(csub_only_data="some c2"))
        eq_(d1.c_data, "some c2")

        # now switch
        d1.c = C()

        self._assert_raises_attribute("csub_only_data", delattr, d1, "c_data")

    def test_subclasses_conflicting_types(self):
        """Per-subclass proxy instances are created and cached, one
        object-valued (C1) and one column-valued (C2)."""
        B, D, C, C1, C2 = self.classes("B", "D", "C", "C1", "C2")

        bs = [B(), B()]
        d1 = D(c=C1(csub_only_data=bs))
        d2 = D(c=C2(csub_only_data="some c2"))

        association_proxy_object = inspect(D).all_orm_descriptors["c_data"]
        inst1 = association_proxy_object.for_class(D, d1)
        inst2 = association_proxy_object.for_class(D, d2)

        eq_(inst1._target_is_object, True)
        eq_(inst2._target_is_object, False)

        # both instances are cached
        inst0 = association_proxy_object.for_class(D)
        eq_(inst0._lookup_cache, {C1: inst1, C2: inst2})

        # cache works
        is_(association_proxy_object.for_class(D, d1), inst1)
        is_(association_proxy_object.for_class(D, d2), inst2)

    def test_col_expressions_not_available(self):
        (D,) = self.classes("D")

        self._assert_raises_ambiguous(lambda: D.c_data == 5)

    def test_rel_expressions_not_available(self):
        (
            B,
            D,
        ) = self.classes("B", "D")

        self._assert_raises_ambiguous(lambda: D.c_data.any(B.id == 5))
class ProxyOfSynonymTest(AssertsCompiledSQL, fixtures.DeclarativeMappedTest):
    """Association proxies whose remote attribute is a ``synonym()``."""

    __dialect__ = "default"

    run_create_tables = None

    @classmethod
    def setup_classes(cls):
        from sqlalchemy.orm import synonym

        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            data = Column(String)
            bs = relationship("B", backref="a")
            data_syn = synonym("data")

            # proxies through the synonym on B
            b_data = association_proxy("bs", "data_syn")

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            a_id = Column(ForeignKey("a.id"))
            data = Column(String)
            data_syn = synonym("data")

            # proxies through the synonym on A
            a_data = association_proxy("a", "data_syn")

    def test_o2m_instance_getter(self):
        A, B = self.classes("A", "B")

        a1 = A(bs=[B(data="bdata1"), B(data="bdata2")])
        eq_(a1.b_data, ["bdata1", "bdata2"])

    def test_m2o_instance_getter(self):
        A, B = self.classes("A", "B")

        b1 = B(a=A(data="adata"))
        eq_(b1.a_data, "adata")

    def test_o2m_expr(self):
        A, B = self.classes("A", "B")

        # the synonym resolves to the underlying column in SQL
        self.assert_compile(
            A.b_data == "foo",
            "EXISTS (SELECT 1 FROM b, a WHERE a.id = b.a_id "
            "AND b.data = :data_1)",
        )
class SynonymOfProxyTest(AssertsCompiledSQL, fixtures.DeclarativeMappedTest):
    """A ``synonym()`` that points at an association proxy."""

    __dialect__ = "default"

    run_create_tables = None

    @classmethod
    def setup_classes(cls):
        from sqlalchemy.orm import synonym

        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            data = Column(String)
            bs = relationship("B", backref="a")

            b_data = association_proxy("bs", "data")

            # synonym layered on top of the proxy
            b_data_syn = synonym("b_data")

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            a_id = Column(ForeignKey("a.id"))
            data = Column(String)

    def test_hasattr(self):
        A, B = self.classes("A", "B")

        # nonexistent sub-attributes are not silently proxied
        is_false(hasattr(A.b_data_syn, "nonexistent"))

    def test_o2m_instance_getter(self):
        A, B = self.classes("A", "B")

        a1 = A(bs=[B(data="bdata1"), B(data="bdata2")])
        eq_(a1.b_data_syn, ["bdata1", "bdata2"])

    def test_o2m_expr(self):
        A, B = self.classes("A", "B")

        self.assert_compile(
            A.b_data_syn == "foo",
            "EXISTS (SELECT 1 FROM b, a WHERE a.id = b.a_id "
            "AND b.data = :data_1)",
        )
class ProxyHybridTest(fixtures.DeclarativeMappedTest, AssertsCompiledSQL):
    """Association proxies targeting hybrid_property attributes, in both
    the well-behaved case and the "ambiguous" case where the hybrid has a
    custom comparator."""

    __dialect__ = "default"
    run_create_tables = None

    @classmethod
    def setup_classes(cls):
        from sqlalchemy.ext.hybrid import hybrid_property
        from sqlalchemy.orm.interfaces import PropComparator

        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            bs = relationship("B")

            b_data = association_proxy("bs", "value")
            well_behaved_b_data = association_proxy("bs", "well_behaved_value")

            fails_on_class_access = association_proxy(
                "bs", "fails_on_class_access"
            )

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            aid = Column(ForeignKey("a.id"))
            data = Column(String(50))

            @hybrid_property
            def well_behaved_value(self):
                return self.data

            @well_behaved_value.setter
            def well_behaved_value(self, value):
                self.data = value

            @hybrid_property
            def value(self):
                return self.data

            @value.setter
            def value(self, value):
                self.data = value

            @value.comparator
            class value(PropComparator):
                # comparator has no proxy __getattr__, so we can't
                # get to impl to see what we are proxying towards.
                # as of #4690 we assume column-oriented proxying
                def __init__(self, cls):
                    self.cls = cls

            @hybrid_property
            def well_behaved_w_expr(self):
                return self.data

            @well_behaved_w_expr.setter
            def well_behaved_w_expr(self, value):
                self.data = value

            @well_behaved_w_expr.expression
            def well_behaved_w_expr(cls):
                return cast(cls.data, Integer)

            @hybrid_property
            def fails_on_class_access(self):
                # len() of a class-level attribute raises TypeError
                return len(self.data)

        class C(Base):
            __tablename__ = "c"
            id = Column(Integer, primary_key=True)
            b_id = Column(ForeignKey("b.id"))
            _b = relationship("B")
            attr = association_proxy("_b", "well_behaved_w_expr")

    def test_msg_fails_on_cls_access(self):
        """Errors raised during class-level target access are wrapped
        with an informative message."""
        A, B = self.classes("A", "B")

        a1 = A(bs=[B(data="b1")])

        with expect_raises_message(
            exc.InvalidRequestError,
            "Association proxy received an unexpected error when trying to "
            'retrieve attribute "B.fails_on_class_access" from '
            r'class "B": .* no len\(\)',
        ):
            a1.fails_on_class_access

    def test_get_ambiguous(self):
        A, B = self.classes("A", "B")

        a1 = A(bs=[B(data="b1")])
        eq_(a1.b_data[0], "b1")

    def test_get_nonambiguous(self):
        A, B = self.classes("A", "B")

        a1 = A(bs=[B(data="b1")])
        eq_(a1.well_behaved_b_data[0], "b1")

    def test_set_ambiguous(self):
        A, B = self.classes("A", "B")

        a1 = A(bs=[B()])

        a1.b_data[0] = "b1"
        eq_(a1.b_data[0], "b1")

    def test_set_nonambiguous(self):
        A, B = self.classes("A", "B")

        a1 = A(bs=[B()])

        a1.b_data[0] = "b1"
        eq_(a1.well_behaved_b_data[0], "b1")

    def test_expr_nonambiguous(self):
        A, B = self.classes("A", "B")

        eq_(
            str(A.well_behaved_b_data == 5),
            "EXISTS (SELECT 1 \nFROM b, a \nWHERE "
            "a.id = b.aid AND b.data = :data_1)",
        )

    def test_get_classlevel_ambiguous(self):
        A, B = self.classes("A", "B")

        # per #4690, the custom-comparator hybrid is treated as
        # column-oriented
        eq_(
            str(A.b_data),
            "ColumnAssociationProxyInstance"
            "(AssociationProxy('bs', 'value'))",
        )

    def test_comparator_ambiguous(self):
        A, B = self.classes("A", "B")

        s = fixture_session()
        self.assert_compile(
            s.query(A).filter(A.b_data.any()),
            "SELECT a.id AS a_id FROM a WHERE EXISTS "
            "(SELECT 1 FROM b WHERE a.id = b.aid)",
        )

    def test_explicit_expr(self):
        """The hybrid's expression() (a CAST) is used in SQL."""
        (C,) = self.classes("C")

        s = fixture_session()
        self.assert_compile(
            s.query(C).filter_by(attr=5),
            "SELECT c.id AS c_id, c.b_id AS c_b_id FROM c WHERE EXISTS "
            "(SELECT 1 FROM b WHERE b.id = c.b_id AND "
            "CAST(b.data AS INTEGER) = :param_1)",
        )
class ProxyPlainPropertyTest(fixtures.DeclarativeMappedTest):
    """Association proxy whose target is a plain Python ``@property``;
    instance access works, but SQL expressions are unavailable."""

    run_create_tables = None

    @classmethod
    def setup_classes(cls):
        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            bs = relationship("B")
            b_data = association_proxy("bs", "value")

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            aid = Column(ForeignKey("a.id"))
            data = Column(String(50))

            @property
            def value(self):
                return self.data

            @value.setter
            def value(self, value):
                self.data = value

    def test_get_ambiguous(self):
        A, B = self.classes("A", "B")

        obj = A(bs=[B(data="b1")])
        eq_(obj.b_data[0], "b1")

    def test_set_ambiguous(self):
        A, B = self.classes("A", "B")

        obj = A(bs=[B()])
        obj.b_data[0] = "b1"
        eq_(obj.b_data[0], "b1")

    def test_get_classlevel_ambiguous(self):
        A, B = self.classes("A", "B")

        # a plain property carries no mapping info, so the proxy
        # remains "ambiguous" at the class level
        eq_(
            str(A.b_data),
            "AmbiguousAssociationProxyInstance"
            "(AssociationProxy('bs', 'value'))",
        )

    def test_expr_ambiguous(self):
        A, B = self.classes("A", "B")

        assert_raises_message(
            AttributeError,
            "Association proxy A.bs refers to an attribute "
            "'value' that is not directly mapped",
            lambda: A.b_data == 5,
        )
class ScopeBehaviorTest(fixtures.DeclarativeMappedTest):
    # test some GC scenarios, including issue #4268

    @classmethod
    def setup_classes(cls):
        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            data = Column(String(50))
            bs = relationship("B")

            b_dyn = relationship("B", lazy="dynamic", viewonly=True)

            b_data = association_proxy("bs", "data")

            b_dynamic_data = association_proxy("bs", "data")

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            aid = Column(ForeignKey("a.id"))
            data = Column(String(50))

    @classmethod
    def insert_data(cls, connection):
        A, B = cls.classes("A", "B")

        s = Session(connection)
        s.add_all(
            [
                A(id=1, bs=[B(data="b1"), B(data="b2")]),
                A(id=2, bs=[B(data="b3"), B(data="b4")]),
            ]
        )
        s.commit()
        s.close()

    def test_plain_collection_gc(self):
        """A plain collection does not keep its parent alive."""
        A, B = self.classes("A", "B")

        s = Session(testing.db)
        a1 = s.query(A).filter_by(id=1).one()
        a1bs = a1.bs  # noqa

        del a1

        gc_collect()

        assert (A, (1,), None) not in s.identity_map

    @testing.fails("dynamic relationship strong references parent")
    def test_dynamic_collection_gc(self):
        A, B = self.classes("A", "B")

        s = Session(testing.db)
        a1 = s.query(A).filter_by(id=1).one()
        a1bs = a1.b_dyn  # noqa

        del a1

        gc_collect()

        # also fails, AppenderQuery holds onto parent
        assert (A, (1,), None) not in s.identity_map

    @testing.fails("association proxy strong references parent")
    def test_associated_collection_gc(self):
        A, B = self.classes("A", "B")

        s = Session(testing.db)
        a1 = s.query(A).filter_by(id=1).one()
        a1bs = a1.b_data  # noqa

        del a1

        gc_collect()

        assert (A, (1,), None) not in s.identity_map

    @testing.fails("association proxy strong references parent")
    def test_associated_dynamic_gc(self):
        A, B = self.classes("A", "B")

        s = Session(testing.db)
        a1 = s.query(A).filter_by(id=1).one()
        a1bs = a1.b_dynamic_data  # noqa

        del a1

        gc_collect()

        assert (A, (1,), None) not in s.identity_map

    def test_plain_collection_iterate(self):
        """The collection stays usable after the parent is collected."""
        A, B = self.classes("A", "B")

        s = Session(testing.db)
        a1 = s.query(A).filter_by(id=1).one()
        a1bs = a1.bs

        del a1

        gc_collect()

        assert len(a1bs) == 2

    def test_dynamic_collection_iterate(self):
        A, B = self.classes("A", "B")

        s = Session(testing.db)
        a1 = s.query(A).filter_by(id=1).one()
        a1bs = a1.b_dyn  # noqa

        del a1

        gc_collect()

        assert len(list(a1bs)) == 2

    def test_associated_collection_iterate(self):
        A, B = self.classes("A", "B")

        s = Session(testing.db)
        a1 = s.query(A).filter_by(id=1).one()
        a1bs = a1.b_data

        del a1

        gc_collect()

        assert len(a1bs) == 2

    def test_associated_dynamic_iterate(self):
        A, B = self.classes("A", "B")

        s = Session(testing.db)
        a1 = s.query(A).filter_by(id=1).one()
        a1bs = a1.b_dynamic_data

        del a1

        gc_collect()

        assert len(a1bs) == 2
class DeclOrmForms(fixtures.TestBase):
    """test issues related to #8880, #8878, #8876"""

    def test_straight_decl_usage(self, decl_base):
        """test use of assoc prox as the default descriptor for a
        dataclasses.field.
        """

        class User(decl_base):
            __allow_unmapped__ = True
            __tablename__ = "user"

            id: Mapped[int] = mapped_column(primary_key=True)

            user_keyword_associations: Mapped[List[UserKeywordAssociation]] = (
                relationship(
                    back_populates="user",
                    cascade="all, delete-orphan",
                )
            )

            keywords: AssociationProxy[list[str]] = association_proxy(
                "user_keyword_associations", "keyword"
            )

        UserKeywordAssociation, Keyword = self._keyword_mapping(
            User, decl_base
        )

        self._assert_keyword_assoc_mapping(
            User, UserKeywordAssociation, Keyword, init=True
        )

    @testing.variation("embed_in_field", [True, False])
    @testing.combinations(
        {},
        {"repr": False},
        {"repr": True},
        {"kw_only": True},
        {"init": False},
        {"default_factory": True},
        argnames="field_kw",
    )
    def test_dc_decl_usage(self, dc_decl_base, embed_in_field, field_kw):
        """test use of assoc prox as the default descriptor for a
        dataclasses.field.

        This exercises #8880
        """

        # default_factory only applies when the proxy is declared
        # directly (not wrapped in dataclasses.field())
        if field_kw.pop("default_factory", False) and not embed_in_field:
            has_default_factory = True
            field_kw["default_factory"] = lambda: [
                Keyword("l1"),
                Keyword("l2"),
                Keyword("l3"),
            ]
        else:
            has_default_factory = False

        class User(dc_decl_base):
            __allow_unmapped__ = True
            __tablename__ = "user"

            id: Mapped[int] = mapped_column(
                primary_key=True, repr=True, init=False
            )

            user_keyword_associations: Mapped[List[UserKeywordAssociation]] = (
                relationship(
                    back_populates="user",
                    cascade="all, delete-orphan",
                    init=False,
                )
            )

            if embed_in_field:
                # this is an incorrect form to use with
                # MappedAsDataclass.  However, we want to make sure it
                # works as kind of a test to ensure we are being as well
                # behaved as possible with an explicit dataclasses.field(),
                # by testing that it uses its normal descriptor-as-default
                # behavior
                keywords: AssociationProxy[list[str]] = dataclasses.field(
                    default=association_proxy(
                        "user_keyword_associations", "keyword"
                    ),
                    **field_kw,
                )
            else:
                keywords: AssociationProxy[list[str]] = association_proxy(
                    "user_keyword_associations", "keyword", **field_kw
                )

        UserKeywordAssociation, Keyword = self._dc_keyword_mapping(
            User, dc_decl_base
        )

        # simplify __qualname__ so we can test repr() more easily
        User.__qualname__ = "mod.User"
        UserKeywordAssociation.__qualname__ = "mod.UserKeywordAssociation"
        Keyword.__qualname__ = "mod.Keyword"

        init = field_kw.get("init", True)

        u1 = self._assert_keyword_assoc_mapping(
            User,
            UserKeywordAssociation,
            Keyword,
            init=init,
            has_default_factory=has_default_factory,
        )

        if field_kw.get("repr", True):
            # keywords included in the dataclass repr
            eq_(
                repr(u1),
                "mod.User(id=None, user_keyword_associations=["
                "mod.UserKeywordAssociation(user_id=None, keyword_id=None, "
                "keyword=mod.Keyword(id=None, keyword='k1'), user=...), "
                "mod.UserKeywordAssociation(user_id=None, keyword_id=None, "
                "keyword=mod.Keyword(id=None, keyword='k2'), user=...), "
                "mod.UserKeywordAssociation(user_id=None, keyword_id=None, "
                "keyword=mod.Keyword(id=None, keyword='k3'), user=...)], "
                "keywords=[mod.Keyword(id=None, keyword='k1'), "
                "mod.Keyword(id=None, keyword='k2'), "
                "mod.Keyword(id=None, keyword='k3')])",
            )
        else:
            # keywords omitted from the dataclass repr
            eq_(
                repr(u1),
                "mod.User(id=None, user_keyword_associations=["
                "mod.UserKeywordAssociation(user_id=None, keyword_id=None, "
                "keyword=mod.Keyword(id=None, keyword='k1'), user=...), "
                "mod.UserKeywordAssociation(user_id=None, keyword_id=None, "
                "keyword=mod.Keyword(id=None, keyword='k2'), user=...), "
                "mod.UserKeywordAssociation(user_id=None, keyword_id=None, "
                "keyword=mod.Keyword(id=None, keyword='k3'), user=...)])",
            )

    def _assert_keyword_assoc_mapping(
        self,
        User,
        UserKeywordAssociation,
        Keyword,
        *,
        init,
        has_default_factory=False,
    ):
        """Assert the keyword proxy round-trips through the association
        objects; returns the constructed User."""
        if not init:
            with expect_raises_message(
                TypeError, r"got an unexpected keyword argument 'keywords'"
            ):
                User(keywords=[Keyword("k1"), Keyword("k2"), Keyword("k3")])

        if has_default_factory:
            u1 = User()
            eq_(u1.keywords, [Keyword("l1"), Keyword("l2"), Keyword("l3")])

            eq_(
                [ka.keyword.keyword for ka in u1.user_keyword_associations],
                ["l1", "l2", "l3"],
            )

        if init:
            u1 = User(keywords=[Keyword("k1"), Keyword("k2"), Keyword("k3")])
        else:
            u1 = User()
            u1.keywords = [Keyword("k1"), Keyword("k2"), Keyword("k3")]

        eq_(u1.keywords, [Keyword("k1"), Keyword("k2"), Keyword("k3")])

        eq_(
            [ka.keyword.keyword for ka in u1.user_keyword_associations],
            ["k1", "k2", "k3"],
        )

        return u1

    def _keyword_mapping(self, User, decl_base):
        """Plain-declarative association classes for the keyword tests."""

        class UserKeywordAssociation(decl_base):
            __tablename__ = "user_keyword"
            user_id: Mapped[int] = mapped_column(
                ForeignKey("user.id"), primary_key=True
            )
            keyword_id: Mapped[int] = mapped_column(
                ForeignKey("keyword.id"), primary_key=True
            )

            user: Mapped[User] = relationship(
                back_populates="user_keyword_associations",
            )

            keyword: Mapped[Keyword] = relationship()

            def __init__(self, keyword=None, user=None):
                self.user = user
                self.keyword = keyword

        class Keyword(ComparableMixin, decl_base):
            __tablename__ = "keyword"
            id: Mapped[int] = mapped_column(primary_key=True)
            keyword: Mapped[str] = mapped_column()

            def __init__(self, keyword):
                self.keyword = keyword

        return UserKeywordAssociation, Keyword

    def _dc_keyword_mapping(self, User, dc_decl_base):
        """Dataclass-style association classes for the keyword tests."""

        class UserKeywordAssociation(dc_decl_base):
            __tablename__ = "user_keyword"
            user_id: Mapped[int] = mapped_column(
                ForeignKey("user.id"), primary_key=True, init=False
            )
            keyword_id: Mapped[int] = mapped_column(
                ForeignKey("keyword.id"), primary_key=True, init=False
            )

            keyword: Mapped[Keyword] = relationship(default=None)

            user: Mapped[User] = relationship(
                back_populates="user_keyword_associations", default=None
            )

        class Keyword(dc_decl_base):
            __tablename__ = "keyword"
            id: Mapped[int] = mapped_column(primary_key=True, init=False)
            keyword: Mapped[str] = mapped_column(init=True)

        return UserKeywordAssociation, Keyword