# Mirror of https://github.com/sqlalchemy/sqlalchemy.git
# (synced 2026-05-06 17:01:07 -04:00; 839 lines, 35 KiB, Python)
from testbase import PersistTest, AssertMixin
|
|
import testbase
|
|
import unittest, sys, os
|
|
from sqlalchemy import *
|
|
|
|
|
|
from tables import *
|
|
import tables
|
|
|
|
# Expected-result fixtures passed to assert_result() by the tests below.
# Each object is written as a dict of attribute values; a related collection
# is written as a (class, [dict, ...]) tuple.

# users only
user_result = [
    {'user_id': 7},
    {'user_id': 8},
    {'user_id': 9},
]

# users with their addresses
user_address_result = [
    {'user_id': 7, 'addresses': (Address, [{'address_id': 1}])},
    {'user_id': 8, 'addresses': (Address, [
        {'address_id': 2},
        {'address_id': 3},
        {'address_id': 4},
    ])},
    {'user_id': 9, 'addresses': (Address, [])},
]

# users with their addresses and orders
user_address_orders_result = [
    {
        'user_id': 7,
        'addresses': (Address, [{'address_id': 1}]),
        'orders': (Order, [{'order_id': 1}, {'order_id': 3}, {'order_id': 5}]),
    },
    {
        'user_id': 8,
        'addresses': (Address, [{'address_id': 2}, {'address_id': 3}, {'address_id': 4}]),
        'orders': (Order, []),
    },
    {
        'user_id': 9,
        'addresses': (Address, []),
        'orders': (Order, [{'order_id': 2}, {'order_id': 4}]),
    },
]

# users with their addresses, orders and the items of each order
user_all_result = [
    {
        'user_id': 7,
        'addresses': (Address, [{'address_id': 1}]),
        'orders': (Order, [
            {'order_id': 1, 'items': (Item, [])},
            {'order_id': 3, 'items': (Item, [
                {'item_id': 3, 'item_name': 'item 3'},
                {'item_id': 4, 'item_name': 'item 4'},
                {'item_id': 5, 'item_name': 'item 5'},
            ])},
            {'order_id': 5, 'items': (Item, [])},
        ]),
    },
    {
        'user_id': 8,
        'addresses': (Address, [{'address_id': 2}, {'address_id': 3}, {'address_id': 4}]),
        'orders': (Order, []),
    },
    {
        'user_id': 9,
        'addresses': (Address, []),
        'orders': (Order, [
            {'order_id': 2, 'items': (Item, [
                {'item_id': 1, 'item_name': 'item 1'},
                {'item_id': 2, 'item_name': 'item 2'},
            ])},
            {'order_id': 4, 'items': (Item, [])},
        ]),
    },
]

# items with their keywords (the first entry checks keyword ids only)
item_keyword_result = [
    {'item_id': 1, 'keywords': (Keyword, [
        {'keyword_id': 2},
        {'keyword_id': 4},
        {'keyword_id': 6},
    ])},
    {'item_id': 2, 'keywords': (Keyword, [
        {'keyword_id': 2, 'name': 'red'},
        {'keyword_id': 5, 'name': 'small'},
        {'keyword_id': 7, 'name': 'square'},
    ])},
    {'item_id': 3, 'keywords': (Keyword, [
        {'keyword_id': 3, 'name': 'green'},
        {'keyword_id': 4, 'name': 'big'},
        {'keyword_id': 6, 'name': 'round'},
    ])},
    {'item_id': 4, 'keywords': (Keyword, [])},
    {'item_id': 5, 'keywords': (Keyword, [])},
]
|
|
|
|
|
|
class MapperSuperTest(AssertMixin):
    """Common fixture handling for the mapper test classes in this file.

    setUpAll()/tearDownAll() create and drop the tables; tearDown() clears
    the objectstore and all mappers after every test.
    """

    def _run_quietly(self, *steps):
        """Call each callable in `steps` with db.echo switched off.

        db.echo is put back to testbase.echo in a ``finally`` clause, so a
        step that raises cannot leave echoing disabled for whatever runs
        afterwards.
        """
        db.echo = False
        try:
            for step in steps:
                step()
        finally:
            db.echo = testbase.echo

    def setUpAll(self):
        """Run tables.create() and then tables.data() without SQL echo."""
        self._run_quietly(tables.create, tables.data)

    def tearDownAll(self):
        """Run tables.drop() without SQL echo."""
        self._run_quietly(tables.drop)

    def tearDown(self):
        """Clear the objectstore and discard all mappers."""
        objectstore.clear()
        clear_mappers()

    def setUp(self):
        """No per-test setup is needed."""
        pass
|
|
|
|
class MapperTest(MapperSuperTest):
    """Tests of basic mapper behavior: get/refresh/expire, the select
    variants, and the options() method."""

    def testget(self):
        """tests that get() returns None for an unknown key, and returns the
        same instance on repeated calls until the objectstore is cleared"""
        user_mapper = mapper(User, users)
        self.assert_(user_mapper.get(19) is None)
        first = user_mapper.get(7)
        again = user_mapper.get(7)
        self.assert_(first is again)
        objectstore.clear()
        again = user_mapper.get(7)
        self.assert_(first is not again)

    def testrefresh(self):
        """tests that refresh() and expire() throw away pending changes to
        both a column attribute and a collection"""
        user_mapper = mapper(User, users, properties={
            'addresses': relation(mapper(Address, addresses)),
        })
        user = user_mapper.get(7)
        user.user_name = 'foo'
        address = Address()
        user.addresses.append(address)

        self.assert_(address in user.addresses)

        objectstore.refresh(user)

        # it's refreshed, so not dirty
        self.assert_(user not in objectstore.get_session().uow.dirty)

        # username is back to the DB
        self.assert_(user.user_name == 'jack')

        self.assert_(address not in user.addresses)

        user.user_name = 'foo'
        user.addresses.append(address)
        # now it's dirty
        self.assert_(user in objectstore.get_session().uow.dirty)
        self.assert_(user.user_name == 'foo')
        self.assert_(address in user.addresses)
        objectstore.expire(user)

        # get the attribute, it refreshes
        self.assert_(user.user_name == 'jack')
        self.assert_(address not in user.addresses)

    def testrefresh_lazy(self):
        """tests that when a lazy loader is set as a trigger on an object's
        attribute (at the attribute level, not the class level), a refresh()
        operation doesn't fire the lazy loader or create any problems"""
        user_mapper = mapper(User, users, properties={
            'addresses': relation(mapper(Address, addresses)),
        })
        lazy_mapper = user_mapper.options(lazyload('addresses'))
        user = lazy_mapper.selectfirst(users.c.user_id == 8)

        def go():
            objectstore.refresh(user)
        self.assert_sql_count(db, go, 1)

    def testexpire_eager(self):
        """tests that an eager load will populate expire()'d objects"""
        user_mapper = mapper(User, users, properties={
            'addresses': relation(mapper(Address, addresses)),
        })
        [u1, u2, u3] = user_mapper.select(users.c.user_id.in_(7, 8, 9))
        self.echo([repr(x.addresses) for x in [u1, u2, u3]])
        # plain loop: expire() is called for its side effect only
        for loaded in [u1, u2, u3]:
            objectstore.expire(loaded)
        eager_mapper = user_mapper.options(eagerload('addresses'))
        # the select is issued only to re-populate u1..u3; its result is unused
        eager_mapper.select(users.c.user_id.in_(7, 8, 9))

        def go():
            u1.addresses
            u2.addresses
            u3.addresses
        self.assert_sql_count(db, go, 0)

    def testsessionpropigation(self):
        """tests that an object loaded via using(sess), and an object reached
        through its relation, both report sess as their session"""
        sess = objectstore.Session()
        user_mapper = mapper(User, users, properties={
            'addresses': relation(mapper(Address, addresses), lazy=True),
        })
        user = user_mapper.using(sess).get(7)
        assert objectstore.get_session(user) is sess
        assert objectstore.get_session(user.addresses[0]) is sess

    def testexpire(self):
        """tests expire() on a modified object, and that a later select()
        re-populates an expired object"""
        user_mapper = mapper(User, users, properties={
            'addresses': relation(mapper(Address, addresses), lazy=False),
        })
        user = user_mapper.get(7)
        assert len(user.addresses) == 1
        user.user_name = 'foo'
        del user.addresses[0]
        objectstore.expire(user)
        # test plain expire
        self.assert_(user.user_name == 'jack')
        self.assert_(len(user.addresses) == 1)

        # we're changing the database here, so if this test fails in the middle,
        # it'll screw up the other tests which are hardcoded to 7/'jack'
        user.user_name = 'foo'
        objectstore.commit()
        # change the value in the DB
        users.update(users.c.user_id == 7, values={'user_name': 'jack'}).execute()
        objectstore.expire(user)
        # object isn't refreshed yet, using dict to bypass trigger
        self.assert_(user.__dict__['user_name'] != 'jack')
        # do a select
        user_mapper.select()
        # test that it refreshed
        self.assert_(user.__dict__['user_name'] == 'jack')

        # object should be back to normal now,
        # this should *not* produce a SELECT statement (not tested here though....)
        self.assert_(user.user_name == 'jack')

    def testrefresh2(self):
        """exercises expire() followed by attribute access, and refresh(), on
        an object whose class has a private, eager relation (no assertions;
        the original comments report that these calls used to hang)"""
        assign_mapper(Address, addresses)

        assign_mapper(User, users, properties={
            'addresses': relation(Address.mapper, private=True, lazy=False),
        })

        user = User()
        user.user_name = 'Justin'
        address = Address()
        address.address_id = 17  # to work around the hardcoded IDs in this test suite....
        user.addresses.append(address)
        objectstore.commit()
        objectstore.clear()
        user = User.mapper.selectfirst()
        print(user.user_name)

        # ok so far
        user.expire()  # reported to hang when...
        print(user.user_name)  # ...this line runs

        user.refresh()  # reported to hang

    def testmagic(self):
        """tests select_by(), get_by_<column>() and select_by() against a
        column of the related table"""
        user_mapper = mapper(User, users, properties={
            'addresses': relation(mapper(Address, addresses)),
        })
        found = user_mapper.select_by(user_name='fred')
        self.assert_result(found, User, *[{'user_id': 9}])
        fred = found[0]

        fred_again = user_mapper.get_by_user_name('fred')
        self.assert_(fred is fred_again)

        found = user_mapper.select_by(email_address='ed@bettyboop.com')
        self.assert_result(found, User, *[{'user_id': 8}])

        # positional criteria mixed with keyword criteria; only checked for
        # not raising, the result is not inspected
        user_mapper.select_by(User.c.user_name == 'fred', addresses.c.email_address != 'ed@bettyboop.com', user_id=9)

    def testprops(self):
        """tests the various attributes of the properties attached to classes"""
        user_mapper = mapper(User, users, properties={
            'addresses': relation(mapper(Address, addresses)),
        })
        self.assert_(User.addresses.property is user_mapper.props['addresses'])

    def testload(self):
        """tests loading rows with a mapper and producing object instances"""
        user_mapper = mapper(User, users)
        loaded = user_mapper.select()
        self.assert_result(loaded, User, *user_result)
        loaded = user_mapper.select(users.c.user_name.endswith('ed'))
        self.assert_result(loaded, User, *user_result[1:3])

    def testorderby(self):
        """creates a mapper with order_by=None and runs a plain select()
        (no assertions yet)"""
        # TODO: make a unit test out of these various combinations:
        #   m = mapper(User, users, order_by=desc(users.c.user_name))
        #   m = mapper(User, users)
        #   l = m.select(order_by=[desc(users.c.user_name), asc(users.c.user_id)])
        #   l = m.select(order_by=[])
        #   l = m.select(order_by=None)
        user_mapper = mapper(User, users, order_by=None)
        user_mapper.select()

    def testfunction(self):
        """tests mapping to a SELECT statement that has functions in it."""
        stmt = select(
            [users, (users.c.user_id * 2).label('concat'), func.count(addresses.c.address_id).label('count')],
            users.c.user_id == addresses.c.user_id,
            group_by=[col for col in users.c],
        ).alias('myselect')
        user_mapper = mapper(User, stmt, primarytable=users)
        print([col.key for col in user_mapper.c])
        for user in user_mapper.select():
            # one pre-formatted string: prints the same text whether print is
            # a statement (Python 2) or a function
            print("User %s %s %s %s" % (user.user_id, user.user_name, user.concat, user.count))
        # not exercised: modifying a loaded row and committing, e.g.
        #   l[1].user_name='asdf'
        #   objectstore.commit()

    def testcount(self):
        """tests count() with and without criterion, and count_by()"""
        user_mapper = mapper(User, users)
        self.assert_(user_mapper.count() == 3)
        self.assert_(user_mapper.count(users.c.user_id.in_(8, 9)) == 2)
        self.assert_(user_mapper.count_by(user_name='fred') == 1)

    def testmultitable(self):
        """tests mapping a class to a join of two tables"""
        usersaddresses = sql.join(users, addresses, users.c.user_id == addresses.c.user_id)
        user_mapper = mapper(User, usersaddresses, primarytable=users, primary_key=[users.c.user_id])
        loaded = user_mapper.select()
        self.assert_result(loaded, User, *user_result[0:2])

    def testoverride(self):
        """tests that a property named like a column raises ArgumentError
        unless allow_column_override is set or the column is mapped under
        another name"""
        # assert that overriding a column raises an error
        try:
            mapper(User, users, properties={
                'user_name': relation(mapper(Address, addresses)),
            })
        except ArgumentError:
            pass
        else:
            self.assert_(False, "should have raised ArgumentError")

        # assert that allow_column_override cancels the error
        mapper(User, users, properties={
            'user_name': relation(mapper(Address, addresses))
        }, allow_column_override=True)

        # assert that the column being named elsewhere also cancels the error
        mapper(User, users, properties={
            'user_name': relation(mapper(Address, addresses)),
            'foo': users.c.user_name,
        })

    def testeageroptions(self):
        """tests that a lazy relation can be upgraded to an eager relation via the options method"""
        user_mapper = mapper(User, users, properties={
            'addresses': relation(mapper(Address, addresses), lazy=True),
        })
        loaded = user_mapper.options(eagerload('addresses')).select()

        def go():
            self.assert_result(loaded, User, *user_address_result)
        self.assert_sql_count(db, go, 0)

    def testlazyoptions(self):
        """tests that an eager relation can be upgraded to a lazy relation via the options method"""
        user_mapper = mapper(User, users, properties={
            'addresses': relation(mapper(Address, addresses), lazy=False),
        })
        loaded = user_mapper.options(lazyload('addresses')).select()

        def go():
            self.assert_result(loaded, User, *user_address_result)
        self.assert_sql_count(db, go, 3)

    def testdeepoptions(self):
        """tests eagerload() on a dotted path three relations deep, by
        comparing SQL statement counts with and without the option"""
        user_mapper = mapper(User, users, properties={
            'orders': relation(mapper(Order, orders, properties={
                'items': relation(mapper(Item, orderitems, properties={
                    'keywords': relation(mapper(Keyword, keywords), itemkeywords)
                }))
            }))
        })

        eager_mapper = user_mapper.options(eagerload('orders.items.keywords'))
        loaded = user_mapper.select()

        def go():
            # reads `loaded` at call time, so it follows the rebinding below
            print(loaded[0].orders[1].items[0].keywords[1])
        self.assert_sql_count(db, go, 3)
        objectstore.clear()
        loaded = eager_mapper.select()
        self.assert_sql_count(db, go, 2)
|
|
|
|
class PropertyTest(MapperSuperTest):
    """Tests of mappers attached to classes with assign_mapper(), including
    inheriting mappers."""

    def testbasic(self):
        """tests that you can create mappers inline with class definitions"""
        class _Address(object):
            pass
        assign_mapper(_Address, addresses)

        class _User(object):
            pass
        assign_mapper(
            _User, users,
            properties={'addresses': relation(_Address.mapper, lazy=False)},
            is_primary=True,
        )

        fred_rows = _User.mapper.select(_User.c.user_name == 'fred')
        self.echo(repr(fred_rows))

    def testinherits(self):
        """tests a mapper on the addresses table inheriting from a mapper on
        the users table: load, modify, commit and reload"""
        class _Order(object):
            pass
        assign_mapper(_Order, orders)

        class _User(object):
            pass
        assign_mapper(
            _User, users,
            properties={'orders': relation(_Order.mapper, lazy=False)},
        )

        class AddressUser(_User):
            pass
        assign_mapper(AddressUser, addresses, inherits=_User.mapper)

        loaded = AddressUser.mapper.select()

        jack = loaded[0]
        self.assert_(jack.user_name == 'jack')
        jack.email_address = 'jack@gmail.com'
        objectstore.commit()
        objectstore.clear()
        reloaded = AddressUser.mapper.get_by(user_name='jack')
        self.assert_(reloaded.email_address == 'jack@gmail.com')

    def testinherits2(self):
        """same round trip as testinherits, with the inheritance reversed:
        a mapper on the users table inherits from a mapper on addresses"""
        class _Order(object):
            pass

        class _Address(object):
            pass

        class AddressUser(_Address):
            pass

        assign_mapper(_Order, orders)
        assign_mapper(_Address, addresses)
        assign_mapper(
            AddressUser, users,
            inherits=_Address.mapper,
            properties={'orders': relation(_Order.mapper, lazy=False)},
        )
        loaded = AddressUser.mapper.select()
        jack = loaded[0]
        self.assert_(jack.user_name == 'jack')
        jack.email_address = 'jack@gmail.com'
        objectstore.commit()
        objectstore.clear()
        reloaded = AddressUser.mapper.get_by(user_name='jack')
        self.assert_(reloaded.email_address == 'jack@gmail.com')
|
|
|
|
|
|
class DeferredTest(MapperSuperTest):
    """Tests of deferred column loading, checked against the exact SQL (or
    the number of statements) issued."""

    def testbasic(self):
        """tests a basic "deferred" load"""
        order_mapper = mapper(Order, orders, properties={
            'description': deferred(orders.c.description)
        })

        # a brand new instance reports None for the deferred attribute
        fresh = Order()
        self.assert_(fresh.description is None)

        def go():
            loaded = order_mapper.select()
            third = loaded[2]
            print(third.description)

        self.assert_sql(db, go, [
            ("SELECT orders.order_id AS orders_order_id, orders.user_id AS orders_user_id, orders.isopen AS orders_isopen FROM orders ORDER BY orders.%s" % orders.default_order_by()[0].key, {}),
            ("SELECT orders.description AS orders_description FROM orders WHERE orders.order_id = :orders_order_id", {'orders_order_id': 3}),
        ])

    def testsave(self):
        """modifies and commits an object whose description column is
        deferred (no assertions; must simply not raise)"""
        order_mapper = mapper(Order, orders, properties={
            'description': deferred(orders.c.description)
        })

        loaded = order_mapper.select()
        third = loaded[2]
        third.isopen = 1
        objectstore.commit()

    def testgroup(self):
        """tests deferred load with a group"""
        order_mapper = mapper(Order, orders, properties={
            'userident': deferred(orders.c.user_id, group='primary'),
            'description': deferred(orders.c.description, group='primary'),
            'opened': deferred(orders.c.isopen, group='primary')
        })

        def go():
            loaded = order_mapper.select()
            third = loaded[2]
            # attributes are read in the same order as before: opened,
            # description, userident
            print("%s %s %s" % (third.opened, third.description, third.userident))

        self.assert_sql(db, go, [
            ("SELECT orders.order_id AS orders_order_id FROM orders ORDER BY orders.%s" % orders.default_order_by()[0].key, {}),
            ("SELECT orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders WHERE orders.order_id = :orders_order_id", {'orders_order_id': 3}),
        ])

    def testoptions(self):
        """tests using options on a mapper to create deferred and undeferred columns"""
        order_mapper = mapper(Order, orders)
        deferring = order_mapper.options(defer('user_id'))

        def go_deferred():
            loaded = deferring.select()
            print(loaded[2].user_id)

        self.assert_sql(db, go_deferred, [
            ("SELECT orders.order_id AS orders_order_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders ORDER BY orders.%s" % orders.default_order_by()[0].key, {}),
            ("SELECT orders.user_id AS orders_user_id FROM orders WHERE orders.order_id = :orders_order_id", {'orders_order_id': 3}),
        ])
        objectstore.clear()
        undeferring = deferring.options(undefer('user_id'))

        def go_undeferred():
            loaded = undeferring.select()
            print(loaded[3].user_id)

        self.assert_sql(db, go_undeferred, [
            ("SELECT orders.order_id AS orders_order_id, orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders ORDER BY orders.%s" % orders.default_order_by()[0].key, {}),
        ])

    def testdeepoptions(self):
        """tests undefer() on a dotted path: reading the deferred item_name
        costs one statement without the option and none with it"""
        user_mapper = mapper(User, users, properties={
            'orders': relation(mapper(Order, orders, properties={
                'items': relation(mapper(Item, orderitems, properties={
                    'item_name': deferred(orderitems.c.item_name)
                }))
            }))
        })
        loaded = user_mapper.select()
        item = loaded[0].orders[1].items[1]

        def go():
            # reads `item` at call time, so it follows the rebinding below
            print(item.item_name)

        self.assert_sql_count(db, go, 1)
        self.assert_(item.item_name == 'item 4')
        objectstore.clear()
        undeferring = user_mapper.options(undefer('orders.items.item_name'))
        loaded = undeferring.select()
        item = loaded[0].orders[1].items[1]
        self.assert_sql_count(db, go, 0)
        self.assert_(item.item_name == 'item 4')
|
|
|
|
|
|
class LazyTest(MapperSuperTest):
    """Tests of relations declared with lazy=True."""

    def testbasic(self):
        """tests a basic one-to-many lazy load"""
        user_mapper = mapper(User, users, properties={
            'addresses': relation(mapper(Address, addresses), lazy=True),
        })
        loaded = user_mapper.select(users.c.user_id == 7)
        self.assert_result(
            loaded, User,
            {'user_id': 7, 'addresses': (Address, [{'address_id': 1}])},
        )

    def testorderby(self):
        """tests a lazy relation ordered by a column of the related table"""
        address_mapper = mapper(Address, addresses)

        user_mapper = mapper(User, users, properties={
            'addresses': relation(address_mapper, lazy=True, order_by=addresses.c.email_address),
        })
        loaded = user_mapper.select()

        self.assert_result(
            loaded, User,
            {'user_id': 7, 'addresses': (Address, [{'email_address': 'jack@bean.com'}])},
            {'user_id': 8, 'addresses': (Address, [
                {'email_address': 'ed@bettyboop.com'},
                {'email_address': 'ed@lala.com'},
                {'email_address': 'ed@wood.com'},
            ])},
            {'user_id': 9, 'addresses': (Address, [])},
        )

    def testorderby_desc(self):
        """tests a lazy relation ordered by a descending column"""
        address_mapper = mapper(Address, addresses)

        user_mapper = mapper(User, users, properties={
            'addresses': relation(address_mapper, lazy=True, order_by=[desc(addresses.c.email_address)]),
        })
        loaded = user_mapper.select()

        self.assert_result(
            loaded, User,
            {'user_id': 7, 'addresses': (Address, [{'email_address': 'jack@bean.com'}])},
            {'user_id': 8, 'addresses': (Address, [
                {'email_address': 'ed@wood.com'},
                {'email_address': 'ed@lala.com'},
                {'email_address': 'ed@bettyboop.com'},
            ])},
            {'user_id': 9, 'addresses': (Address, [])},
        )

    def testlimit(self):
        """tests select() with limit/offset and with distinct on mappers
        that have lazy relations"""
        ordermapper = mapper(Order, orders, properties={
            'items': relation(mapper(Item, orderitems), lazy=True),
        })

        user_mapper = mapper(User, users, properties=dict(
            addresses=relation(mapper(Address, addresses), lazy=True),
            orders=relation(ordermapper, primaryjoin=users.c.user_id == orders.c.user_id, lazy=True),
        ))
        loaded = user_mapper.select(limit=2, offset=1)
        self.assert_result(loaded, User, *user_all_result[1:3])

        # use a union all to get a lot of rows to join against
        u2 = users.alias('u2')
        tripled = union_all(
            u2.select(use_labels=True),
            u2.select(use_labels=True),
            u2.select(use_labels=True),
        ).alias('u')
        print([key for key in tripled.c.keys()])
        loaded = user_mapper.select(tripled.c.u2_user_id == User.c.user_id, distinct=True)
        self.assert_result(loaded, User, *user_all_result)

        objectstore.clear()
        item_mapper = mapper(Item, orderitems, is_primary=True, properties={
            'keywords': relation(mapper(Keyword, keywords), itemkeywords, lazy=True),
        })
        loaded = item_mapper.select(
            (Item.c.item_name == 'item 2') | (Item.c.item_name == 'item 5') | (Item.c.item_name == 'item 3'),
            order_by=[Item.c.item_id],
            limit=2,
        )
        self.assert_result(loaded, Item, item_keyword_result[1], item_keyword_result[2])

    def testonetoone(self):
        """loads a user with a lazy uselist=False relation and echoes the
        result (no assertions)"""
        user_mapper = mapper(User, users, properties={
            'address': relation(mapper(Address, addresses), lazy=True, uselist=False),
        })
        loaded = user_mapper.select(users.c.user_id == 7)
        self.echo(repr(loaded))
        self.echo(repr(loaded[0].address))

    def testbackwardsonetoone(self):
        """tests a lazy relation from Address to its User, where the User
        mapper maps the user_id column under the name 'id'"""
        address_mapper = mapper(Address, addresses, properties={
            'user': relation(mapper(User, users, properties={'id': users.c.user_id}), lazy=True),
        })
        loaded = address_mapper.select(addresses.c.address_id == 1)
        self.echo(repr(loaded))
        print(repr(loaded[0].__dict__))
        self.echo(repr(loaded[0].user))
        self.assert_(loaded[0].user is not None)

    def testdouble(self):
        """tests lazy loading with two relations simultaneously, from the same table, using aliases. """
        openorders = alias(orders, 'openorders')
        closedorders = alias(orders, 'closedorders')
        user_mapper = mapper(User, users, properties=dict(
            addresses=relation(mapper(Address, addresses), lazy=False),
            open_orders=relation(
                mapper(Order, openorders),
                primaryjoin=and_(openorders.c.isopen == 1, users.c.user_id == openorders.c.user_id),
                lazy=True),
            closed_orders=relation(
                mapper(Order, closedorders),
                primaryjoin=and_(closedorders.c.isopen == 0, users.c.user_id == closedorders.c.user_id),
                lazy=True)
        ))
        loaded = user_mapper.select()
        self.assert_result(
            loaded, User,
            {
                'user_id': 7,
                'addresses': (Address, [{'address_id': 1}]),
                'open_orders': (Order, [{'order_id': 3}]),
                'closed_orders': (Order, [{'order_id': 1}, {'order_id': 5}]),
            },
            {
                'user_id': 8,
                'addresses': (Address, [{'address_id': 2}, {'address_id': 3}, {'address_id': 4}]),
                'open_orders': (Order, []),
                'closed_orders': (Order, []),
            },
            {
                'user_id': 9,
                'addresses': (Address, []),
                'open_orders': (Order, [{'order_id': 4}]),
                'closed_orders': (Order, [{'order_id': 2}]),
            },
        )

    def testmanytomany(self):
        """tests a many-to-many lazy load"""
        assign_mapper(Item, orderitems, properties={
            'keywords': relation(mapper(Keyword, keywords), itemkeywords, lazy=True),
        })
        loaded = Item.mapper.select()
        self.assert_result(
            loaded, Item,
            {'item_id': 1, 'keywords': (Keyword, [{'keyword_id': 2}, {'keyword_id': 4}, {'keyword_id': 6}])},
            {'item_id': 2, 'keywords': (Keyword, [{'keyword_id': 2}, {'keyword_id': 5}, {'keyword_id': 7}])},
            {'item_id': 3, 'keywords': (Keyword, [{'keyword_id': 3}, {'keyword_id': 4}, {'keyword_id': 6}])},
            {'item_id': 4, 'keywords': (Keyword, [])},
            {'item_id': 5, 'keywords': (Keyword, [])},
        )

        # restrict to items having the keyword 'red'; each item found must
        # still show its complete keyword list
        loaded = Item.mapper.select(and_(
            keywords.c.name == 'red',
            keywords.c.keyword_id == itemkeywords.c.keyword_id,
            Item.c.item_id == itemkeywords.c.item_id,
        ))
        self.assert_result(
            loaded, Item,
            {'item_id': 1, 'keywords': (Keyword, [{'keyword_id': 2}, {'keyword_id': 4}, {'keyword_id': 6}])},
            {'item_id': 2, 'keywords': (Keyword, [{'keyword_id': 2}, {'keyword_id': 5}, {'keyword_id': 7}])},
        )
|
|
|
|
class EagerTest(MapperSuperTest):
    """Tests of relations declared with lazy=False."""

    def testbasic(self):
        """tests a basic one-to-many eager load"""
        address_mapper = mapper(Address, addresses)

        user_mapper = mapper(User, users, properties={
            'addresses': relation(address_mapper, lazy=False),
        })
        loaded = user_mapper.select()
        self.assert_result(loaded, User, *user_address_result)

    def testorderby(self):
        """tests an eager relation ordered by a column of the related table"""
        address_mapper = mapper(Address, addresses)

        user_mapper = mapper(User, users, properties={
            'addresses': relation(address_mapper, lazy=False, order_by=addresses.c.email_address),
        })
        loaded = user_mapper.select()
        self.assert_result(
            loaded, User,
            {'user_id': 7, 'addresses': (Address, [{'email_address': 'jack@bean.com'}])},
            {'user_id': 8, 'addresses': (Address, [
                {'email_address': 'ed@bettyboop.com'},
                {'email_address': 'ed@lala.com'},
                {'email_address': 'ed@wood.com'},
            ])},
            {'user_id': 9, 'addresses': (Address, [])},
        )

    def testorderby_desc(self):
        """tests an eager relation ordered by a descending column"""
        address_mapper = mapper(Address, addresses)

        user_mapper = mapper(User, users, properties={
            'addresses': relation(address_mapper, lazy=False, order_by=[desc(addresses.c.email_address)]),
        })
        loaded = user_mapper.select()

        self.assert_result(
            loaded, User,
            {'user_id': 7, 'addresses': (Address, [{'email_address': 'jack@bean.com'}])},
            {'user_id': 8, 'addresses': (Address, [
                {'email_address': 'ed@wood.com'},
                {'email_address': 'ed@lala.com'},
                {'email_address': 'ed@bettyboop.com'},
            ])},
            {'user_id': 9, 'addresses': (Address, [])},
        )

    def testlimit(self):
        """tests select() with limit/offset and with distinct on mappers
        that have eager relations"""
        ordermapper = mapper(Order, orders, properties={
            'items': relation(mapper(Item, orderitems), lazy=False),
        })

        user_mapper = mapper(User, users, properties=dict(
            addresses=relation(mapper(Address, addresses), lazy=False),
            orders=relation(ordermapper, primaryjoin=users.c.user_id == orders.c.user_id, lazy=False),
        ))
        loaded = user_mapper.select(limit=2, offset=1)
        self.assert_result(loaded, User, *user_all_result[1:3])

        # this is an involved 3x union of the users table to get a lot of rows.
        # then see if the "distinct" works its way out.  you actually get the same
        # result with or without the distinct, just via less or more rows.
        u2 = users.alias('u2')
        tripled = union_all(
            u2.select(use_labels=True),
            u2.select(use_labels=True),
            u2.select(use_labels=True),
        ).alias('u')
        loaded = user_mapper.select(tripled.c.u2_user_id == User.c.user_id, distinct=True)
        self.assert_result(loaded, User, *user_all_result)

        objectstore.clear()
        item_mapper = mapper(Item, orderitems, is_primary=True, properties={
            'keywords': relation(mapper(Keyword, keywords), itemkeywords, lazy=False, order_by=[keywords.c.keyword_id]),
        })
        loaded = item_mapper.select(
            (Item.c.item_name == 'item 2') | (Item.c.item_name == 'item 5') | (Item.c.item_name == 'item 3'),
            order_by=[Item.c.item_id],
            limit=2,
        )
        self.assert_result(loaded, Item, item_keyword_result[1], item_keyword_result[2])

    def testonetoone(self):
        """tests an eager uselist=False relation from a user to one address"""
        user_mapper = mapper(User, users, properties={
            'address': relation(mapper(Address, addresses), lazy=False, uselist=False),
        })
        loaded = user_mapper.select(users.c.user_id == 7)
        self.assert_result(
            loaded, User,
            {'user_id': 7, 'address': (Address, {'address_id': 1, 'email_address': 'jack@bean.com'})},
        )

    def testbackwardsonetoone(self):
        """tests an eager relation from an address to its user"""
        address_mapper = mapper(Address, addresses, properties={
            'user': relation(mapper(User, users), lazy=False),
        })
        self.echo(repr(address_mapper.props['user'].uselist))
        loaded = address_mapper.select(addresses.c.address_id == 1)
        self.assert_result(
            loaded, Address,
            {
                'address_id': 1,
                'email_address': 'jack@bean.com',
                'user': (User, {'user_id': 7, 'user_name': 'jack'}),
            },
        )

    def testwithrepeat(self):
        """tests a one-to-many eager load where we also query on joined criterion, where the joined
        criterion is using the same tables that are used within the eager load.  the mapper must ensure that the
        criterion doesn't interfere with the eager load criterion."""
        user_mapper = mapper(User, users, properties={
            'addresses': relation(
                mapper(Address, addresses),
                primaryjoin=users.c.user_id == addresses.c.user_id,
                lazy=False),
        })
        loaded = user_mapper.select(and_(
            addresses.c.email_address == 'ed@lala.com',
            addresses.c.user_id == users.c.user_id,
        ))
        self.assert_result(
            loaded, User,
            {'user_id': 8, 'addresses': (Address, [
                {'address_id': 2, 'email_address': 'ed@wood.com'},
                {'address_id': 3, 'email_address': 'ed@bettyboop.com'},
                {'address_id': 4, 'email_address': 'ed@lala.com'},
            ])},
        )

    def testcompile(self):
        """tests deferred operation of a pre-compiled mapper statement"""
        user_mapper = mapper(User, users, properties={
            'addresses': relation(mapper(Address, addresses), lazy=False),
        })
        stmt = user_mapper.compile(and_(
            addresses.c.email_address == bindparam('emailad'),
            addresses.c.user_id == users.c.user_id,
        ))
        compiled = stmt.compile()
        self.echo("\n" + str(compiled) + repr(compiled.get_params()))

        loaded = user_mapper.instances(stmt.execute(emailad='jack@bean.com'))
        self.echo(repr(loaded))

    def testmulti(self):
        """tests eager loading with two relations simultaneously"""
        user_mapper = mapper(User, users, properties=dict(
            addresses=relation(
                mapper(Address, addresses),
                primaryjoin=users.c.user_id == addresses.c.user_id,
                lazy=False),
            orders=relation(mapper(Order, orders), lazy=False),
        ))
        loaded = user_mapper.select()
        self.assert_result(
            loaded, User,
            {
                'user_id': 7,
                'addresses': (Address, [{'address_id': 1}]),
                'orders': (Order, [{'order_id': 1}, {'order_id': 3}, {'order_id': 5}]),
            },
            {
                'user_id': 8,
                'addresses': (Address, [{'address_id': 2}, {'address_id': 3}, {'address_id': 4}]),
                'orders': (Order, []),
            },
            {
                'user_id': 9,
                'addresses': (Address, []),
                'orders': (Order, [{'order_id': 2}, {'order_id': 4}]),
            },
        )

    def testdouble(self):
        """tests eager loading with two relations simultaneously, from the same table.  """
        openorders = alias(orders, 'openorders')
        closedorders = alias(orders, 'closedorders')
        # NOTE(review): this mapper is never referenced again; it is kept
        # because creating it may register state the relations below rely
        # on -- confirm before removing.
        ordermapper = mapper(Order, orders)
        user_mapper = mapper(User, users, properties=dict(
            addresses=relation(mapper(Address, addresses), lazy=False),
            open_orders=relation(
                mapper(Order, openorders),
                primaryjoin=and_(openorders.c.isopen == 1, users.c.user_id == openorders.c.user_id),
                lazy=False),
            closed_orders=relation(
                mapper(Order, closedorders),
                primaryjoin=and_(closedorders.c.isopen == 0, users.c.user_id == closedorders.c.user_id),
                lazy=False)
        ))
        loaded = user_mapper.select()
        self.assert_result(
            loaded, User,
            {
                'user_id': 7,
                'addresses': (Address, [{'address_id': 1}]),
                'open_orders': (Order, [{'order_id': 3}]),
                'closed_orders': (Order, [{'order_id': 1}, {'order_id': 5}]),
            },
            {
                'user_id': 8,
                'addresses': (Address, [{'address_id': 2}, {'address_id': 3}, {'address_id': 4}]),
                'open_orders': (Order, []),
                'closed_orders': (Order, []),
            },
            {
                'user_id': 9,
                'addresses': (Address, []),
                'open_orders': (Order, [{'order_id': 4}]),
                'closed_orders': (Order, [{'order_id': 2}]),
            },
        )

    def testnested(self):
        """tests eager loading of a parent item with two types of child items,
        where one of those child items eager loads its own child items."""
        ordermapper = mapper(Order, orders, properties={
            'items': relation(mapper(Item, orderitems), lazy=False),
        })

        user_mapper = mapper(User, users, properties=dict(
            addresses=relation(mapper(Address, addresses), lazy=False),
            orders=relation(ordermapper, primaryjoin=users.c.user_id == orders.c.user_id, lazy=False),
        ))
        loaded = user_mapper.select()
        self.assert_result(loaded, User, *user_all_result)

    def testmanytomany(self):
        """tests a many-to-many eager load"""
        items = orderitems

        item_mapper = mapper(Item, items, properties={
            'keywords': relation(mapper(Keyword, keywords), itemkeywords, lazy=False, order_by=[keywords.c.keyword_id]),
        })
        loaded = item_mapper.select()
        self.assert_result(loaded, Item, *item_keyword_result)

        # restrict to items having the keyword 'red'; each item found must
        # still show its complete keyword list
        loaded = item_mapper.select(and_(
            keywords.c.name == 'red',
            keywords.c.keyword_id == itemkeywords.c.keyword_id,
            items.c.item_id == itemkeywords.c.item_id,
        ))
        self.assert_result(
            loaded, Item,
            {'item_id': 1, 'keywords': (Keyword, [{'keyword_id': 2}, {'keyword_id': 4}, {'keyword_id': 6}])},
            {'item_id': 2, 'keywords': (Keyword, [{'keyword_id': 2}, {'keyword_id': 5}, {'keyword_id': 7}])},
        )

    def testoneandmany(self):
        """tests eager load for a parent object with a child object that
        contains a many-to-many relationship to a third object."""
        items = orderitems

        item_mapper = mapper(Item, items, properties={
            'keywords': relation(mapper(Keyword, keywords), itemkeywords, lazy=False, order_by=[keywords.c.keyword_id]),
        })

        order_mapper = mapper(Order, orders, properties={
            'items': relation(item_mapper, lazy=False),
        })
        loaded = order_mapper.select("orders.order_id in (1,2,3)")
        self.assert_result(
            loaded, Order,
            {'order_id': 1, 'items': (Item, [])},
            {'order_id': 2, 'items': (Item, [
                {'item_id': 1, 'item_name': 'item 1', 'keywords': (Keyword, [
                    {'keyword_id': 2, 'name': 'red'},
                    {'keyword_id': 4, 'name': 'big'},
                    {'keyword_id': 6, 'name': 'round'},
                ])},
                {'item_id': 2, 'item_name': 'item 2', 'keywords': (Keyword, [
                    {'keyword_id': 2, 'name': 'red'},
                    {'keyword_id': 5, 'name': 'small'},
                    {'keyword_id': 7, 'name': 'square'},
                ])},
            ])},
            {'order_id': 3, 'items': (Item, [
                {'item_id': 3, 'item_name': 'item 3', 'keywords': (Keyword, [
                    {'keyword_id': 3, 'name': 'green'},
                    {'keyword_id': 4, 'name': 'big'},
                    {'keyword_id': 6, 'name': 'round'},
                ])},
                {'item_id': 4, 'item_name': 'item 4'},
                {'item_id': 5, 'item_name': 'item 5'},
            ])},
        )
|
|
|
|
|
|
# Script entry point: hand control to the shared test runner.
if __name__ == "__main__":
    testbase.main()
|