Files
sqlalchemy/test/orm/test_mapper.py
T
Mike Bayer 8ec1369eb3 Check for non-mapped property in synonym
An :class:`.InvalidRequestError` is raised when a :func:`.synonym`
is used against an attribute that is not against a :class:`.MapperProperty`,
such as an association proxy.  Previously, a recursion overflow would
occur trying to locate non-existent attributes.

Change-Id: If2ce38c429a69951df4c94b71b74edbd59d775e3
Fixes: #4067
(cherry picked from commit 130f31ca79)
2017-09-04 18:56:38 -04:00

3348 lines
108 KiB
Python

"""General mapper operations with an emphasis on selecting/loading."""
from sqlalchemy.testing import assert_raises, assert_raises_message
import sqlalchemy as sa
from sqlalchemy import testing
from sqlalchemy import MetaData, Integer, String, \
ForeignKey, func, util, select
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.engine import default
from sqlalchemy.orm import mapper, relationship, backref, \
create_session, class_mapper, configure_mappers, reconstructor, \
aliased, deferred, synonym, attributes, \
column_property, composite, dynamic_loader, \
comparable_property, Session
from sqlalchemy.orm.persistence import _sort_states
from sqlalchemy.testing import eq_, AssertsCompiledSQL, is_
from sqlalchemy.testing import fixtures
from test.orm import _fixtures
from sqlalchemy.testing.assertsql import CompiledSQL
import logging
import logging.handlers
class MapperTest(_fixtures.FixtureTest, AssertsCompiledSQL):
__dialect__ = 'default'
def test_prop_shadow(self):
"""A backref name may not shadow an existing property name."""
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(Address, addresses)
mapper(User, users,
properties={
'addresses': relationship(Address, backref='email_address')
})
assert_raises(sa.exc.ArgumentError, sa.orm.configure_mappers)
def test_update_attr_keys(self):
"""test that update()/insert() use the correct key when given
InstrumentedAttributes."""
User, users = self.classes.User, self.tables.users
mapper(User, users, properties={
'foobar': users.c.name
})
users.insert().values({User.foobar: 'name1'}).execute()
eq_(sa.select([User.foobar]).where(User.foobar == 'name1').
execute().fetchall(), [('name1',)])
users.update().values({User.foobar: User.foobar + 'foo'}).execute()
eq_(sa.select([User.foobar]).where(User.foobar == 'name1foo').
execute().fetchall(), [('name1foo',)])
def test_utils(self):
users = self.tables.users
addresses = self.tables.addresses
Address = self.classes.Address
from sqlalchemy.orm.base import _is_mapped_class, _is_aliased_class
class Foo(object):
x = "something"
@property
def y(self):
return "something else"
m = mapper(Foo, users, properties={"addresses": relationship(Address)})
mapper(Address, addresses)
a1 = aliased(Foo)
f = Foo()
for fn, arg, ret in [
(_is_mapped_class, Foo.x, False),
(_is_mapped_class, Foo.y, False),
(_is_mapped_class, Foo.name, False),
(_is_mapped_class, Foo.addresses, False),
(_is_mapped_class, Foo, True),
(_is_mapped_class, f, False),
(_is_mapped_class, a1, True),
(_is_mapped_class, m, True),
(_is_aliased_class, a1, True),
(_is_aliased_class, Foo.x, False),
(_is_aliased_class, Foo.y, False),
(_is_aliased_class, Foo, False),
(_is_aliased_class, f, False),
(_is_aliased_class, a1, True),
(_is_aliased_class, m, False),
]:
assert fn(arg) == ret
def test_entity_descriptor(self):
users = self.tables.users
from sqlalchemy.orm.base import _entity_descriptor
class Foo(object):
x = "something"
@property
def y(self):
return "something else"
m = mapper(Foo, users)
a1 = aliased(Foo)
for arg, key, ret in [
(m, "x", Foo.x),
(Foo, "x", Foo.x),
(a1, "x", a1.x),
(users, "name", users.c.name)
]:
assert _entity_descriptor(arg, key) is ret
def test_friendly_attribute_str_on_uncompiled_boom(self):
User, users = self.classes.User, self.tables.users
def boom():
raise Exception("it broke")
mapper(User, users, properties={
'addresses': relationship(boom)
})
# test that QueryableAttribute.__str__() doesn't
# cause a compile.
eq_(str(User.addresses), "User.addresses")
def test_exceptions_sticky(self):
"""test preservation of mapper compile errors raised during hasattr(),
as well as for redundant mapper compile calls. Test that
repeated calls don't stack up error messages.
"""
Address, addresses, User = (self.classes.Address,
self.tables.addresses,
self.classes.User)
mapper(Address, addresses, properties={
'user': relationship(User)
})
try:
hasattr(Address.user, 'property')
except sa.orm.exc.UnmappedClassError:
assert util.compat.py32
for i in range(3):
assert_raises_message(sa.exc.InvalidRequestError,
"^One or more "
"mappers failed to initialize - can't "
"proceed with initialization of other "
r"mappers. Triggering mapper\: "
r"'Mapper\|Address\|addresses'."
" Original exception was: Class "
"'test.orm._fixtures.User' is not mapped$",
configure_mappers)
def test_column_prefix(self):
users, User = self.tables.users, self.classes.User
mapper(User, users, column_prefix='_', properties={
'user_name': synonym('_name')
})
s = create_session()
u = s.query(User).get(7)
eq_(u._name, 'jack')
eq_(u._id, 7)
u2 = s.query(User).filter_by(user_name='jack').one()
assert u is u2
def test_no_pks_1(self):
User, users = self.classes.User, self.tables.users
s = sa.select([users.c.name]).alias('foo')
assert_raises(sa.exc.ArgumentError, mapper, User, s)
def test_no_pks_2(self):
User, users = self.classes.User, self.tables.users
s = sa.select([users.c.name]).alias()
assert_raises(sa.exc.ArgumentError, mapper, User, s)
def test_reconfigure_on_other_mapper(self):
"""A configure trigger on an already-configured mapper
still triggers a check against all mappers."""
users, Address, addresses, User = (self.tables.users,
self.classes.Address,
self.tables.addresses,
self.classes.User)
mapper(User, users)
sa.orm.configure_mappers()
assert sa.orm.mapperlib.Mapper._new_mappers is False
m = mapper(Address, addresses, properties={
'user': relationship(User, backref="addresses")})
assert m.configured is False
assert sa.orm.mapperlib.Mapper._new_mappers is True
u = User()
assert User.addresses
assert sa.orm.mapperlib.Mapper._new_mappers is False
def test_configure_on_session(self):
User, users = self.classes.User, self.tables.users
m = mapper(User, users)
session = create_session()
session.connection(m)
def test_incomplete_columns(self):
"""Loading from a select which does not contain all columns"""
addresses, Address = self.tables.addresses, self.classes.Address
mapper(Address, addresses)
s = create_session()
a = s.query(Address).from_statement(
sa.select([addresses.c.id, addresses.c.user_id]).
order_by(addresses.c.id)).first()
eq_(a.user_id, 7)
eq_(a.id, 1)
# email address auto-defers
assert 'email_addres' not in a.__dict__
eq_(a.email_address, 'jack@bean.com')
def test_column_not_present(self):
users, addresses, User = (self.tables.users,
self.tables.addresses,
self.classes.User)
assert_raises_message(sa.exc.ArgumentError,
"not represented in the mapper's table",
mapper, User, users,
properties={'foo': addresses.c.user_id})
def test_constructor_exc(self):
"""TypeError is raised for illegal constructor args,
whether or not explicit __init__ is present [ticket:908]."""
users, addresses = self.tables.users, self.tables.addresses
class Foo(object):
def __init__(self):
pass
class Bar(object):
pass
mapper(Foo, users)
mapper(Bar, addresses)
assert_raises(TypeError, Foo, x=5)
assert_raises(TypeError, Bar, x=5)
def test_sort_states_comparisons(self):
"""test that _sort_states() doesn't compare
insert_order to state.key, for set of mixed
persistent/pending. In particular Python 3 disallows
this.
"""
class Foo(object):
def __init__(self, id):
self.id = id
m = MetaData()
foo_t = Table('foo', m,
Column('id', String, primary_key=True)
)
m = mapper(Foo, foo_t)
class DontCompareMeToString(int):
if util.py2k:
def __lt__(self, other):
assert not isinstance(other, basestring)
return int(self) < other
foos = [Foo(id='f%d' % i) for i in range(5)]
states = [attributes.instance_state(f) for f in foos]
for s in states[0:3]:
s.key = m._identity_key_from_state(s)
states[3].insert_order = DontCompareMeToString(5)
states[4].insert_order = DontCompareMeToString(1)
states[2].insert_order = DontCompareMeToString(3)
eq_(
_sort_states(states),
[states[4], states[3], states[0], states[1], states[2]]
)
def test_props(self):
users, Address, addresses, User = (self.tables.users,
self.classes.Address,
self.tables.addresses,
self.classes.User)
m = mapper(User, users, properties={
'addresses': relationship(mapper(Address, addresses))
})
assert User.addresses.property is m.get_property('addresses')
def test_unicode_relationship_backref_names(self):
# test [ticket:2901]
users, Address, addresses, User = (self.tables.users,
self.classes.Address,
self.tables.addresses,
self.classes.User)
mapper(Address, addresses)
mapper(User, users, properties={
util.u('addresses'): relationship(Address, backref=util.u('user'))
})
u1 = User()
a1 = Address()
u1.addresses.append(a1)
assert a1.user is u1
def test_configure_on_prop_1(self):
users, Address, addresses, User = (self.tables.users,
self.classes.Address,
self.tables.addresses,
self.classes.User)
mapper(User, users, properties={
'addresses': relationship(mapper(Address, addresses))
})
User.addresses.any(Address.email_address == 'foo@bar.com')
def test_configure_on_prop_2(self):
users, Address, addresses, User = (self.tables.users,
self.classes.Address,
self.tables.addresses,
self.classes.User)
mapper(User, users, properties={
'addresses': relationship(mapper(Address, addresses))
})
eq_(str(User.id == 3), str(users.c.id == 3))
def test_configure_on_prop_3(self):
users, addresses, User = (self.tables.users,
self.tables.addresses,
self.classes.User)
class Foo(User):
pass
mapper(User, users)
mapper(Foo, addresses, inherits=User, properties={
'address_id': addresses.c.id
})
assert getattr(Foo().__class__, 'name').impl is not None
def test_deferred_subclass_attribute_instrument(self):
users, addresses, User = (self.tables.users,
self.tables.addresses,
self.classes.User)
class Foo(User):
pass
mapper(User, users)
configure_mappers()
mapper(Foo, addresses, inherits=User, properties={
'address_id': addresses.c.id
})
assert getattr(Foo().__class__, 'name').impl is not None
def test_class_hier_only_instrument_once_multiple_configure(self):
users, addresses = (self.tables.users, self.tables.addresses)
class A(object):
pass
class ASub(A):
pass
class ASubSub(ASub):
pass
class B(object):
pass
from sqlalchemy.testing import mock
from sqlalchemy.orm.attributes import register_attribute_impl
with mock.patch(
"sqlalchemy.orm.attributes.register_attribute_impl",
side_effect=register_attribute_impl
) as some_mock:
mapper(A, users, properties={
'bs': relationship(B)
})
mapper(B, addresses)
configure_mappers()
mapper(ASub, inherits=A)
mapper(ASubSub, inherits=ASub)
configure_mappers()
b_calls = [
c for c in some_mock.mock_calls if c[1][1] == 'bs'
]
eq_(len(b_calls), 3)
def test_check_descriptor_as_method(self):
User, users = self.classes.User, self.tables.users
m = mapper(User, users)
class MyClass(User):
def foo(self):
pass
m._is_userland_descriptor(MyClass.foo)
def test_configure_on_get_props_1(self):
User, users = self.classes.User, self.tables.users
m = mapper(User, users)
assert not m.configured
assert list(m.iterate_properties)
assert m.configured
def test_configure_on_get_props_2(self):
User, users = self.classes.User, self.tables.users
m = mapper(User, users)
assert not m.configured
assert m.get_property('name')
assert m.configured
def test_configure_on_get_props_3(self):
users, Address, addresses, User = (self.tables.users,
self.classes.Address,
self.tables.addresses,
self.classes.User)
m = mapper(User, users)
assert not m.configured
configure_mappers()
m2 = mapper(Address, addresses, properties={
'user': relationship(User, backref='addresses')
})
assert m.get_property('addresses')
def test_info(self):
users = self.tables.users
Address = self.classes.Address
class MyComposite(object):
pass
for constructor, args in [
(column_property, (users.c.name,)),
(relationship, (Address,)),
(composite, (MyComposite, 'id', 'name')),
(synonym, 'foo'),
(comparable_property, 'foo')
]:
obj = constructor(info={"x": "y"}, *args)
eq_(obj.info, {"x": "y"})
obj.info["q"] = "p"
eq_(obj.info, {"x": "y", "q": "p"})
obj = constructor(*args)
eq_(obj.info, {})
obj.info["q"] = "p"
eq_(obj.info, {"q": "p"})
def test_info_via_instrumented(self):
m = MetaData()
# create specific tables here as we don't want
# users.c.id.info to be pre-initialized
users = Table('u', m, Column('id', Integer, primary_key=True),
Column('name', String))
addresses = Table('a', m, Column('id', Integer, primary_key=True),
Column('name', String),
Column('user_id', Integer, ForeignKey('u.id')))
Address = self.classes.Address
User = self.classes.User
mapper(User, users, properties={
"name_lower": column_property(func.lower(users.c.name)),
"addresses": relationship(Address)
})
mapper(Address, addresses)
# attr.info goes down to the original Column object
# for the dictionary. The annotated element needs to pass
# this on.
assert 'info' not in users.c.id.__dict__
is_(User.id.info, users.c.id.info)
assert 'info' in users.c.id.__dict__
# for SQL expressions, ORM-level .info
is_(User.name_lower.info, User.name_lower.property.info)
# same for relationships
is_(User.addresses.info, User.addresses.property.info)
def test_add_property(self):
users, addresses, Address = (self.tables.users,
self.tables.addresses,
self.classes.Address)
assert_col = []
class User(fixtures.ComparableEntity):
def _get_name(self):
assert_col.append(('get', self._name))
return self._name
def _set_name(self, name):
assert_col.append(('set', name))
self._name = name
name = property(_get_name, _set_name)
def _uc_name(self):
if self._name is None:
return None
return self._name.upper()
uc_name = property(_uc_name)
uc_name2 = property(_uc_name)
m = mapper(User, users)
mapper(Address, addresses)
class UCComparator(sa.orm.PropComparator):
__hash__ = None
def __eq__(self, other):
cls = self.prop.parent.class_
col = getattr(cls, 'name')
if other is None:
return col is None
else:
return sa.func.upper(col) == sa.func.upper(other)
m.add_property('_name', deferred(users.c.name))
m.add_property('name', synonym('_name'))
m.add_property('addresses', relationship(Address))
m.add_property('uc_name', sa.orm.comparable_property(UCComparator))
m.add_property('uc_name2', sa.orm.comparable_property(
UCComparator, User.uc_name2))
sess = create_session(autocommit=False)
assert sess.query(User).get(7)
u = sess.query(User).filter_by(name='jack').one()
def go():
eq_(len(u.addresses),
len(self.static.user_address_result[0].addresses))
eq_(u.name, 'jack')
eq_(u.uc_name, 'JACK')
eq_(u.uc_name2, 'JACK')
eq_(assert_col, [('get', 'jack')], str(assert_col))
self.sql_count_(2, go)
u.name = 'ed'
u3 = User()
u3.name = 'some user'
sess.add(u3)
sess.flush()
sess.rollback()
def test_add_prop_via_backref_resets_memoizations_reconfigures(self):
users, User = self.tables.users, self.classes.User
addresses, Address = self.tables.addresses, self.classes.Address
m1 = mapper(User, users)
User()
m2 = mapper(Address, addresses, properties={
'user': relationship(User, backref="addresses")
})
# configure mappers takes place when User is generated
User()
assert hasattr(User, 'addresses')
assert "addresses" in [p.key for p in m1._polymorphic_properties]
def test_replace_col_prop_w_syn(self):
users, User = self.tables.users, self.classes.User
m = mapper(User, users)
m.add_property('_name', users.c.name)
m.add_property('name', synonym('_name'))
sess = create_session()
u = sess.query(User).filter_by(name='jack').one()
eq_(u._name, 'jack')
eq_(u.name, 'jack')
u.name = 'jacko'
assert m._columntoproperty[users.c.name] is m.get_property('_name')
sa.orm.clear_mappers()
m = mapper(User, users)
m.add_property('name', synonym('_name', map_column=True))
sess.expunge_all()
u = sess.query(User).filter_by(name='jack').one()
eq_(u._name, 'jack')
eq_(u.name, 'jack')
u.name = 'jacko'
assert m._columntoproperty[users.c.name] is m.get_property('_name')
def test_replace_rel_prop_with_rel_warns(self):
users, User = self.tables.users, self.classes.User
addresses, Address = self.tables.addresses, self.classes.Address
m = mapper(User, users, properties={
"addresses": relationship(Address)
})
mapper(Address, addresses)
assert_raises_message(
sa.exc.SAWarning,
"Property User.addresses on Mapper|User|users being replaced "
"with new property User.addresses; the old property will "
"be discarded",
m.add_property,
"addresses", relationship(Address)
)
def test_add_column_prop_deannotate(self):
User, users = self.classes.User, self.tables.users
Address, addresses = self.classes.Address, self.tables.addresses
class SubUser(User):
pass
m = mapper(User, users)
m2 = mapper(SubUser, addresses, inherits=User, properties={
'address_id': addresses.c.id
})
m3 = mapper(Address, addresses, properties={
'foo': relationship(m2)
})
# add property using annotated User.name,
# needs to be deannotated
m.add_property("x", column_property(User.name + "name"))
s = create_session()
q = s.query(m2).select_from(Address).join(Address.foo)
self.assert_compile(
q,
"SELECT "
"addresses_1.id AS addresses_1_id, "
"users_1.id AS users_1_id, "
"users_1.name AS users_1_name, "
"addresses_1.user_id AS addresses_1_user_id, "
"addresses_1.email_address AS "
"addresses_1_email_address, "
"users_1.name || :name_1 AS anon_1 "
"FROM addresses JOIN (users AS users_1 JOIN addresses "
"AS addresses_1 ON users_1.id = "
"addresses_1.user_id) ON "
"users_1.id = addresses.user_id"
)
def test_column_prop_deannotate(self):
"""test that column property deannotates,
bringing expressions down to the original mapped columns.
"""
User, users = self.classes.User, self.tables.users
m = mapper(User, users)
assert User.id.property.columns[0] is users.c.id
assert User.name.property.columns[0] is users.c.name
expr = User.name + "name"
expr2 = sa.select([User.name, users.c.id])
m.add_property("x", column_property(expr))
m.add_property("y", column_property(expr2))
assert User.x.property.columns[0] is not expr
assert User.x.property.columns[0].element.left is users.c.name
# a deannotate needs to clone the base, in case
# the original one referenced annotated elements.
assert User.x.property.columns[0].element.right is not expr.right
assert User.y.property.columns[0] is not expr2
assert User.y.property.columns[0].element.\
_raw_columns[0] is users.c.name
assert User.y.property.columns[0].element.\
_raw_columns[1] is users.c.id
def test_synonym_replaces_backref(self):
addresses, users, User = (self.tables.addresses,
self.tables.users,
self.classes.User)
assert_calls = []
class Address(object):
def _get_user(self):
assert_calls.append("get")
return self._user
def _set_user(self, user):
assert_calls.append("set")
self._user = user
user = property(_get_user, _set_user)
# synonym is created against nonexistent prop
mapper(Address, addresses, properties={
'user': synonym('_user')
})
sa.orm.configure_mappers()
# later, backref sets up the prop
mapper(User, users, properties={
'addresses': relationship(Address, backref='_user')
})
sess = create_session()
u1 = sess.query(User).get(7)
u2 = sess.query(User).get(8)
# comparaison ops need to work
a1 = sess.query(Address).filter(Address.user == u1).one()
eq_(a1.id, 1)
a1.user = u2
assert a1.user is u2
eq_(assert_calls, ["set", "get"])
def test_self_ref_synonym(self):
t = Table('nodes', MetaData(),
Column(
'id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('parent_id', Integer, ForeignKey('nodes.id')))
class Node(object):
pass
mapper(Node, t, properties={
'_children': relationship(
Node, backref=backref('_parent', remote_side=t.c.id)),
'children': synonym('_children'),
'parent': synonym('_parent')
})
n1 = Node()
n2 = Node()
n1.children.append(n2)
assert n2.parent is n2._parent is n1
assert n1.children[0] is n1._children[0] is n2
eq_(str(Node.parent == n2), ":param_1 = nodes.parent_id")
def test_non_primary_identity_class(self):
User = self.classes.User
users, addresses = self.tables.users, self.tables.addresses
class AddressUser(User):
pass
m1 = mapper(User, users, polymorphic_identity='user')
m2 = mapper(AddressUser, addresses, inherits=User,
polymorphic_identity='address', properties={
'address_id': addresses.c.id
})
m3 = mapper(AddressUser, addresses, non_primary=True)
assert m3._identity_class is m2._identity_class
eq_(
m2.identity_key_from_instance(AddressUser()),
m3.identity_key_from_instance(AddressUser())
)
def test_reassign_polymorphic_identity_warns(self):
User = self.classes.User
users = self.tables.users
class MyUser(User):
pass
m1 = mapper(User, users, polymorphic_on=users.c.name,
polymorphic_identity='user')
assert_raises_message(
sa.exc.SAWarning,
"Reassigning polymorphic association for identity 'user'",
mapper,
MyUser, users, inherits=User, polymorphic_identity='user'
)
def test_illegal_non_primary(self):
users, Address, addresses, User = (self.tables.users,
self.classes.Address,
self.tables.addresses,
self.classes.User)
mapper(User, users)
mapper(Address, addresses)
mapper(User, users, non_primary=True, properties={
'addresses': relationship(Address)
})
assert_raises_message(
sa.exc.ArgumentError,
"Attempting to assign a new relationship 'addresses' "
"to a non-primary mapper on class 'User'",
configure_mappers
)
def test_illegal_non_primary_2(self):
User, users = self.classes.User, self.tables.users
assert_raises_message(
sa.exc.InvalidRequestError,
"Configure a primary mapper first",
mapper, User, users, non_primary=True)
def test_illegal_non_primary_3(self):
users, addresses = self.tables.users, self.tables.addresses
class Base(object):
pass
class Sub(Base):
pass
mapper(Base, users)
assert_raises_message(sa.exc.InvalidRequestError,
"Configure a primary mapper first",
mapper, Sub, addresses, non_primary=True
)
def test_prop_filters(self):
t = Table('person', MetaData(),
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('type', String(128)),
Column('name', String(128)),
Column('employee_number', Integer),
Column('boss_id', Integer, ForeignKey('person.id')),
Column('vendor_id', Integer))
class Person(object):
pass
class Vendor(Person):
pass
class Employee(Person):
pass
class Manager(Employee):
pass
class Hoho(object):
pass
class Lala(object):
pass
class Fub(object):
pass
class Frob(object):
pass
class HasDef(object):
def name(self):
pass
class Empty(object):
pass
mapper(
Empty, t, properties={'empty_id': t.c.id},
include_properties=[])
p_m = mapper(Person, t, polymorphic_on=t.c.type,
include_properties=('id', 'type', 'name'))
e_m = mapper(Employee, inherits=p_m,
polymorphic_identity='employee',
properties={
'boss': relationship(
Manager, backref=backref('peon'),
remote_side=t.c.id)},
exclude_properties=('vendor_id', ))
mapper(
Manager, inherits=e_m, polymorphic_identity='manager',
include_properties=('id', 'type'))
mapper(
Vendor, inherits=p_m, polymorphic_identity='vendor',
exclude_properties=('boss_id', 'employee_number'))
mapper(Hoho, t, include_properties=('id', 'type', 'name'))
mapper(
Lala, t, exclude_properties=('vendor_id', 'boss_id'),
column_prefix="p_")
mapper(HasDef, t, column_prefix="h_")
mapper(Fub, t, include_properties=(t.c.id, t.c.type))
mapper(
Frob, t, column_prefix='f_',
exclude_properties=(
t.c.boss_id,
'employee_number', t.c.vendor_id))
configure_mappers()
def assert_props(cls, want):
have = set([n for n in dir(cls) if not n.startswith('_')])
want = set(want)
eq_(have, want)
def assert_instrumented(cls, want):
have = set([p.key for p in class_mapper(cls).iterate_properties])
want = set(want)
eq_(have, want)
assert_props(HasDef, ['h_boss_id', 'h_employee_number', 'h_id',
'name', 'h_name', 'h_vendor_id', 'h_type'])
assert_props(Person, ['id', 'name', 'type'])
assert_instrumented(Person, ['id', 'name', 'type'])
assert_props(Employee, ['boss', 'boss_id', 'employee_number',
'id', 'name', 'type'])
assert_instrumented(Employee, ['boss', 'boss_id', 'employee_number',
'id', 'name', 'type'])
assert_props(Manager, ['boss', 'boss_id', 'employee_number', 'peon',
'id', 'name', 'type'])
# 'peon' and 'type' are both explicitly stated properties
assert_instrumented(Manager, ['peon', 'type', 'id'])
assert_props(Vendor, ['vendor_id', 'id', 'name', 'type'])
assert_props(Hoho, ['id', 'name', 'type'])
assert_props(Lala, ['p_employee_number', 'p_id', 'p_name', 'p_type'])
assert_props(Fub, ['id', 'type'])
assert_props(Frob, ['f_id', 'f_type', 'f_name', ])
# putting the discriminator column in exclude_properties,
# very weird. As of 0.7.4 this re-maps it.
class Foo(Person):
pass
assert_props(Empty, ['empty_id'])
mapper(
Foo, inherits=Person, polymorphic_identity='foo',
exclude_properties=('type', ),
)
assert hasattr(Foo, "type")
assert Foo.type.property.columns[0] is t.c.type
@testing.provide_metadata
def test_prop_filters_defaults(self):
metadata = self.metadata
t = Table('t', metadata,
Column(
'id', Integer(), primary_key=True,
test_needs_autoincrement=True),
Column('x', Integer(), nullable=False, server_default='0')
)
t.create()
class A(object):
pass
mapper(A, t, include_properties=['id'])
s = Session()
s.add(A())
s.commit()
def test_we_dont_call_bool(self):
class NoBoolAllowed(object):
def __bool__(self):
raise Exception("nope")
mapper(NoBoolAllowed, self.tables.users)
u1 = NoBoolAllowed()
u1.name = "some name"
s = Session(testing.db)
s.add(u1)
s.commit()
assert s.query(NoBoolAllowed).get(u1.id) is u1
def test_we_dont_call_eq(self):
class NoEqAllowed(object):
def __eq__(self, other):
raise Exception("nope")
addresses, users = self.tables.addresses, self.tables.users
Address = self.classes.Address
mapper(NoEqAllowed, users, properties={
'addresses': relationship(Address, backref='user')
})
mapper(Address, addresses)
u1 = NoEqAllowed()
u1.name = "some name"
u1.addresses = [Address(id=12, email_address='a1')]
s = Session(testing.db)
s.add(u1)
s.commit()
a1 = s.query(Address).filter_by(id=12).one()
assert a1.user is u1
def test_mapping_to_join_raises(self):
"""Test implicit merging of two cols raises."""
addresses, users, User = (self.tables.addresses,
self.tables.users,
self.classes.User)
usersaddresses = sa.join(users, addresses,
users.c.id == addresses.c.user_id)
assert_raises_message(
sa.exc.InvalidRequestError,
"Implicitly",
mapper, User, usersaddresses, primary_key=[users.c.id]
)
def test_mapping_to_join_explicit_prop(self):
"""Mapping to a join"""
User, addresses, users = (self.classes.User,
self.tables.addresses,
self.tables.users)
usersaddresses = sa.join(users, addresses, users.c.id
== addresses.c.user_id)
mapper(User, usersaddresses, primary_key=[users.c.id],
properties={'add_id': addresses.c.id}
)
result = create_session().query(User).order_by(users.c.id).all()
eq_(result, self.static.user_result[:3])
def test_mapping_to_join_exclude_prop(self):
"""Mapping to a join"""
User, addresses, users = (self.classes.User,
self.tables.addresses,
self.tables.users)
usersaddresses = sa.join(users, addresses, users.c.id
== addresses.c.user_id)
mapper(User, usersaddresses, primary_key=[users.c.id],
exclude_properties=[addresses.c.id]
)
result = create_session().query(User).order_by(users.c.id).all()
eq_(result, self.static.user_result[:3])
def test_mapping_to_join_no_pk(self):
email_bounces, addresses, Address = (self.tables.email_bounces,
self.tables.addresses,
self.classes.Address)
m = mapper(Address,
addresses.join(email_bounces),
properties={'id': [addresses.c.id, email_bounces.c.id]}
)
configure_mappers()
assert addresses in m._pks_by_table
assert email_bounces not in m._pks_by_table
sess = create_session()
a = Address(id=10, email_address='e1')
sess.add(a)
sess.flush()
eq_(
select([func.count('*')]).select_from(addresses).scalar(), 6)
eq_(
select([func.count('*')]).select_from(email_bounces).scalar(), 5)
def test_mapping_to_outerjoin(self):
"""Mapping to an outer join with a nullable composite primary key."""
users, addresses, User = (self.tables.users,
self.tables.addresses,
self.classes.User)
mapper(User, users.outerjoin(addresses),
primary_key=[users.c.id, addresses.c.id],
properties=dict(
address_id=addresses.c.id))
session = create_session()
result = session.query(User).order_by(User.id, User.address_id).all()
eq_(result, [
User(id=7, address_id=1),
User(id=8, address_id=2),
User(id=8, address_id=3),
User(id=8, address_id=4),
User(id=9, address_id=5),
User(id=10, address_id=None)])
def test_mapping_to_outerjoin_no_partial_pks(self):
"""test the allow_partial_pks=False flag."""
users, addresses, User = (self.tables.users,
self.tables.addresses,
self.classes.User)
mapper(User, users.outerjoin(addresses),
allow_partial_pks=False,
primary_key=[users.c.id, addresses.c.id],
properties=dict(
address_id=addresses.c.id))
session = create_session()
result = session.query(User).order_by(User.id, User.address_id).all()
eq_(result, [
User(id=7, address_id=1),
User(id=8, address_id=2),
User(id=8, address_id=3),
User(id=8, address_id=4),
User(id=9, address_id=5),
None])
def test_scalar_pk_arg(self):
users, Keyword, items, Item, User, keywords = (self.tables.users,
self.classes.Keyword,
self.tables.items,
self.classes.Item,
self.classes.User,
self.tables.keywords)
m1 = mapper(Item, items, primary_key=[items.c.id])
m2 = mapper(Keyword, keywords, primary_key=keywords.c.id)
m3 = mapper(User, users, primary_key=(users.c.id,))
assert m1.primary_key[0] is items.c.id
assert m2.primary_key[0] is keywords.c.id
assert m3.primary_key[0] is users.c.id
def test_custom_join(self):
"""select_from totally replace the FROM parameters."""
users, items, order_items, orders, Item, User, Order = (
self.tables.users,
self.tables.items,
self.tables.order_items,
self.tables.orders,
self.classes.Item,
self.classes.User,
self.classes.Order)
mapper(Item, items)
mapper(Order, orders, properties=dict(
items=relationship(Item, order_items)))
mapper(User, users, properties=dict(
orders=relationship(Order)))
session = create_session()
result = (session.query(User).
select_from(users.join(orders).
join(order_items).
join(items)).
filter(items.c.description == 'item 4')).all()
eq_(result, [self.static.user_result[0]])
@testing.uses_deprecated("Mapper.order_by")
def test_cancel_order_by(self):
users, User = self.tables.users, self.classes.User
mapper(User, users, order_by=users.c.name.desc())
assert "order by users.name desc" in \
str(create_session().query(User).statement).lower()
assert "order by" not in \
str(create_session().query(User).order_by(None).statement).lower()
assert "order by users.name asc" in \
str(create_session().query(User).order_by(
User.name.asc()).statement).lower()
eq_(
create_session().query(User).all(),
[User(id=7, name='jack'), User(id=9, name='fred'),
User(id=8, name='ed'), User(id=10, name='chuck')]
)
eq_(
create_session().query(User).order_by(User.name).all(),
[User(id=10, name='chuck'), User(id=8, name='ed'),
User(id=9, name='fred'), User(id=7, name='jack')]
)
# 'Raises a "expression evaluation not supported" error at prepare time
@testing.fails_on('firebird', 'FIXME: unknown')
def test_function(self):
"""Mapping to a SELECT statement that has functions in it."""
addresses, users, User = (self.tables.addresses,
self.tables.users,
self.classes.User)
s = sa.select([users,
(users.c.id * 2).label('concat'),
sa.func.count(addresses.c.id).label('count')],
users.c.id == addresses.c.user_id,
group_by=[c for c in users.c]).alias('myselect')
mapper(User, s)
sess = create_session()
result = sess.query(User).order_by(s.c.id).all()
for idx, total in enumerate((14, 16)):
eq_(result[idx].concat, result[idx].id * 2)
eq_(result[idx].concat, total)
def test_count(self):
"""The count function on Query."""
User, users = self.classes.User, self.tables.users
mapper(User, users)
session = create_session()
q = session.query(User)
eq_(q.count(), 4)
eq_(q.filter(User.id.in_([8, 9])).count(), 2)
eq_(q.filter(users.c.id.in_([8, 9])).count(), 2)
eq_(session.query(User.id).count(), 4)
eq_(session.query(User.id).filter(User.id.in_((8, 9))).count(), 2)
def test_many_to_many_count(self):
keywords, items, item_keywords, Keyword, Item = (
self.tables.keywords,
self.tables.items,
self.tables.item_keywords,
self.classes.Keyword,
self.classes.Item)
mapper(Keyword, keywords)
mapper(Item, items, properties=dict(
keywords=relationship(Keyword, item_keywords, lazy='select')))
session = create_session()
q = (session.query(Item).
join('keywords').
distinct().
filter(Keyword.name == "red"))
eq_(q.count(), 2)
def test_override_1(self):
"""Overriding a column raises an error."""
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
def go():
mapper(User, users,
properties=dict(
name=relationship(mapper(Address, addresses))))
assert_raises(sa.exc.ArgumentError, go)
def test_override_2(self):
"""exclude_properties cancels the error."""
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(User, users,
exclude_properties=['name'],
properties=dict(
name=relationship(mapper(Address, addresses))))
assert bool(User.name)
def test_override_3(self):
"""The column being named elsewhere also cancels the error,"""
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(User, users,
properties=dict(
name=relationship(mapper(Address, addresses)),
foo=users.c.name))
def test_synonym(self):
users, addresses, Address = (self.tables.users,
self.tables.addresses,
self.classes.Address)
assert_col = []
class extendedproperty(property):
attribute = 123
class User(object):
def _get_name(self):
assert_col.append(('get', self.name))
return self.name
def _set_name(self, name):
assert_col.append(('set', name))
self.name = name
uname = extendedproperty(_get_name, _set_name)
mapper(User, users, properties=dict(
addresses=relationship(mapper(Address, addresses), lazy='select'),
uname=synonym('name'),
adlist=synonym('addresses'),
adname=synonym('addresses')
))
# ensure the synonym can get at the proxied comparators without
# an explicit compile
User.name == 'ed'
User.adname.any()
assert hasattr(User, 'adlist')
# as of 0.4.2, synonyms always create a property
assert hasattr(User, 'adname')
# test compile
assert not isinstance(User.uname == 'jack', bool)
assert User.uname.property
assert User.adlist.property
sess = create_session()
# test RowTuple names
row = sess.query(User.id, User.uname).first()
assert row.uname == row[1]
u = sess.query(User).filter(User.uname == 'jack').one()
fixture = self.static.user_address_result[0].addresses
eq_(u.adlist, fixture)
addr = sess.query(Address).filter_by(id=fixture[0].id).one()
u = sess.query(User).filter(User.adname.contains(addr)).one()
u2 = sess.query(User).filter(User.adlist.contains(addr)).one()
assert u is u2
assert u not in sess.dirty
u.uname = "some user name"
assert len(assert_col) > 0
eq_(assert_col, [('set', 'some user name')])
eq_(u.uname, "some user name")
eq_(assert_col, [('set', 'some user name'), ('get', 'some user name')])
eq_(u.name, "some user name")
assert u in sess.dirty
eq_(User.uname.attribute, 123)
def test_synonym_of_synonym(self):
users, User = (self.tables.users,
self.classes.User)
mapper(User, users, properties={
'x': synonym('id'),
'y': synonym('x')
})
s = Session()
u = s.query(User).filter(User.y == 8).one()
eq_(u.y, 8)
def test_synonym_of_non_property_raises(self):
from sqlalchemy.ext.associationproxy import association_proxy
class User(object):
pass
users, Address, addresses = (
self.tables.users,
self.classes.Address,
self.tables.addresses)
mapper(User, users, properties={
'y': synonym('x'),
'addresses': relationship(Address)
})
mapper(Address, addresses)
User.x = association_proxy("addresses", "email_address")
assert_raises_message(
sa.exc.InvalidRequestError,
r'synonym\(\) attribute "User.x" only supports ORM mapped '
'attributes, got .*AssociationProxy',
getattr, User.y, "property"
)
def test_synonym_column_location(self):
users, User = self.tables.users, self.classes.User
def go():
mapper(User, users, properties={
'not_name': synonym('_name', map_column=True)})
assert_raises_message(
sa.exc.ArgumentError,
("Can't compile synonym '_name': no column on table "
"'users' named 'not_name'"),
go)
def test_column_synonyms(self):
"""Synonyms which automatically instrument properties,
set up aliased column, etc."""
addresses, users, Address = (self.tables.addresses,
self.tables.users,
self.classes.Address)
assert_col = []
class User(object):
def _get_name(self):
assert_col.append(('get', self._name))
return self._name
def _set_name(self, name):
assert_col.append(('set', name))
self._name = name
name = property(_get_name, _set_name)
mapper(Address, addresses)
mapper(User, users, properties={
'addresses': relationship(Address, lazy='select'),
'name': synonym('_name', map_column=True)
})
# test compile
assert not isinstance(User.name == 'jack', bool)
assert hasattr(User, 'name')
assert hasattr(User, '_name')
sess = create_session()
u = sess.query(User).filter(User.name == 'jack').one()
eq_(u.name, 'jack')
u.name = 'foo'
eq_(u.name, 'foo')
eq_(assert_col, [('get', 'jack'), ('set', 'foo'), ('get', 'foo')])
def test_synonym_map_column_conflict(self):
users, User = self.tables.users, self.classes.User
assert_raises(
sa.exc.ArgumentError,
mapper,
User, users, properties=util.OrderedDict([
('_user_id', users.c.id),
('id', synonym('_user_id', map_column=True)),
])
)
assert_raises(
sa.exc.ArgumentError,
mapper,
User, users, properties=util.OrderedDict([
('id', synonym('_user_id', map_column=True)),
('_user_id', users.c.id),
])
)
def test_comparable(self):
users = self.tables.users
class extendedproperty(property):
attribute = 123
def method1(self):
return "method1"
from sqlalchemy.orm.properties import ColumnProperty
class UCComparator(ColumnProperty.Comparator):
__hash__ = None
def method1(self):
return "uccmethod1"
def method2(self, other):
return "method2"
def __eq__(self, other):
cls = self.prop.parent.class_
col = getattr(cls, 'name')
if other is None:
return col is None
else:
return sa.func.upper(col) == sa.func.upper(other)
def map_(with_explicit_property):
class User(object):
@extendedproperty
def uc_name(self):
if self.name is None:
return None
return self.name.upper()
if with_explicit_property:
args = (UCComparator, User.uc_name)
else:
args = (UCComparator,)
mapper(User, users, properties=dict(
uc_name=sa.orm.comparable_property(*args)))
return User
for User in (map_(True), map_(False)):
sess = create_session()
sess.begin()
q = sess.query(User)
assert hasattr(User, 'name')
assert hasattr(User, 'uc_name')
eq_(User.uc_name.method1(), "method1")
eq_(User.uc_name.method2('x'), "method2")
assert_raises_message(
AttributeError,
"Neither 'extendedproperty' object nor 'UCComparator' "
"object associated with User.uc_name has an attribute "
"'nonexistent'",
getattr, User.uc_name, 'nonexistent')
# test compile
assert not isinstance(User.uc_name == 'jack', bool)
u = q.filter(User.uc_name == 'JACK').one()
assert u.uc_name == "JACK"
assert u not in sess.dirty
u.name = "some user name"
eq_(u.name, "some user name")
assert u in sess.dirty
eq_(u.uc_name, "SOME USER NAME")
sess.flush()
sess.expunge_all()
q = sess.query(User)
u2 = q.filter(User.name == 'some user name').one()
u3 = q.filter(User.uc_name == 'SOME USER NAME').one()
assert u2 is u3
eq_(User.uc_name.attribute, 123)
sess.rollback()
def test_comparable_column(self):
users, User = self.tables.users, self.classes.User
class MyComparator(sa.orm.properties.ColumnProperty.Comparator):
__hash__ = None
def __eq__(self, other):
# lower case comparison
return func.lower(self.__clause_element__()
) == func.lower(other)
def intersects(self, other):
# non-standard comparator
return self.__clause_element__().op('&=')(other)
mapper(User, users, properties={
'name': sa.orm.column_property(users.c.name,
comparator_factory=MyComparator)
})
assert_raises_message(
AttributeError,
"Neither 'InstrumentedAttribute' object nor "
"'MyComparator' object associated with User.name has "
"an attribute 'nonexistent'",
getattr, User.name, "nonexistent")
eq_(
str((User.name == 'ed').compile(
dialect=sa.engine.default.DefaultDialect())),
"lower(users.name) = lower(:lower_1)")
eq_(
str((User.name.intersects('ed')).compile(
dialect=sa.engine.default.DefaultDialect())),
"users.name &= :name_1")
def test_reentrant_compile(self):
users, Address, addresses, User = (self.tables.users,
self.classes.Address,
self.tables.addresses,
self.classes.User)
class MyFakeProperty(sa.orm.properties.ColumnProperty):
def post_instrument_class(self, mapper):
super(MyFakeProperty, self).post_instrument_class(mapper)
configure_mappers()
m1 = mapper(User, users, properties={
'name': MyFakeProperty(users.c.name)
})
m2 = mapper(Address, addresses)
configure_mappers()
sa.orm.clear_mappers()
class MyFakeProperty(sa.orm.properties.ColumnProperty):
def post_instrument_class(self, mapper):
super(MyFakeProperty, self).post_instrument_class(mapper)
configure_mappers()
m1 = mapper(User, users, properties={
'name': MyFakeProperty(users.c.name)
})
m2 = mapper(Address, addresses)
configure_mappers()
def test_reconstructor(self):
users = self.tables.users
recon = []
class User(object):
@reconstructor
def reconstruct(self):
recon.append('go')
mapper(User, users)
User()
eq_(recon, [])
create_session().query(User).first()
eq_(recon, ['go'])
def test_reconstructor_inheritance(self):
users = self.tables.users
recon = []
class A(object):
@reconstructor
def reconstruct(self):
assert isinstance(self, A)
recon.append('A')
class B(A):
@reconstructor
def reconstruct(self):
assert isinstance(self, B)
recon.append('B')
class C(A):
@reconstructor
def reconstruct(self):
assert isinstance(self, C)
recon.append('C')
mapper(A, users, polymorphic_on=users.c.name,
polymorphic_identity='jack')
mapper(B, inherits=A, polymorphic_identity='ed')
mapper(C, inherits=A, polymorphic_identity='chuck')
A()
B()
C()
eq_(recon, [])
sess = create_session()
sess.query(A).first()
sess.query(B).first()
sess.query(C).first()
eq_(recon, ['A', 'B', 'C'])
def test_unmapped_reconstructor_inheritance(self):
users = self.tables.users
recon = []
class Base(object):
@reconstructor
def reconstruct(self):
recon.append('go')
class User(Base):
pass
mapper(User, users)
User()
eq_(recon, [])
create_session().query(User).first()
eq_(recon, ['go'])
def test_unmapped_error(self):
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(Address, addresses)
sa.orm.clear_mappers()
mapper(User, users, properties={
'addresses': relationship(Address)
})
assert_raises_message(
sa.orm.exc.UnmappedClassError,
"Class 'test.orm._fixtures.Address' is not mapped",
sa.orm.configure_mappers)
def test_unmapped_not_type_error(self):
assert_raises_message(
sa.exc.ArgumentError,
"Class object expected, got '5'.",
class_mapper, 5
)
def test_unmapped_not_type_error_iter_ok(self):
assert_raises_message(
sa.exc.ArgumentError,
r"Class object expected, got '\(5, 6\)'.",
class_mapper, (5, 6)
)
def test_attribute_error_raised_class_mapper(self):
users = self.tables.users
addresses = self.tables.addresses
User = self.classes.User
Address = self.classes.Address
mapper(User, users, properties={
"addresses": relationship(
Address,
primaryjoin=lambda: users.c.id == addresses.wrong.user_id)
})
mapper(Address, addresses)
assert_raises_message(
AttributeError,
"'Table' object has no attribute 'wrong'",
class_mapper, Address
)
def test_key_error_raised_class_mapper(self):
users = self.tables.users
addresses = self.tables.addresses
User = self.classes.User
Address = self.classes.Address
mapper(User, users, properties={
"addresses": relationship(Address,
primaryjoin=lambda: users.c.id ==
addresses.__dict__['wrong'].user_id)
})
mapper(Address, addresses)
assert_raises_message(
KeyError,
"wrong",
class_mapper, Address
)
def test_unmapped_subclass_error_postmap(self):
users = self.tables.users
class Base(object):
pass
class Sub(Base):
pass
mapper(Base, users)
sa.orm.configure_mappers()
# we can create new instances, set attributes.
s = Sub()
s.name = 'foo'
eq_(s.name, 'foo')
eq_(
attributes.get_history(s, 'name'),
(['foo'], (), ())
)
# using it with an ORM operation, raises
assert_raises(sa.orm.exc.UnmappedClassError,
create_session().add, Sub())
def test_unmapped_subclass_error_premap(self):
users = self.tables.users
class Base(object):
pass
mapper(Base, users)
class Sub(Base):
pass
sa.orm.configure_mappers()
# we can create new instances, set attributes.
s = Sub()
s.name = 'foo'
eq_(s.name, 'foo')
eq_(
attributes.get_history(s, 'name'),
(['foo'], (), ())
)
# using it with an ORM operation, raises
assert_raises(sa.orm.exc.UnmappedClassError,
create_session().add, Sub())
def test_oldstyle_mixin(self):
users = self.tables.users
class OldStyle:
pass
class NewStyle(object):
pass
class A(NewStyle, OldStyle):
pass
mapper(A, users)
class B(OldStyle, NewStyle):
pass
mapper(B, users)
class DocumentTest(fixtures.TestBase):
def test_doc_propagate(self):
metadata = MetaData()
t1 = Table('t1', metadata,
Column('col1', Integer, primary_key=True,
doc="primary key column"),
Column('col2', String, doc="data col"),
Column('col3', String, doc="data col 2"),
Column('col4', String, doc="data col 3"),
Column('col5', String),
)
t2 = Table('t2', metadata,
Column('col1', Integer, primary_key=True,
doc="primary key column"),
Column('col2', String, doc="data col"),
Column('col3', Integer, ForeignKey('t1.col1'),
doc="foreign key to t1.col1")
)
class Foo(object):
pass
class Bar(object):
pass
mapper(Foo, t1, properties={
'bars': relationship(Bar,
doc="bar relationship",
backref=backref('foo', doc='foo relationship')
),
'foober': column_property(t1.c.col3, doc='alternate data col'),
'hoho': synonym("col4", doc="syn of col4")
})
mapper(Bar, t2)
configure_mappers()
eq_(Foo.col1.__doc__, "primary key column")
eq_(Foo.col2.__doc__, "data col")
eq_(Foo.col5.__doc__, None)
eq_(Foo.foober.__doc__, "alternate data col")
eq_(Foo.bars.__doc__, "bar relationship")
eq_(Foo.hoho.__doc__, "syn of col4")
eq_(Bar.col1.__doc__, "primary key column")
eq_(Bar.foo.__doc__, "foo relationship")
class ORMLoggingTest(_fixtures.FixtureTest):
def setup(self):
self.buf = logging.handlers.BufferingHandler(100)
for log in [
logging.getLogger('sqlalchemy.orm'),
]:
log.addHandler(self.buf)
def teardown(self):
for log in [
logging.getLogger('sqlalchemy.orm'),
]:
log.removeHandler(self.buf)
def _current_messages(self):
return [b.getMessage() for b in self.buf.buffer]
def test_mapper_info_aliased(self):
User, users = self.classes.User, self.tables.users
tb = users.select().alias()
mapper(User, tb)
s = Session()
s.add(User(name='ed'))
s.commit()
for msg in self._current_messages():
assert msg.startswith('(User|%%(%d anon)s) ' % id(tb))
class OptionsTest(_fixtures.FixtureTest):
def test_synonym_options(self):
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(User, users, properties=dict(
addresses=relationship(mapper(Address, addresses), lazy='select',
order_by=addresses.c.id),
adlist=synonym('addresses')))
def go():
sess = create_session()
u = (sess.query(User).
order_by(User.id).
options(sa.orm.joinedload('adlist')).
filter_by(name='jack')).one()
eq_(u.adlist,
[self.static.user_address_result[0].addresses[0]])
self.assert_sql_count(testing.db, go, 1)
def test_eager_options(self):
"""A lazy relationship can be upgraded to an eager relationship."""
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(User, users, properties=dict(
addresses=relationship(mapper(Address, addresses),
order_by=addresses.c.id)))
sess = create_session()
result = (sess.query(User).
order_by(User.id).
options(sa.orm.joinedload('addresses'))).all()
def go():
eq_(result, self.static.user_address_result)
self.sql_count_(0, go)
def test_eager_options_with_limit(self):
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(User, users, properties=dict(
addresses=relationship(mapper(Address, addresses), lazy='select')))
sess = create_session()
u = (sess.query(User).
options(sa.orm.joinedload('addresses')).
filter_by(id=8)).one()
def go():
eq_(u.id, 8)
eq_(len(u.addresses), 3)
self.sql_count_(0, go)
sess.expunge_all()
u = sess.query(User).filter_by(id=8).one()
eq_(u.id, 8)
eq_(len(u.addresses), 3)
def test_lazy_options_with_limit(self):
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(User, users, properties=dict(
addresses=relationship(mapper(Address, addresses), lazy='joined')))
sess = create_session()
u = (sess.query(User).
options(sa.orm.lazyload('addresses')).
filter_by(id=8)).one()
def go():
eq_(u.id, 8)
eq_(len(u.addresses), 3)
self.sql_count_(1, go)
def test_eager_degrade(self):
"""An eager relationship automatically degrades to a lazy relationship
if eager columns are not available"""
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(User, users, properties=dict(
addresses=relationship(mapper(Address, addresses),
lazy='joined', order_by=addresses.c.id)))
sess = create_session()
# first test straight eager load, 1 statement
def go():
result = sess.query(User).order_by(User.id).all()
eq_(result, self.static.user_address_result)
self.sql_count_(1, go)
sess.expunge_all()
# then select just from users. run it into instances.
# then assert the data, which will launch 3 more lazy loads
# (previous users in session fell out of scope and were removed from
# session's identity map)
r = users.select().order_by(users.c.id).execute()
def go():
result = list(sess.query(User).instances(r))
eq_(result, self.static.user_address_result)
self.sql_count_(4, go)
def test_eager_degrade_deep(self):
users, Keyword, items, order_items, orders, \
Item, User, Address, keywords, item_keywords, Order, addresses = (
self.tables.users,
self.classes.Keyword,
self.tables.items,
self.tables.order_items,
self.tables.orders,
self.classes.Item,
self.classes.User,
self.classes.Address,
self.tables.keywords,
self.tables.item_keywords,
self.classes.Order,
self.tables.addresses)
# test with a deeper set of eager loads. when we first load the three
# users, they will have no addresses or orders. the number of lazy
# loads when traversing the whole thing will be three for the
# addresses and three for the orders.
mapper(Address, addresses)
mapper(Keyword, keywords)
mapper(Item, items, properties=dict(
keywords=relationship(Keyword, secondary=item_keywords,
lazy='joined',
order_by=item_keywords.c.keyword_id)))
mapper(Order, orders, properties=dict(
items=relationship(Item, secondary=order_items, lazy='joined',
order_by=order_items.c.item_id)))
mapper(User, users, properties=dict(
addresses=relationship(Address, lazy='joined',
order_by=addresses.c.id),
orders=relationship(Order, lazy='joined',
order_by=orders.c.id)))
sess = create_session()
# first test straight eager load, 1 statement
def go():
result = sess.query(User).order_by(User.id).all()
eq_(result, self.static.user_all_result)
self.assert_sql_count(testing.db, go, 1)
sess.expunge_all()
# then select just from users. run it into instances.
# then assert the data, which will launch 6 more lazy loads
r = users.select().execute()
def go():
result = list(sess.query(User).instances(r))
eq_(result, self.static.user_all_result)
self.assert_sql_count(testing.db, go, 6)
def test_lazy_options(self):
"""An eager relationship can be upgraded to a lazy relationship."""
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(User, users, properties=dict(
addresses=relationship(mapper(Address, addresses), lazy='joined')
))
sess = create_session()
result = (sess.query(User).
order_by(User.id).
options(sa.orm.lazyload('addresses'))).all()
def go():
eq_(result, self.static.user_address_result)
self.sql_count_(4, go)
def test_option_propagate(self):
users, items, order_items, Order, Item, User, orders = (
self.tables.users,
self.tables.items,
self.tables.order_items,
self.classes.Order,
self.classes.Item,
self.classes.User,
self.tables.orders)
mapper(User, users, properties=dict(
orders=relationship(Order)
))
mapper(Order, orders, properties=dict(
items=relationship(Item, secondary=order_items)
))
mapper(Item, items)
sess = create_session()
oalias = aliased(Order)
opt1 = sa.orm.joinedload(User.orders, Order.items)
opt2 = sa.orm.contains_eager(User.orders, Order.items, alias=oalias)
u1 = sess.query(User).join(oalias, User.orders).\
options(opt1, opt2).first()
ustate = attributes.instance_state(u1)
assert opt1 in ustate.load_options
assert opt2 not in ustate.load_options
class DeepOptionsTest(_fixtures.FixtureTest):
@classmethod
def setup_mappers(cls):
users, Keyword, items, order_items, Order, Item, User, \
keywords, item_keywords, orders = (
cls.tables.users,
cls.classes.Keyword,
cls.tables.items,
cls.tables.order_items,
cls.classes.Order,
cls.classes.Item,
cls.classes.User,
cls.tables.keywords,
cls.tables.item_keywords,
cls.tables.orders)
mapper(Keyword, keywords)
mapper(Item, items, properties=dict(
keywords=relationship(Keyword, item_keywords,
order_by=item_keywords.c.item_id)))
mapper(Order, orders, properties=dict(
items=relationship(Item, order_items,
order_by=items.c.id)))
mapper(User, users, properties=dict(
orders=relationship(Order, order_by=orders.c.id)))
def test_deep_options_1(self):
User = self.classes.User
sess = create_session()
# joinedload nothing.
u = sess.query(User).order_by(User.id).all()
def go():
u[0].orders[1].items[0].keywords[1]
self.assert_sql_count(testing.db, go, 3)
def test_deep_options_2(self):
"""test (joined|subquery)load_all() options"""
User = self.classes.User
sess = create_session()
result = (sess.query(User).
order_by(User.id).
options(
sa.orm.joinedload_all('orders.items.keywords'))).all()
def go():
result[0].orders[1].items[0].keywords[1]
self.sql_count_(0, go)
sess = create_session()
result = (sess.query(User).
options(
sa.orm.subqueryload_all('orders.items.keywords'))).all()
def go():
result[0].orders[1].items[0].keywords[1]
self.sql_count_(0, go)
def test_deep_options_3(self):
User = self.classes.User
sess = create_session()
# same thing, with separate options calls
q2 = (sess.query(User).
order_by(User.id).
options(sa.orm.joinedload('orders')).
options(sa.orm.joinedload('orders.items')).
options(sa.orm.joinedload('orders.items.keywords')))
u = q2.all()
def go():
u[0].orders[1].items[0].keywords[1]
self.sql_count_(0, go)
def test_deep_options_4(self):
Item, User, Order = (self.classes.Item,
self.classes.User,
self.classes.Order)
sess = create_session()
assert_raises_message(
sa.exc.ArgumentError,
"Can't find property 'items' on any entity "
"specified in this Query.",
sess.query(User).options, sa.orm.joinedload(Order.items))
# joinedload "keywords" on items. it will lazy load "orders", then
# lazy load the "items" on the order, but on "items" it will eager
# load the "keywords"
q3 = sess.query(User).order_by(User.id).options(
sa.orm.joinedload('orders.items.keywords'))
u = q3.all()
def go():
u[0].orders[1].items[0].keywords[1]
self.sql_count_(2, go)
sess = create_session()
q3 = sess.query(User).order_by(User.id).options(
sa.orm.joinedload(User.orders, Order.items, Item.keywords))
u = q3.all()
def go():
u[0].orders[1].items[0].keywords[1]
self.sql_count_(2, go)
class ComparatorFactoryTest(_fixtures.FixtureTest, AssertsCompiledSQL):
def test_kwarg_accepted(self):
users, Address = self.tables.users, self.classes.Address
class DummyComposite(object):
def __init__(self, x, y):
pass
from sqlalchemy.orm.interfaces import PropComparator
class MyFactory(PropComparator):
pass
for args in (
(column_property, users.c.name),
(deferred, users.c.name),
(synonym, 'name'),
(composite, DummyComposite, users.c.id, users.c.name),
(relationship, Address),
(backref, 'address'),
(comparable_property, ),
(dynamic_loader, Address)
):
fn = args[0]
args = args[1:]
fn(comparator_factory=MyFactory, *args)
def test_column(self):
User, users = self.classes.User, self.tables.users
from sqlalchemy.orm.properties import ColumnProperty
class MyFactory(ColumnProperty.Comparator):
__hash__ = None
def __eq__(self, other):
return func.foobar(self.__clause_element__()) == \
func.foobar(other)
mapper(
User, users,
properties={
'name': column_property(
users.c.name, comparator_factory=MyFactory)})
self.assert_compile(
User.name == 'ed',
"foobar(users.name) = foobar(:foobar_1)",
dialect=default.DefaultDialect()
)
self.assert_compile(
aliased(User).name == 'ed',
"foobar(users_1.name) = foobar(:foobar_1)",
dialect=default.DefaultDialect())
def test_synonym(self):
users, User = self.tables.users, self.classes.User
from sqlalchemy.orm.properties import ColumnProperty
class MyFactory(ColumnProperty.Comparator):
__hash__ = None
def __eq__(self, other):
return func.foobar(self.__clause_element__()) ==\
func.foobar(other)
mapper(User, users, properties={
'name': synonym('_name', map_column=True,
comparator_factory=MyFactory)
})
self.assert_compile(
User.name == 'ed',
"foobar(users.name) = foobar(:foobar_1)",
dialect=default.DefaultDialect())
self.assert_compile(
aliased(User).name == 'ed',
"foobar(users_1.name) = foobar(:foobar_1)",
dialect=default.DefaultDialect())
def test_relationship(self):
users, Address, addresses, User = (self.tables.users,
self.classes.Address,
self.tables.addresses,
self.classes.User)
from sqlalchemy.orm.properties import RelationshipProperty
# NOTE: this API changed in 0.8, previously __clause_element__()
# gave the parent selecatable, now it gives the
# primaryjoin/secondaryjoin
class MyFactory(RelationshipProperty.Comparator):
__hash__ = None
def __eq__(self, other):
return func.foobar(self._source_selectable().c.user_id) == \
func.foobar(other.id)
class MyFactory2(RelationshipProperty.Comparator):
__hash__ = None
def __eq__(self, other):
return func.foobar(self._source_selectable().c.id) == \
func.foobar(other.user_id)
mapper(User, users)
mapper(Address, addresses, properties={
'user': relationship(
User, comparator_factory=MyFactory,
backref=backref("addresses", comparator_factory=MyFactory2)
)
}
)
# these are kind of nonsensical tests.
self.assert_compile(Address.user == User(id=5),
"foobar(addresses.user_id) = foobar(:foobar_1)",
dialect=default.DefaultDialect())
self.assert_compile(User.addresses == Address(id=5, user_id=7),
"foobar(users.id) = foobar(:foobar_1)",
dialect=default.DefaultDialect())
self.assert_compile(
aliased(Address).user == User(id=5),
"foobar(addresses_1.user_id) = foobar(:foobar_1)",
dialect=default.DefaultDialect())
self.assert_compile(
aliased(User).addresses == Address(id=5, user_id=7),
"foobar(users_1.id) = foobar(:foobar_1)",
dialect=default.DefaultDialect())
class SecondaryOptionsTest(fixtures.MappedTest):
"""test that the contains_eager() option doesn't bleed
into a secondary load."""
run_inserts = 'once'
run_deletes = None
@classmethod
def define_tables(cls, metadata):
Table("base", metadata,
Column('id', Integer, primary_key=True),
Column('type', String(50), nullable=False)
)
Table("child1", metadata,
Column('id', Integer, ForeignKey('base.id'), primary_key=True),
Column(
'child2id', Integer, ForeignKey('child2.id'), nullable=False)
)
Table("child2", metadata,
Column('id', Integer, ForeignKey('base.id'), primary_key=True),
)
Table('related', metadata,
Column('id', Integer, ForeignKey('base.id'), primary_key=True),
)
@classmethod
def setup_mappers(cls):
child1, child2, base, related = (cls.tables.child1,
cls.tables.child2,
cls.tables.base,
cls.tables.related)
class Base(cls.Comparable):
pass
class Child1(Base):
pass
class Child2(Base):
pass
class Related(cls.Comparable):
pass
mapper(Base, base, polymorphic_on=base.c.type, properties={
'related': relationship(Related, uselist=False)
})
mapper(Child1, child1, inherits=Base,
polymorphic_identity='child1',
properties={
'child2': relationship(
Child2,
primaryjoin=child1.c.child2id == base.c.id,
foreign_keys=child1.c.child2id)
})
mapper(Child2, child2, inherits=Base, polymorphic_identity='child2')
mapper(Related, related)
@classmethod
def insert_data(cls):
child1, child2, base, related = (cls.tables.child1,
cls.tables.child2,
cls.tables.base,
cls.tables.related)
base.insert().execute([
{'id': 1, 'type': 'child1'},
{'id': 2, 'type': 'child1'},
{'id': 3, 'type': 'child1'},
{'id': 4, 'type': 'child2'},
{'id': 5, 'type': 'child2'},
{'id': 6, 'type': 'child2'},
])
child2.insert().execute([
{'id': 4},
{'id': 5},
{'id': 6},
])
child1.insert().execute([
{'id': 1, 'child2id': 4},
{'id': 2, 'child2id': 5},
{'id': 3, 'child2id': 6},
])
related.insert().execute([
{'id': 1},
{'id': 2},
{'id': 3},
{'id': 4},
{'id': 5},
{'id': 6},
])
def test_contains_eager(self):
Child1, Related = self.classes.Child1, self.classes.Related
sess = create_session()
child1s = sess.query(Child1).\
join(Child1.related).\
options(sa.orm.contains_eager(Child1.related)).\
order_by(Child1.id)
def go():
eq_(
child1s.all(),
[
Child1(id=1, related=Related(id=1)),
Child1(id=2, related=Related(id=2)),
Child1(id=3, related=Related(id=3))
]
)
self.assert_sql_count(testing.db, go, 1)
c1 = child1s[0]
self.assert_sql_execution(
testing.db,
lambda: c1.child2,
CompiledSQL(
"SELECT child2.id AS child2_id, base.id AS base_id, "
"base.type AS base_type "
"FROM base JOIN child2 ON base.id = child2.id "
"WHERE base.id = :param_1",
{'param_1': 4}
)
)
def test_joinedload_on_other(self):
Child1, Related = self.classes.Child1, self.classes.Related
sess = create_session()
child1s = sess.query(Child1).join(Child1.related).options(
sa.orm.joinedload(Child1.related)).order_by(Child1.id)
def go():
eq_(
child1s.all(),
[Child1(id=1, related=Related(id=1)),
Child1(id=2, related=Related(id=2)),
Child1(id=3, related=Related(id=3))]
)
self.assert_sql_count(testing.db, go, 1)
c1 = child1s[0]
self.assert_sql_execution(
testing.db,
lambda: c1.child2,
CompiledSQL(
"SELECT child2.id AS child2_id, base.id AS base_id, "
"base.type AS base_type "
"FROM base JOIN child2 ON base.id = child2.id "
"WHERE base.id = :param_1",
{'param_1': 4}
)
)
def test_joinedload_on_same(self):
Child1, Child2, Related = (self.classes.Child1,
self.classes.Child2,
self.classes.Related)
sess = create_session()
child1s = sess.query(Child1).join(Child1.related).options(
sa.orm.joinedload(Child1.child2, Child2.related)
).order_by(Child1.id)
def go():
eq_(
child1s.all(),
[Child1(id=1, related=Related(id=1)),
Child1(id=2, related=Related(id=2)),
Child1(id=3, related=Related(id=3))]
)
self.assert_sql_count(testing.db, go, 4)
c1 = child1s[0]
# this *does* joinedload
self.assert_sql_execution(
testing.db,
lambda: c1.child2,
CompiledSQL(
"SELECT child2.id AS child2_id, base.id AS base_id, "
"base.type AS base_type, "
"related_1.id AS related_1_id FROM base JOIN child2 "
"ON base.id = child2.id "
"LEFT OUTER JOIN related AS related_1 "
"ON base.id = related_1.id WHERE base.id = :param_1",
{'param_1': 4}
)
)
class DeferredPopulationTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table("thing", metadata,
Column(
"id", Integer, primary_key=True,
test_needs_autoincrement=True),
Column("name", String(20)))
Table("human", metadata,
Column(
"id", Integer, primary_key=True,
test_needs_autoincrement=True),
Column("thing_id", Integer, ForeignKey("thing.id")),
Column("name", String(20)))
@classmethod
def setup_mappers(cls):
thing, human = cls.tables.thing, cls.tables.human
class Human(cls.Basic):
pass
class Thing(cls.Basic):
pass
mapper(Human, human, properties={"thing": relationship(Thing)})
mapper(Thing, thing, properties={"name": deferred(thing.c.name)})
@classmethod
def insert_data(cls):
thing, human = cls.tables.thing, cls.tables.human
thing.insert().execute([
{"id": 1, "name": "Chair"},
])
human.insert().execute([
{"id": 1, "thing_id": 1, "name": "Clark Kent"},
])
def _test(self, thing):
assert "name" in attributes.instance_state(thing).dict
def test_no_previous_query(self):
Thing = self.classes.Thing
session = create_session()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
def test_query_twice_with_clear(self):
Thing = self.classes.Thing
session = create_session()
result = session.query(Thing).first() # noqa
session.expunge_all()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
def test_query_twice_no_clear(self):
Thing = self.classes.Thing
session = create_session()
result = session.query(Thing).first() # noqa
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
def test_joinedload_with_clear(self):
Thing, Human = self.classes.Thing, self.classes.Human
session = create_session()
human = session.query(Human).options( # noqa
sa.orm.joinedload("thing")).first()
session.expunge_all()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
def test_joinedload_no_clear(self):
Thing, Human = self.classes.Thing, self.classes.Human
session = create_session()
human = session.query(Human).options( # noqa
sa.orm.joinedload("thing")).first()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
def test_join_with_clear(self):
Thing, Human = self.classes.Thing, self.classes.Human
session = create_session()
result = session.query(Human).add_entity( # noqa
Thing).join("thing").first()
session.expunge_all()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
def test_join_no_clear(self):
Thing, Human = self.classes.Thing, self.classes.Human
session = create_session()
result = session.query(Human).add_entity( # noqa
Thing).join("thing").first()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
class NoLoadTest(_fixtures.FixtureTest):
run_inserts = 'once'
run_deletes = None
def test_o2m_noload(self):
Address, addresses, users, User = (
self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
m = mapper(User, users, properties=dict(
addresses=relationship(mapper(Address, addresses), lazy='noload')
))
q = create_session().query(m)
result = [None]
def go():
x = q.filter(User.id == 7).all()
x[0].addresses
result[0] = x
self.assert_sql_count(testing.db, go, 1)
self.assert_result(
result[0], User,
{'id': 7, 'addresses': (Address, [])},
)
def test_upgrade_o2m_noload_lazyload_option(self):
Address, addresses, users, User = (
self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
m = mapper(User, users, properties=dict(
addresses=relationship(mapper(Address, addresses), lazy='noload')
))
q = create_session().query(m).options(sa.orm.lazyload('addresses'))
result = [None]
def go():
x = q.filter(User.id == 7).all()
x[0].addresses
result[0] = x
self.sql_count_(2, go)
self.assert_result(
result[0], User,
{'id': 7, 'addresses': (Address, [{'id': 1}])},
)
def test_m2o_noload_option(self):
Address, addresses, users, User = (
self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(Address, addresses, properties={
'user': relationship(User)
})
mapper(User, users)
s = Session()
a1 = s.query(Address).filter_by(id=1).options(
sa.orm.noload('user')).first()
def go():
eq_(a1.user, None)
self.sql_count_(0, go)
class RaiseLoadTest(_fixtures.FixtureTest):
run_inserts = 'once'
run_deletes = None
def test_o2m_raiseload_mapper(self):
Address, addresses, users, User = (
self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(Address, addresses)
mapper(User, users, properties=dict(
addresses=relationship(Address, lazy='raise')
))
q = create_session().query(User)
result = [None]
def go():
x = q.filter(User.id == 7).all()
assert_raises_message(
sa.exc.InvalidRequestError,
"'User.addresses' is not available due to lazy='raise'",
lambda: x[0].addresses)
result[0] = x
self.assert_sql_count(testing.db, go, 1)
self.assert_result(
result[0], User,
{'id': 7},
)
def test_o2m_raiseload_option(self):
Address, addresses, users, User = (
self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(Address, addresses)
mapper(User, users, properties=dict(
addresses=relationship(Address)
))
q = create_session().query(User)
result = [None]
def go():
x = q.options(
sa.orm.raiseload(User.addresses)).filter(User.id == 7).all()
assert_raises_message(
sa.exc.InvalidRequestError,
"'User.addresses' is not available due to lazy='raise'",
lambda: x[0].addresses)
result[0] = x
self.assert_sql_count(testing.db, go, 1)
self.assert_result(
result[0], User,
{'id': 7},
)
def test_o2m_raiseload_lazyload_option(self):
Address, addresses, users, User = (
self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(Address, addresses)
mapper(User, users, properties=dict(
addresses=relationship(Address, lazy='raise')
))
q = create_session().query(User).options(sa.orm.lazyload('addresses'))
result = [None]
def go():
x = q.filter(User.id == 7).all()
x[0].addresses
result[0] = x
self.sql_count_(2, go)
self.assert_result(
result[0], User,
{'id': 7, 'addresses': (Address, [{'id': 1}])},
)
def test_m2o_raiseload_option(self):
Address, addresses, users, User = (
self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(Address, addresses, properties={
'user': relationship(User)
})
mapper(User, users)
s = Session()
a1 = s.query(Address).filter_by(id=1).options(
sa.orm.raiseload('user')).first()
def go():
assert_raises_message(
sa.exc.InvalidRequestError,
"'Address.user' is not available due to lazy='raise'",
lambda: a1.user)
self.sql_count_(0, go)
def test_m2o_raise_on_sql_option(self):
Address, addresses, users, User = (
self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(Address, addresses, properties={
'user': relationship(User)
})
mapper(User, users)
s = Session()
a1 = s.query(Address).filter_by(id=1).options(
sa.orm.raiseload('user', sql_only=True)).first()
def go():
assert_raises_message(
sa.exc.InvalidRequestError,
"'Address.user' is not available due to lazy='raise_on_sql'",
lambda: a1.user)
self.sql_count_(0, go)
s.close()
u1 = s.query(User).first()
a1 = s.query(Address).filter_by(id=1).options(
sa.orm.raiseload('user', sql_only=True)).first()
assert 'user' not in a1.__dict__
is_(a1.user, u1)
def test_m2o_non_use_get_raise_on_sql_option(self):
Address, addresses, users, User = (
self.classes.Address,
self.tables.addresses,
self.tables.users,
self.classes.User)
mapper(Address, addresses, properties={
'user': relationship(
User,
primaryjoin=sa.and_(
addresses.c.user_id == users.c.id,
users.c.name != None # noqa
)
)
})
mapper(User, users)
s = Session()
u1 = s.query(User).first()
a1 = s.query(Address).filter_by(id=1).options(
sa.orm.raiseload('user', sql_only=True)).first()
def go():
assert_raises_message(
sa.exc.InvalidRequestError,
"'Address.user' is not available due to lazy='raise_on_sql'",
lambda: a1.user)
class RequirementsTest(fixtures.MappedTest):
"""Tests the contract for user classes."""
@classmethod
def define_tables(cls, metadata):
Table('ht1', metadata,
Column(
'id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('value', String(10)))
Table('ht2', metadata,
Column(
'id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('ht1_id', Integer, ForeignKey('ht1.id')),
Column('value', String(10)))
Table('ht3', metadata,
Column(
'id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('value', String(10)))
Table('ht4', metadata,
Column('ht1_id', Integer, ForeignKey('ht1.id'),
primary_key=True),
Column('ht3_id', Integer, ForeignKey('ht3.id'),
primary_key=True))
Table('ht5', metadata,
Column('ht1_id', Integer, ForeignKey('ht1.id'),
primary_key=True))
Table('ht6', metadata,
Column('ht1a_id', Integer, ForeignKey('ht1.id'),
primary_key=True),
Column('ht1b_id', Integer, ForeignKey('ht1.id'),
primary_key=True),
Column('value', String(10)))
if util.py2k:
def test_baseclass(self):
ht1 = self.tables.ht1
class OldStyle:
pass
assert_raises(sa.exc.ArgumentError, mapper, OldStyle, ht1)
assert_raises(sa.exc.ArgumentError, mapper, 123)
class NoWeakrefSupport(str):
pass
# TODO: is weakref support detectable without an instance?
# self.assertRaises(
# sa.exc.ArgumentError, mapper, NoWeakrefSupport, t2)
class _ValueBase(object):
def __init__(self, value='abc', id=None):
self.id = id
self.value = value
def __bool__(self):
return False
def __hash__(self):
return hash(self.value)
def __eq__(self, other):
if isinstance(other, type(self)):
return self.value == other.value
return False
def test_comparison_overrides(self):
"""Simple tests to ensure users can supply comparison __methods__.
The suite-level test --options are better suited to detect
problems- they add selected __methods__ across the board on all
ORM tests. This test simply shoves a variety of operations
through the ORM to catch basic regressions early in a standard
test run.
"""
ht6, ht5, ht4, ht3, ht2, ht1 = (self.tables.ht6,
self.tables.ht5,
self.tables.ht4,
self.tables.ht3,
self.tables.ht2,
self.tables.ht1)
class H1(self._ValueBase):
pass
class H2(self._ValueBase):
pass
class H3(self._ValueBase):
pass
class H6(self._ValueBase):
pass
mapper(H1, ht1, properties={
'h2s': relationship(H2, backref='h1'),
'h3s': relationship(H3, secondary=ht4, backref='h1s'),
'h1s': relationship(H1, secondary=ht5, backref='parent_h1'),
't6a': relationship(H6, backref='h1a',
primaryjoin=ht1.c.id == ht6.c.ht1a_id),
't6b': relationship(H6, backref='h1b',
primaryjoin=ht1.c.id == ht6.c.ht1b_id),
})
mapper(H2, ht2)
mapper(H3, ht3)
mapper(H6, ht6)
s = create_session()
s.add_all([
H1('abc'),
H1('def'),
])
h1 = H1('ghi')
s.add(h1)
h1.h2s.append(H2('abc'))
h1.h3s.extend([H3(), H3()])
h1.h1s.append(H1())
s.flush()
eq_(select([func.count('*')]).select_from(ht1).scalar(), 4)
h6 = H6()
h6.h1a = h1
h6.h1b = h1
h6 = H6()
h6.h1a = h1
h6.h1b = x = H1()
assert x in s
h6.h1b.h2s.append(H2('def'))
s.flush()
h1.h2s.extend([H2('abc'), H2('def')])
s.flush()
h1s = s.query(H1).options(sa.orm.joinedload('h2s')).all()
eq_(len(h1s), 5)
self.assert_unordered_result(h1s, H1,
{'h2s': []},
{'h2s': []},
{'h2s': (H2, [{'value': 'abc'},
{'value': 'def'},
{'value': 'abc'}])},
{'h2s': []},
{'h2s': (H2, [{'value': 'def'}])})
h1s = s.query(H1).options(sa.orm.joinedload('h3s')).all()
eq_(len(h1s), 5)
h1s = s.query(H1).options(sa.orm.joinedload_all('t6a.h1b'),
sa.orm.joinedload('h2s'),
sa.orm.joinedload_all('h3s.h1s')).all()
eq_(len(h1s), 5)
def test_composite_results(self):
ht2, ht1 = (self.tables.ht2,
self.tables.ht1)
class H1(self._ValueBase):
def __init__(self, value, id, h2s):
self.value = value
self.id = id
self.h2s = h2s
class H2(self._ValueBase):
def __init__(self, value, id):
self.value = value
self.id = id
mapper(H1, ht1, properties={
'h2s': relationship(H2, backref='h1'),
})
mapper(H2, ht2)
s = Session()
s.add_all([
H1('abc', 1, h2s=[
H2('abc', id=1),
H2('def', id=2),
H2('def', id=3),
]),
H1('def', 2, h2s=[
H2('abc', id=4),
H2('abc', id=5),
H2('def', id=6),
]),
])
s.commit()
eq_(
[(h1.value, h1.id, h2.value, h2.id)
for h1, h2 in
s.query(H1, H2).join(H1.h2s).order_by(H1.id, H2.id)],
[
('abc', 1, 'abc', 1),
('abc', 1, 'def', 2),
('abc', 1, 'def', 3),
('def', 2, 'abc', 4),
('def', 2, 'abc', 5),
('def', 2, 'def', 6),
]
)
def test_nonzero_len_recursion(self):
ht1 = self.tables.ht1
class H1(object):
def __len__(self):
return len(self.get_value())
def get_value(self):
self.value = "foobar"
return self.value
class H2(object):
def __bool__(self):
return bool(self.get_value())
def get_value(self):
self.value = "foobar"
return self.value
mapper(H1, ht1)
mapper(H2, ht1)
h1 = H1()
h1.value = "Asdf"
h1.value = "asdf asdf" # ding
h2 = H2()
h2.value = "Asdf"
h2.value = "asdf asdf" # ding
class IsUserlandTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('foo', metadata,
Column('id', Integer, primary_key=True),
Column('someprop', Integer)
)
def _test(self, value, instancelevel=None):
class Foo(object):
someprop = value
m = mapper(Foo, self.tables.foo)
eq_(Foo.someprop, value)
f1 = Foo()
if instancelevel is not None:
eq_(f1.someprop, instancelevel)
else:
eq_(f1.someprop, value)
assert self.tables.foo.c.someprop not in m._columntoproperty
def _test_not(self, value):
class Foo(object):
someprop = value
m = mapper(Foo, self.tables.foo)
is_(Foo.someprop.property.columns[0], self.tables.foo.c.someprop)
assert self.tables.foo.c.someprop in m._columntoproperty
def test_string(self):
self._test("someprop")
def test_unicode(self):
self._test("someprop")
def test_int(self):
self._test(5)
def test_dict(self):
self._test({"bar": "bat"})
def test_set(self):
self._test(set([6]))
def test_column(self):
self._test_not(self.tables.foo.c.someprop)
def test_relationship(self):
self._test_not(relationship("bar"))
def test_descriptor(self):
def somefunc(self):
return "hi"
self._test(property(somefunc), "hi")
class MagicNamesTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('cartographers', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('name', String(50)),
Column('alias', String(50)),
Column('quip', String(100)))
Table('maps', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('cart_id', Integer,
ForeignKey('cartographers.id')),
Column('state', String(2)),
Column('data', sa.Text))
@classmethod
def setup_classes(cls):
class Cartographer(cls.Basic):
pass
class Map(cls.Basic):
pass
def test_mappish(self):
maps, Cartographer, cartographers, Map = (self.tables.maps,
self.classes.Cartographer,
self.tables.cartographers,
self.classes.Map)
mapper(Cartographer, cartographers, properties=dict(
query=cartographers.c.quip))
mapper(Map, maps, properties=dict(
mapper=relationship(Cartographer, backref='maps')))
c = Cartographer(name='Lenny', alias='The Dude',
query='Where be dragons?')
Map(state='AK', mapper=c)
sess = create_session()
sess.add(c)
sess.flush()
sess.expunge_all()
for C, M in ((Cartographer, Map),
(sa.orm.aliased(Cartographer), sa.orm.aliased(Map))):
c1 = (sess.query(C).
filter(C.alias == 'The Dude').
filter(C.query == 'Where be dragons?')).one()
sess.query(M).filter(M.mapper == c1).one()
def test_direct_stateish(self):
for reserved in (sa.orm.instrumentation.ClassManager.STATE_ATTR,
sa.orm.instrumentation.ClassManager.MANAGER_ATTR):
t = Table('t', sa.MetaData(),
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column(reserved, Integer))
class T(object):
pass
assert_raises_message(
KeyError,
('%r: requested attribute name conflicts with '
'instrumentation attribute of the same name.' % reserved),
mapper, T, t)
def test_indirect_stateish(self):
maps = self.tables.maps
for reserved in (sa.orm.instrumentation.ClassManager.STATE_ATTR,
sa.orm.instrumentation.ClassManager.MANAGER_ATTR):
class M(object):
pass
assert_raises_message(
KeyError,
('requested attribute name conflicts with '
'instrumentation attribute of the same name'),
mapper, M, maps, properties={
reserved: maps.c.state})