mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-11 19:29:52 -04:00
5ea1d67315
- Removed an obscure feature of execute() (including connection,
engine, Session) whereby a bindparam() construct can be sent as
a key to the params dictionary. This usage is undocumented
and is at the core of an issue whereby the bindparam() object
created implicitly by a text() construct may have the same
hash value as a string placed in the params dictionary and
may result in an inappropriate match when computing the final
bind parameters. Internal checks for this condition would
add significant latency to the critical task of parameter
rendering, so the behavior is removed. This is a backwards
incompatible change for any application that may have been
using this feature, however the feature has never been
documented.
1322 lines
53 KiB
Python
1322 lines
53 KiB
Python
import testenv; testenv.configure_for_tests()
|
|
import datetime
|
|
from sqlalchemy import *
|
|
from sqlalchemy import exc, sql
|
|
from sqlalchemy.engine import default
|
|
from testlib import *
|
|
from testlib.testing import eq_
|
|
|
|
class QueryTest(TestBase):
|
|
|
|
def setUpAll(self):
|
|
global users, users2, addresses, metadata
|
|
metadata = MetaData(testing.db)
|
|
users = Table('query_users', metadata,
|
|
Column('user_id', INT, primary_key = True),
|
|
Column('user_name', VARCHAR(20)),
|
|
)
|
|
addresses = Table('query_addresses', metadata,
|
|
Column('address_id', Integer, primary_key=True),
|
|
Column('user_id', Integer, ForeignKey('query_users.user_id')),
|
|
Column('address', String(30)))
|
|
|
|
users2 = Table('u2', metadata,
|
|
Column('user_id', INT, primary_key = True),
|
|
Column('user_name', VARCHAR(20)),
|
|
)
|
|
metadata.create_all()
|
|
|
|
def tearDown(self):
|
|
addresses.delete().execute()
|
|
users.delete().execute()
|
|
users2.delete().execute()
|
|
|
|
def tearDownAll(self):
|
|
metadata.drop_all()
|
|
|
|
def test_insert(self):
|
|
users.insert().execute(user_id = 7, user_name = 'jack')
|
|
assert users.count().scalar() == 1
|
|
|
|
def test_insert_heterogeneous_params(self):
|
|
users.insert().execute(
|
|
{'user_id':7, 'user_name':'jack'},
|
|
{'user_id':8, 'user_name':'ed'},
|
|
{'user_id':9}
|
|
)
|
|
assert users.select().execute().fetchall() == [(7, 'jack'), (8, 'ed'), (9, None)]
|
|
|
|
def test_update(self):
|
|
users.insert().execute(user_id = 7, user_name = 'jack')
|
|
assert users.count().scalar() == 1
|
|
|
|
users.update(users.c.user_id == 7).execute(user_name = 'fred')
|
|
assert users.select(users.c.user_id==7).execute().fetchone()['user_name'] == 'fred'
|
|
|
|
def test_lastrow_accessor(self):
|
|
"""Tests the last_inserted_ids() and lastrow_has_id() functions."""
|
|
|
|
def insert_values(table, values):
|
|
"""
|
|
Inserts a row into a table, returns the full list of values
|
|
INSERTed including defaults that fired off on the DB side and
|
|
detects rows that had defaults and post-fetches.
|
|
"""
|
|
|
|
result = table.insert().execute(**values)
|
|
ret = values.copy()
|
|
|
|
for col, id in zip(table.primary_key, result.last_inserted_ids()):
|
|
ret[col.key] = id
|
|
|
|
if result.lastrow_has_defaults():
|
|
criterion = and_(*[col==id for col, id in zip(table.primary_key, result.last_inserted_ids())])
|
|
row = table.select(criterion).execute().fetchone()
|
|
for c in table.c:
|
|
ret[c.key] = row[c]
|
|
return ret
|
|
|
|
for supported, table, values, assertvalues in [
|
|
(
|
|
{'unsupported':['sqlite']},
|
|
Table("t1", metadata,
|
|
Column('id', Integer, Sequence('t1_id_seq', optional=True), primary_key=True),
|
|
Column('foo', String(30), primary_key=True)),
|
|
{'foo':'hi'},
|
|
{'id':1, 'foo':'hi'}
|
|
),
|
|
(
|
|
{'unsupported':['sqlite']},
|
|
Table("t2", metadata,
|
|
Column('id', Integer, Sequence('t2_id_seq', optional=True), primary_key=True),
|
|
Column('foo', String(30), primary_key=True),
|
|
Column('bar', String(30), server_default='hi')
|
|
),
|
|
{'foo':'hi'},
|
|
{'id':1, 'foo':'hi', 'bar':'hi'}
|
|
),
|
|
(
|
|
{'unsupported':[]},
|
|
Table("t3", metadata,
|
|
Column("id", String(40), primary_key=True),
|
|
Column('foo', String(30), primary_key=True),
|
|
Column("bar", String(30))
|
|
),
|
|
{'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"},
|
|
{'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}
|
|
),
|
|
(
|
|
{'unsupported':[]},
|
|
Table("t4", metadata,
|
|
Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True),
|
|
Column('foo', String(30), primary_key=True),
|
|
Column('bar', String(30), server_default='hi')
|
|
),
|
|
{'foo':'hi', 'id':1},
|
|
{'id':1, 'foo':'hi', 'bar':'hi'}
|
|
),
|
|
(
|
|
{'unsupported':[]},
|
|
Table("t5", metadata,
|
|
Column('id', String(10), primary_key=True),
|
|
Column('bar', String(30), server_default='hi')
|
|
),
|
|
{'id':'id1'},
|
|
{'id':'id1', 'bar':'hi'},
|
|
),
|
|
]:
|
|
if testing.db.name in supported['unsupported']:
|
|
continue
|
|
try:
|
|
table.create()
|
|
i = insert_values(table, values)
|
|
assert i == assertvalues, repr(i) + " " + repr(assertvalues)
|
|
finally:
|
|
table.drop()
|
|
|
|
def test_row_iteration(self):
|
|
users.insert().execute(
|
|
{'user_id':7, 'user_name':'jack'},
|
|
{'user_id':8, 'user_name':'ed'},
|
|
{'user_id':9, 'user_name':'fred'},
|
|
)
|
|
r = users.select().execute()
|
|
l = []
|
|
for row in r:
|
|
l.append(row)
|
|
self.assert_(len(l) == 3)
|
|
|
|
@testing.fails_on('firebird', 'Data type unknown')
|
|
@testing.requires.subqueries
|
|
def test_anonymous_rows(self):
|
|
users.insert().execute(
|
|
{'user_id':7, 'user_name':'jack'},
|
|
{'user_id':8, 'user_name':'ed'},
|
|
{'user_id':9, 'user_name':'fred'},
|
|
)
|
|
|
|
sel = select([users.c.user_id]).where(users.c.user_name=='jack').as_scalar()
|
|
for row in select([sel + 1, sel + 3], bind=users.bind).execute():
|
|
assert row['anon_1'] == 8
|
|
assert row['anon_2'] == 10
|
|
|
|
def test_order_by_label(self):
|
|
"""test that a label within an ORDER BY works on each backend.
|
|
|
|
simple labels in ORDER BYs now render as the actual labelname
|
|
which not every database supports.
|
|
|
|
"""
|
|
users.insert().execute(
|
|
{'user_id':7, 'user_name':'jack'},
|
|
{'user_id':8, 'user_name':'ed'},
|
|
{'user_id':9, 'user_name':'fred'},
|
|
)
|
|
|
|
concat = ("test: " + users.c.user_name).label('thedata')
|
|
self.assertEquals(
|
|
select([concat]).order_by(concat).execute().fetchall(),
|
|
[("test: ed",), ("test: fred",), ("test: jack",)]
|
|
)
|
|
|
|
concat = ("test: " + users.c.user_name).label('thedata')
|
|
self.assertEquals(
|
|
select([concat]).order_by(desc(concat)).execute().fetchall(),
|
|
[("test: jack",), ("test: fred",), ("test: ed",)]
|
|
)
|
|
|
|
concat = ("test: " + users.c.user_name).label('thedata')
|
|
self.assertEquals(
|
|
select([concat]).order_by(concat + "x").execute().fetchall(),
|
|
[("test: ed",), ("test: fred",), ("test: jack",)]
|
|
)
|
|
|
|
|
|
def test_row_comparison(self):
|
|
users.insert().execute(user_id = 7, user_name = 'jack')
|
|
rp = users.select().execute().fetchone()
|
|
|
|
self.assert_(rp == rp)
|
|
self.assert_(not(rp != rp))
|
|
|
|
equal = (7, 'jack')
|
|
|
|
self.assert_(rp == equal)
|
|
self.assert_(equal == rp)
|
|
self.assert_(not (rp != equal))
|
|
self.assert_(not (equal != equal))
|
|
|
|
@testing.fails_on('mssql', 'No support for boolean logic in column select.')
|
|
@testing.fails_on('oracle', 'FIXME: unknown')
|
|
def test_or_and_as_columns(self):
|
|
true, false = literal(True), literal(False)
|
|
|
|
self.assertEquals(testing.db.execute(select([and_(true, false)])).scalar(), False)
|
|
self.assertEquals(testing.db.execute(select([and_(true, true)])).scalar(), True)
|
|
self.assertEquals(testing.db.execute(select([or_(true, false)])).scalar(), True)
|
|
self.assertEquals(testing.db.execute(select([or_(false, false)])).scalar(), False)
|
|
self.assertEquals(testing.db.execute(select([not_(or_(false, false))])).scalar(), True)
|
|
|
|
row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).fetchone()
|
|
assert row.x == False
|
|
assert row.y == False
|
|
|
|
row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).fetchone()
|
|
assert row.x == True
|
|
assert row.y == False
|
|
|
|
def test_fetchmany(self):
|
|
users.insert().execute(user_id = 7, user_name = 'jack')
|
|
users.insert().execute(user_id = 8, user_name = 'ed')
|
|
users.insert().execute(user_id = 9, user_name = 'fred')
|
|
r = users.select().execute()
|
|
l = []
|
|
for row in r.fetchmany(size=2):
|
|
l.append(row)
|
|
self.assert_(len(l) == 2, "fetchmany(size=2) got %s rows" % len(l))
|
|
|
|
def test_like_ops(self):
|
|
users.insert().execute(
|
|
{'user_id':1, 'user_name':'apples'},
|
|
{'user_id':2, 'user_name':'oranges'},
|
|
{'user_id':3, 'user_name':'bananas'},
|
|
{'user_id':4, 'user_name':'legumes'},
|
|
{'user_id':5, 'user_name':'hi % there'},
|
|
)
|
|
|
|
for expr, result in (
|
|
(select([users.c.user_id]).where(users.c.user_name.startswith('apple')), [(1,)]),
|
|
(select([users.c.user_id]).where(users.c.user_name.contains('i % t')), [(5,)]),
|
|
(select([users.c.user_id]).where(users.c.user_name.endswith('anas')), [(3,)]),
|
|
):
|
|
eq_(expr.execute().fetchall(), result)
|
|
|
|
|
|
@testing.emits_warning('.*now automatically escapes.*')
|
|
def test_percents_in_text(self):
|
|
for expr, result in (
|
|
(text("select 6 % 10"), 6),
|
|
(text("select 17 % 10"), 7),
|
|
(text("select '%'"), '%'),
|
|
(text("select '%%'"), '%%'),
|
|
(text("select '%%%'"), '%%%'),
|
|
(text("select 'hello % world'"), "hello % world")
|
|
):
|
|
eq_(testing.db.scalar(expr), result)
|
|
|
|
def test_ilike(self):
|
|
users.insert().execute(
|
|
{'user_id':1, 'user_name':'one'},
|
|
{'user_id':2, 'user_name':'TwO'},
|
|
{'user_id':3, 'user_name':'ONE'},
|
|
{'user_id':4, 'user_name':'OnE'},
|
|
)
|
|
|
|
self.assertEquals(select([users.c.user_id]).where(users.c.user_name.ilike('one')).execute().fetchall(), [(1, ), (3, ), (4, )])
|
|
|
|
self.assertEquals(select([users.c.user_id]).where(users.c.user_name.ilike('TWO')).execute().fetchall(), [(2, )])
|
|
|
|
if testing.against('postgres'):
|
|
self.assertEquals(select([users.c.user_id]).where(users.c.user_name.like('one')).execute().fetchall(), [(1, )])
|
|
self.assertEquals(select([users.c.user_id]).where(users.c.user_name.like('TWO')).execute().fetchall(), [])
|
|
|
|
|
|
def test_compiled_execute(self):
|
|
users.insert().execute(user_id = 7, user_name = 'jack')
|
|
s = select([users], users.c.user_id==bindparam('id')).compile()
|
|
c = testing.db.connect()
|
|
assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7
|
|
|
|
def test_compiled_insert_execute(self):
|
|
users.insert().compile().execute(user_id = 7, user_name = 'jack')
|
|
s = select([users], users.c.user_id==bindparam('id')).compile()
|
|
c = testing.db.connect()
|
|
assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7
|
|
|
|
def test_repeated_bindparams(self):
|
|
"""Tests that a BindParam can be used more than once.
|
|
|
|
This should be run for DB-APIs with both positional and named
|
|
paramstyles.
|
|
"""
|
|
users.insert().execute(user_id = 7, user_name = 'jack')
|
|
users.insert().execute(user_id = 8, user_name = 'fred')
|
|
|
|
u = bindparam('userid')
|
|
s = users.select(and_(users.c.user_name==u, users.c.user_name==u))
|
|
r = s.execute(userid='fred').fetchall()
|
|
assert len(r) == 1
|
|
|
|
def test_bindparam_shortname(self):
|
|
"""test the 'shortname' field on BindParamClause."""
|
|
users.insert().execute(user_id = 7, user_name = 'jack')
|
|
users.insert().execute(user_id = 8, user_name = 'fred')
|
|
u = bindparam('userid', shortname='someshortname')
|
|
s = users.select(users.c.user_name==u)
|
|
r = s.execute(someshortname='fred').fetchall()
|
|
assert len(r) == 1
|
|
|
|
def test_bindparam_detection(self):
|
|
dialect = default.DefaultDialect(paramstyle='qmark')
|
|
prep = lambda q: str(sql.text(q).compile(dialect=dialect))
|
|
|
|
def a_eq(got, wanted):
|
|
if got != wanted:
|
|
print "Wanted %s" % wanted
|
|
print "Received %s" % got
|
|
self.assert_(got == wanted, got)
|
|
|
|
a_eq(prep('select foo'), 'select foo')
|
|
a_eq(prep("time='12:30:00'"), "time='12:30:00'")
|
|
a_eq(prep(u"time='12:30:00'"), u"time='12:30:00'")
|
|
a_eq(prep(":this:that"), ":this:that")
|
|
a_eq(prep(":this :that"), "? ?")
|
|
a_eq(prep("(:this),(:that :other)"), "(?),(? ?)")
|
|
a_eq(prep("(:this),(:that:other)"), "(?),(:that:other)")
|
|
a_eq(prep("(:this),(:that,:other)"), "(?),(?,?)")
|
|
a_eq(prep("(:that_:other)"), "(:that_:other)")
|
|
a_eq(prep("(:that_ :other)"), "(? ?)")
|
|
a_eq(prep("(:that_other)"), "(?)")
|
|
a_eq(prep("(:that$other)"), "(?)")
|
|
a_eq(prep("(:that$:other)"), "(:that$:other)")
|
|
a_eq(prep(".:that$ :other."), ".? ?.")
|
|
|
|
a_eq(prep(r'select \foo'), r'select \foo')
|
|
a_eq(prep(r"time='12\:30:00'"), r"time='12\:30:00'")
|
|
a_eq(prep(":this \:that"), "? :that")
|
|
a_eq(prep(r"(\:that$other)"), "(:that$other)")
|
|
a_eq(prep(r".\:that$ :other."), ".:that$ ?.")
|
|
|
|
def test_delete(self):
|
|
users.insert().execute(user_id = 7, user_name = 'jack')
|
|
users.insert().execute(user_id = 8, user_name = 'fred')
|
|
print repr(users.select().execute().fetchall())
|
|
|
|
users.delete(users.c.user_name == 'fred').execute()
|
|
|
|
print repr(users.select().execute().fetchall())
|
|
|
|
|
|
|
|
@testing.exclude('mysql', '<', (5, 0, 37), 'database bug')
|
|
def test_scalar_select(self):
|
|
"""test that scalar subqueries with labels get their type propagated to the result set."""
|
|
# mysql and/or mysqldb has a bug here, type isn't propagated for scalar
|
|
# subquery.
|
|
datetable = Table('datetable', metadata,
|
|
Column('id', Integer, primary_key=True),
|
|
Column('today', DateTime))
|
|
datetable.create()
|
|
try:
|
|
datetable.insert().execute(id=1, today=datetime.datetime(2006, 5, 12, 12, 0, 0))
|
|
s = select([datetable.alias('x').c.today]).as_scalar()
|
|
s2 = select([datetable.c.id, s.label('somelabel')])
|
|
#print s2.c.somelabel.type
|
|
assert isinstance(s2.execute().fetchone()['somelabel'], datetime.datetime)
|
|
finally:
|
|
datetable.drop()
|
|
|
|
def test_order_by(self):
|
|
"""Exercises ORDER BY clause generation.
|
|
|
|
Tests simple, compound, aliased and DESC clauses.
|
|
"""
|
|
|
|
users.insert().execute(user_id=1, user_name='c')
|
|
users.insert().execute(user_id=2, user_name='b')
|
|
users.insert().execute(user_id=3, user_name='a')
|
|
|
|
def a_eq(executable, wanted):
|
|
got = list(executable.execute())
|
|
self.assertEquals(got, wanted)
|
|
|
|
for labels in False, True:
|
|
a_eq(users.select(order_by=[users.c.user_id],
|
|
use_labels=labels),
|
|
[(1, 'c'), (2, 'b'), (3, 'a')])
|
|
|
|
a_eq(users.select(order_by=[users.c.user_name, users.c.user_id],
|
|
use_labels=labels),
|
|
[(3, 'a'), (2, 'b'), (1, 'c')])
|
|
|
|
a_eq(select([users.c.user_id.label('foo')],
|
|
use_labels=labels,
|
|
order_by=[users.c.user_id]),
|
|
[(1,), (2,), (3,)])
|
|
|
|
a_eq(select([users.c.user_id.label('foo'), users.c.user_name],
|
|
use_labels=labels,
|
|
order_by=[users.c.user_name, users.c.user_id]),
|
|
[(3, 'a'), (2, 'b'), (1, 'c')])
|
|
|
|
a_eq(users.select(distinct=True,
|
|
use_labels=labels,
|
|
order_by=[users.c.user_id]),
|
|
[(1, 'c'), (2, 'b'), (3, 'a')])
|
|
|
|
a_eq(select([users.c.user_id.label('foo')],
|
|
distinct=True,
|
|
use_labels=labels,
|
|
order_by=[users.c.user_id]),
|
|
[(1,), (2,), (3,)])
|
|
|
|
a_eq(select([users.c.user_id.label('a'),
|
|
users.c.user_id.label('b'),
|
|
users.c.user_name],
|
|
use_labels=labels,
|
|
order_by=[users.c.user_id]),
|
|
[(1, 1, 'c'), (2, 2, 'b'), (3, 3, 'a')])
|
|
|
|
a_eq(users.select(distinct=True,
|
|
use_labels=labels,
|
|
order_by=[desc(users.c.user_id)]),
|
|
[(3, 'a'), (2, 'b'), (1, 'c')])
|
|
|
|
a_eq(select([users.c.user_id.label('foo')],
|
|
distinct=True,
|
|
use_labels=labels,
|
|
order_by=[users.c.user_id.desc()]),
|
|
[(3,), (2,), (1,)])
|
|
|
|
def test_column_accessor(self):
|
|
users.insert().execute(user_id=1, user_name='john')
|
|
users.insert().execute(user_id=2, user_name='jack')
|
|
addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com')
|
|
|
|
r = users.select(users.c.user_id==2).execute().fetchone()
|
|
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
|
|
self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
|
|
|
|
r = text("select * from query_users where user_id=2", bind=testing.db).execute().fetchone()
|
|
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
|
|
self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
|
|
|
|
# test slices
|
|
r = text("select * from query_addresses", bind=testing.db).execute().fetchone()
|
|
self.assert_(r[0:1] == (1,))
|
|
self.assert_(r[1:] == (2, 'foo@bar.com'))
|
|
self.assert_(r[:-1] == (1, 2))
|
|
|
|
# test a little sqlite weirdness - with the UNION, cols come back as "query_users.user_id" in cursor.description
|
|
r = text("select query_users.user_id, query_users.user_name from query_users "
|
|
"UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().fetchone()
|
|
self.assert_(r['user_id']) == 1
|
|
self.assert_(r['user_name']) == "john"
|
|
|
|
# test using literal tablename.colname
|
|
r = text('select query_users.user_id AS "query_users.user_id", query_users.user_name AS "query_users.user_name" from query_users', bind=testing.db).execute().fetchone()
|
|
self.assert_(r['query_users.user_id']) == 1
|
|
self.assert_(r['query_users.user_name']) == "john"
|
|
|
|
def test_row_as_args(self):
|
|
users.insert().execute(user_id=1, user_name='john')
|
|
r = users.select(users.c.user_id==1).execute().fetchone()
|
|
users.delete().execute()
|
|
users.insert().execute(r)
|
|
assert users.select().execute().fetchall() == [(1, 'john')]
|
|
|
|
def test_result_as_args(self):
|
|
users.insert().execute([dict(user_id=1, user_name='john'), dict(user_id=2, user_name='ed')])
|
|
r = users.select().execute()
|
|
users2.insert().execute(list(r))
|
|
assert users2.select().execute().fetchall() == [(1, 'john'), (2, 'ed')]
|
|
|
|
users2.delete().execute()
|
|
r = users.select().execute()
|
|
users2.insert().execute(*list(r))
|
|
assert users2.select().execute().fetchall() == [(1, 'john'), (2, 'ed')]
|
|
|
|
def test_ambiguous_column(self):
|
|
users.insert().execute(user_id=1, user_name='john')
|
|
r = users.outerjoin(addresses).select().execute().fetchone()
|
|
try:
|
|
print r['user_id']
|
|
assert False
|
|
except exc.InvalidRequestError, e:
|
|
assert str(e) == "Ambiguous column name 'user_id' in result set! try 'use_labels' option on select statement." or \
|
|
str(e) == "Ambiguous column name 'USER_ID' in result set! try 'use_labels' option on select statement."
|
|
|
|
@testing.requires.subqueries
|
|
def test_column_label_targeting(self):
|
|
users.insert().execute(user_id=7, user_name='ed')
|
|
|
|
for s in (
|
|
users.select().alias('foo'),
|
|
users.select().alias(users.name),
|
|
):
|
|
row = s.select(use_labels=True).execute().fetchone()
|
|
assert row[s.c.user_id] == 7
|
|
assert row[s.c.user_name] == 'ed'
|
|
|
|
def test_keys(self):
|
|
users.insert().execute(user_id=1, user_name='foo')
|
|
r = users.select().execute().fetchone()
|
|
self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name'])
|
|
|
|
def test_items(self):
|
|
users.insert().execute(user_id=1, user_name='foo')
|
|
r = users.select().execute().fetchone()
|
|
self.assertEqual([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')])
|
|
|
|
def test_len(self):
|
|
users.insert().execute(user_id=1, user_name='foo')
|
|
r = users.select().execute().fetchone()
|
|
self.assertEqual(len(r), 2)
|
|
r.close()
|
|
r = testing.db.execute('select user_name, user_id from query_users').fetchone()
|
|
self.assertEqual(len(r), 2)
|
|
r.close()
|
|
r = testing.db.execute('select user_name from query_users').fetchone()
|
|
self.assertEqual(len(r), 1)
|
|
r.close()
|
|
|
|
def test_cant_execute_join(self):
|
|
try:
|
|
users.join(addresses).execute()
|
|
except exc.ArgumentError, e:
|
|
assert str(e).startswith('Not an executable clause: ')
|
|
|
|
|
|
|
|
def test_column_order_with_simple_query(self):
|
|
# should return values in column definition order
|
|
users.insert().execute(user_id=1, user_name='foo')
|
|
r = users.select(users.c.user_id==1).execute().fetchone()
|
|
self.assertEqual(r[0], 1)
|
|
self.assertEqual(r[1], 'foo')
|
|
self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name'])
|
|
self.assertEqual(r.values(), [1, 'foo'])
|
|
|
|
def test_column_order_with_text_query(self):
|
|
# should return values in query order
|
|
users.insert().execute(user_id=1, user_name='foo')
|
|
r = testing.db.execute('select user_name, user_id from query_users').fetchone()
|
|
self.assertEqual(r[0], 'foo')
|
|
self.assertEqual(r[1], 1)
|
|
self.assertEqual([x.lower() for x in r.keys()], ['user_name', 'user_id'])
|
|
self.assertEqual(r.values(), ['foo', 1])
|
|
|
|
@testing.crashes('oracle', 'FIXME: unknown, varify not fails_on()')
|
|
@testing.crashes('firebird', 'An identifier must begin with a letter')
|
|
@testing.crashes('maxdb', 'FIXME: unknown, verify not fails_on()')
|
|
def test_column_accessor_shadow(self):
|
|
meta = MetaData(testing.db)
|
|
shadowed = Table('test_shadowed', meta,
|
|
Column('shadow_id', INT, primary_key = True),
|
|
Column('shadow_name', VARCHAR(20)),
|
|
Column('parent', VARCHAR(20)),
|
|
Column('row', VARCHAR(40)),
|
|
Column('__parent', VARCHAR(20)),
|
|
Column('__row', VARCHAR(20)),
|
|
)
|
|
shadowed.create(checkfirst=True)
|
|
try:
|
|
shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row')
|
|
r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone()
|
|
self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
|
|
self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow')
|
|
self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')
|
|
self.assert_(r.row == r['row'] == r[shadowed.c.row] == 'Without light there is no shadow')
|
|
self.assert_(r['__parent'] == 'Hidden parent')
|
|
self.assert_(r['__row'] == 'Hidden row')
|
|
try:
|
|
print r.__parent, r.__row
|
|
self.fail('Should not allow access to private attributes')
|
|
except AttributeError:
|
|
pass # expected
|
|
r.close()
|
|
finally:
|
|
shadowed.drop(checkfirst=True)
|
|
|
|
def test_in_filtering(self):
|
|
"""test the behavior of the in_() function."""
|
|
|
|
users.insert().execute(user_id = 7, user_name = 'jack')
|
|
users.insert().execute(user_id = 8, user_name = 'fred')
|
|
users.insert().execute(user_id = 9, user_name = None)
|
|
|
|
s = users.select(users.c.user_name.in_([]))
|
|
r = s.execute().fetchall()
|
|
# No username is in empty set
|
|
assert len(r) == 0
|
|
|
|
s = users.select(not_(users.c.user_name.in_([])))
|
|
r = s.execute().fetchall()
|
|
# All usernames with a value are outside an empty set
|
|
assert len(r) == 2
|
|
|
|
s = users.select(users.c.user_name.in_(['jack','fred']))
|
|
r = s.execute().fetchall()
|
|
assert len(r) == 2
|
|
|
|
s = users.select(not_(users.c.user_name.in_(['jack','fred'])))
|
|
r = s.execute().fetchall()
|
|
# Null values are not outside any set
|
|
assert len(r) == 0
|
|
|
|
u = bindparam('search_key')
|
|
|
|
s = users.select(u.in_([]))
|
|
r = s.execute(search_key='john').fetchall()
|
|
assert len(r) == 0
|
|
r = s.execute(search_key=None).fetchall()
|
|
assert len(r) == 0
|
|
|
|
s = users.select(not_(u.in_([])))
|
|
r = s.execute(search_key='john').fetchall()
|
|
assert len(r) == 3
|
|
r = s.execute(search_key=None).fetchall()
|
|
assert len(r) == 0
|
|
|
|
@testing.fails_on('firebird', 'FIXME: unknown')
|
|
@testing.fails_on('maxdb', 'FIXME: unknown')
|
|
@testing.fails_on('oracle', 'FIXME: unknown')
|
|
@testing.fails_on('mssql', 'FIXME: unknown')
|
|
def test_in_filtering_advanced(self):
|
|
"""test the behavior of the in_() function when comparing against an empty collection."""
|
|
|
|
users.insert().execute(user_id = 7, user_name = 'jack')
|
|
users.insert().execute(user_id = 8, user_name = 'fred')
|
|
users.insert().execute(user_id = 9, user_name = None)
|
|
|
|
s = users.select(users.c.user_name.in_([]) == True)
|
|
r = s.execute().fetchall()
|
|
assert len(r) == 0
|
|
s = users.select(users.c.user_name.in_([]) == False)
|
|
r = s.execute().fetchall()
|
|
assert len(r) == 2
|
|
s = users.select(users.c.user_name.in_([]) == None)
|
|
r = s.execute().fetchall()
|
|
assert len(r) == 1
|
|
|
|
class PercentSchemaNamesTest(TestBase):
|
|
"""tests using percent signs, spaces in table and column names.
|
|
|
|
Doesn't pass for mysql, postgres, but this is really a
|
|
SQLAlchemy bug - we should be escaping out %% signs for this
|
|
operation the same way we do for text() and column labels.
|
|
|
|
"""
|
|
@testing.crashes('mysql', 'mysqldb calls name % (params)')
|
|
@testing.crashes('postgres', 'postgres calls name % (params)')
|
|
def setUpAll(self):
|
|
global percent_table, metadata
|
|
metadata = MetaData(testing.db)
|
|
percent_table = Table('percent%table', metadata,
|
|
Column("percent%", Integer),
|
|
Column("%(oneofthese)s", Integer),
|
|
Column("spaces % more spaces", Integer),
|
|
)
|
|
metadata.create_all()
|
|
|
|
@testing.crashes('mysql', 'mysqldb calls name % (params)')
|
|
@testing.crashes('postgres', 'postgres calls name % (params)')
|
|
def tearDownAll(self):
|
|
metadata.drop_all()
|
|
|
|
@testing.crashes('mysql', 'mysqldb calls name % (params)')
|
|
@testing.crashes('postgres', 'postgres calls name % (params)')
|
|
def test_roundtrip(self):
|
|
percent_table.insert().execute(
|
|
{'percent%':5, '%(oneofthese)s':7, 'spaces % more spaces':12},
|
|
)
|
|
percent_table.insert().execute(
|
|
{'percent%':7, '%(oneofthese)s':8, 'spaces % more spaces':11},
|
|
{'percent%':9, '%(oneofthese)s':9, 'spaces % more spaces':10},
|
|
{'percent%':11, '%(oneofthese)s':10, 'spaces % more spaces':9},
|
|
)
|
|
|
|
for table in (percent_table, percent_table.alias()):
|
|
eq_(
|
|
table.select().order_by(table.c['%(oneofthese)s']).execute().fetchall(),
|
|
[
|
|
(5, 7, 12),
|
|
(7, 8, 11),
|
|
(9, 9, 10),
|
|
(11, 10, 9)
|
|
]
|
|
)
|
|
|
|
eq_(
|
|
table.select().
|
|
where(table.c['spaces % more spaces'].in_([9, 10])).
|
|
order_by(table.c['%(oneofthese)s']).execute().fetchall(),
|
|
[
|
|
(9, 9, 10),
|
|
(11, 10, 9)
|
|
]
|
|
)
|
|
|
|
result = table.select().order_by(table.c['%(oneofthese)s']).execute()
|
|
row = result.fetchone()
|
|
eq_(row[table.c['percent%']], 5)
|
|
eq_(row[table.c['%(oneofthese)s']], 7)
|
|
eq_(row[table.c['spaces % more spaces']], 12)
|
|
row = result.fetchone()
|
|
eq_(row['percent%'], 7)
|
|
eq_(row['%(oneofthese)s'], 8)
|
|
eq_(row['spaces % more spaces'], 11)
|
|
result.close()
|
|
|
|
percent_table.update().values({percent_table.c['%(oneofthese)s']:9, percent_table.c['spaces % more spaces']:15}).execute()
|
|
|
|
eq_(
|
|
percent_table.select().order_by(percent_table.c['%(oneofthese)s']).execute().fetchall(),
|
|
[
|
|
(5, 9, 15),
|
|
(7, 9, 15),
|
|
(9, 9, 15),
|
|
(11, 9, 15)
|
|
]
|
|
)
|
|
|
|
|
|
|
|
class LimitTest(TestBase):
|
|
|
|
def setUpAll(self):
|
|
global users, addresses, metadata
|
|
metadata = MetaData(testing.db)
|
|
users = Table('query_users', metadata,
|
|
Column('user_id', INT, primary_key = True),
|
|
Column('user_name', VARCHAR(20)),
|
|
)
|
|
addresses = Table('query_addresses', metadata,
|
|
Column('address_id', Integer, primary_key=True),
|
|
Column('user_id', Integer, ForeignKey('query_users.user_id')),
|
|
Column('address', String(30)))
|
|
metadata.create_all()
|
|
self._data()
|
|
|
|
def _data(self):
|
|
users.insert().execute(user_id=1, user_name='john')
|
|
addresses.insert().execute(address_id=1, user_id=1, address='addr1')
|
|
users.insert().execute(user_id=2, user_name='jack')
|
|
addresses.insert().execute(address_id=2, user_id=2, address='addr1')
|
|
users.insert().execute(user_id=3, user_name='ed')
|
|
addresses.insert().execute(address_id=3, user_id=3, address='addr2')
|
|
users.insert().execute(user_id=4, user_name='wendy')
|
|
addresses.insert().execute(address_id=4, user_id=4, address='addr3')
|
|
users.insert().execute(user_id=5, user_name='laura')
|
|
addresses.insert().execute(address_id=5, user_id=5, address='addr4')
|
|
users.insert().execute(user_id=6, user_name='ralph')
|
|
addresses.insert().execute(address_id=6, user_id=6, address='addr5')
|
|
users.insert().execute(user_id=7, user_name='fido')
|
|
addresses.insert().execute(address_id=7, user_id=7, address='addr5')
|
|
|
|
def tearDownAll(self):
|
|
metadata.drop_all()
|
|
|
|
def test_select_limit(self):
|
|
r = users.select(limit=3, order_by=[users.c.user_id]).execute().fetchall()
|
|
self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))
|
|
|
|
@testing.fails_on('maxdb', 'FIXME: unknown')
|
|
def test_select_limit_offset(self):
|
|
"""Test the interaction between limit and offset"""
|
|
|
|
r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall()
|
|
self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')])
|
|
r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall()
|
|
self.assert_(r==[(6, 'ralph'), (7, 'fido')])
|
|
|
|
def test_select_distinct_limit(self):
|
|
"""Test the interaction between limit and distinct"""
|
|
|
|
r = sorted([x[0] for x in select([addresses.c.address]).distinct().limit(3).order_by(addresses.c.address).execute().fetchall()])
|
|
self.assert_(len(r) == 3, repr(r))
|
|
self.assert_(r[0] != r[1] and r[1] != r[2], repr(r))
|
|
|
|
@testing.fails_on('mssql', 'FIXME: unknown')
|
|
def test_select_distinct_offset(self):
|
|
"""Test the interaction between distinct and offset"""
|
|
|
|
r = sorted([x[0] for x in select([addresses.c.address]).distinct().offset(1).order_by(addresses.c.address).execute().fetchall()])
|
|
self.assert_(len(r) == 4, repr(r))
|
|
self.assert_(r[0] != r[1] and r[1] != r[2] and r[2] != [3], repr(r))
|
|
|
|
def test_select_distinct_limit_offset(self):
|
|
"""Test the interaction between limit and limit/offset"""
|
|
|
|
r = select([addresses.c.address]).order_by(addresses.c.address).distinct().offset(2).limit(3).execute().fetchall()
|
|
self.assert_(len(r) == 3, repr(r))
|
|
self.assert_(r[0] != r[1] and r[1] != r[2], repr(r))
|
|
|
|
class CompoundTest(TestBase):
|
|
"""test compound statements like UNION, INTERSECT, particularly their ability to nest on
|
|
different databases."""
|
|
def setUpAll(self):
|
|
global metadata, t1, t2, t3
|
|
metadata = MetaData(testing.db)
|
|
t1 = Table('t1', metadata,
|
|
Column('col1', Integer, Sequence('t1pkseq'), primary_key=True),
|
|
Column('col2', String(30)),
|
|
Column('col3', String(40)),
|
|
Column('col4', String(30))
|
|
)
|
|
t2 = Table('t2', metadata,
|
|
Column('col1', Integer, Sequence('t2pkseq'), primary_key=True),
|
|
Column('col2', String(30)),
|
|
Column('col3', String(40)),
|
|
Column('col4', String(30)))
|
|
t3 = Table('t3', metadata,
|
|
Column('col1', Integer, Sequence('t3pkseq'), primary_key=True),
|
|
Column('col2', String(30)),
|
|
Column('col3', String(40)),
|
|
Column('col4', String(30)))
|
|
metadata.create_all()
|
|
|
|
t1.insert().execute([
|
|
dict(col2="t1col2r1", col3="aaa", col4="aaa"),
|
|
dict(col2="t1col2r2", col3="bbb", col4="bbb"),
|
|
dict(col2="t1col2r3", col3="ccc", col4="ccc"),
|
|
])
|
|
t2.insert().execute([
|
|
dict(col2="t2col2r1", col3="aaa", col4="bbb"),
|
|
dict(col2="t2col2r2", col3="bbb", col4="ccc"),
|
|
dict(col2="t2col2r3", col3="ccc", col4="aaa"),
|
|
])
|
|
t3.insert().execute([
|
|
dict(col2="t3col2r1", col3="aaa", col4="ccc"),
|
|
dict(col2="t3col2r2", col3="bbb", col4="aaa"),
|
|
dict(col2="t3col2r3", col3="ccc", col4="bbb"),
|
|
])
|
|
|
|
def tearDownAll(self):
|
|
metadata.drop_all()
|
|
|
|
def _fetchall_sorted(self, executed):
|
|
return sorted([tuple(row) for row in executed.fetchall()])
|
|
|
|
@testing.requires.subqueries
|
|
def test_union(self):
|
|
(s1, s2) = (
|
|
select([t1.c.col3.label('col3'), t1.c.col4.label('col4')],
|
|
t1.c.col2.in_(["t1col2r1", "t1col2r2"])),
|
|
select([t2.c.col3.label('col3'), t2.c.col4.label('col4')],
|
|
t2.c.col2.in_(["t2col2r2", "t2col2r3"]))
|
|
)
|
|
u = union(s1, s2)
|
|
|
|
wanted = [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'),
|
|
('ccc', 'aaa')]
|
|
found1 = self._fetchall_sorted(u.execute())
|
|
self.assertEquals(found1, wanted)
|
|
|
|
found2 = self._fetchall_sorted(u.alias('bar').select().execute())
|
|
self.assertEquals(found2, wanted)
|
|
|
|
def test_union_ordered(self):
|
|
(s1, s2) = (
|
|
select([t1.c.col3.label('col3'), t1.c.col4.label('col4')],
|
|
t1.c.col2.in_(["t1col2r1", "t1col2r2"])),
|
|
select([t2.c.col3.label('col3'), t2.c.col4.label('col4')],
|
|
t2.c.col2.in_(["t2col2r2", "t2col2r3"]))
|
|
)
|
|
u = union(s1, s2, order_by=['col3', 'col4'])
|
|
|
|
wanted = [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'),
|
|
('ccc', 'aaa')]
|
|
self.assertEquals(u.execute().fetchall(), wanted)
|
|
|
|
@testing.fails_on('maxdb', 'FIXME: unknown')
|
|
@testing.requires.subqueries
|
|
def test_union_ordered_alias(self):
|
|
(s1, s2) = (
|
|
select([t1.c.col3.label('col3'), t1.c.col4.label('col4')],
|
|
t1.c.col2.in_(["t1col2r1", "t1col2r2"])),
|
|
select([t2.c.col3.label('col3'), t2.c.col4.label('col4')],
|
|
t2.c.col2.in_(["t2col2r2", "t2col2r3"]))
|
|
)
|
|
u = union(s1, s2, order_by=['col3', 'col4'])
|
|
|
|
wanted = [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'),
|
|
('ccc', 'aaa')]
|
|
self.assertEquals(u.alias('bar').select().execute().fetchall(), wanted)
|
|
|
|
@testing.crashes('oracle', 'FIXME: unknown, verify not fails_on')
|
|
@testing.fails_on('mysql', 'FIXME: unknown')
|
|
@testing.fails_on('sqlite', 'FIXME: unknown')
|
|
def test_union_all(self):
|
|
e = union_all(
|
|
select([t1.c.col3]),
|
|
union(
|
|
select([t1.c.col3]),
|
|
select([t1.c.col3]),
|
|
)
|
|
)
|
|
|
|
wanted = [('aaa',),('aaa',),('bbb',), ('bbb',), ('ccc',),('ccc',)]
|
|
found1 = self._fetchall_sorted(e.execute())
|
|
self.assertEquals(found1, wanted)
|
|
|
|
found2 = self._fetchall_sorted(e.alias('foo').select().execute())
|
|
self.assertEquals(found2, wanted)
|
|
|
|
@testing.crashes('firebird', 'Does not support intersect')
|
|
@testing.crashes('sybase', 'FIXME: unknown, verify not fails_on')
|
|
@testing.fails_on('mysql', 'FIXME: unknown')
|
|
def test_intersect(self):
|
|
i = intersect(
|
|
select([t2.c.col3, t2.c.col4]),
|
|
select([t2.c.col3, t2.c.col4], t2.c.col4==t3.c.col3)
|
|
)
|
|
|
|
wanted = [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
|
|
|
|
found1 = self._fetchall_sorted(i.execute())
|
|
self.assertEquals(found1, wanted)
|
|
|
|
found2 = self._fetchall_sorted(i.alias('bar').select().execute())
|
|
self.assertEquals(found2, wanted)
|
|
|
|
@testing.crashes('firebird', 'Does not support except')
|
|
@testing.crashes('oracle', 'FIXME: unknown, verify not fails_on')
|
|
@testing.crashes('sybase', 'FIXME: unknown, verify not fails_on')
|
|
@testing.fails_on('mysql', 'FIXME: unknown')
|
|
def test_except_style1(self):
|
|
e = except_(union(
|
|
select([t1.c.col3, t1.c.col4]),
|
|
select([t2.c.col3, t2.c.col4]),
|
|
select([t3.c.col3, t3.c.col4]),
|
|
), select([t2.c.col3, t2.c.col4]))
|
|
|
|
wanted = [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'),
|
|
('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')]
|
|
|
|
found = self._fetchall_sorted(e.alias('bar').select().execute())
|
|
self.assertEquals(found, wanted)
|
|
|
|
@testing.crashes('firebird', 'Does not support except')
|
|
@testing.crashes('oracle', 'FIXME: unknown, verify not fails_on')
|
|
@testing.crashes('sybase', 'FIXME: unknown, verify not fails_on')
|
|
@testing.fails_on('mysql', 'FIXME: unknown')
|
|
def test_except_style2(self):
|
|
e = except_(union(
|
|
select([t1.c.col3, t1.c.col4]),
|
|
select([t2.c.col3, t2.c.col4]),
|
|
select([t3.c.col3, t3.c.col4]),
|
|
).alias('foo').select(), select([t2.c.col3, t2.c.col4]))
|
|
|
|
wanted = [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'),
|
|
('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')]
|
|
|
|
found1 = self._fetchall_sorted(e.execute())
|
|
self.assertEquals(found1, wanted)
|
|
|
|
found2 = self._fetchall_sorted(e.alias('bar').select().execute())
|
|
self.assertEquals(found2, wanted)
|
|
|
|
@testing.crashes('firebird', 'Does not support except')
|
|
@testing.crashes('oracle', 'FIXME: unknown, verify not fails_on')
|
|
@testing.crashes('sybase', 'FIXME: unknown, verify not fails_on')
|
|
@testing.fails_on('mysql', 'FIXME: unknown')
|
|
@testing.fails_on('sqlite', 'FIXME: unknown')
|
|
def test_except_style3(self):
|
|
# aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc
|
|
e = except_(
|
|
select([t1.c.col3]), # aaa, bbb, ccc
|
|
except_(
|
|
select([t2.c.col3]), # aaa, bbb, ccc
|
|
select([t3.c.col3], t3.c.col3 == 'ccc'), #ccc
|
|
)
|
|
)
|
|
self.assertEquals(e.execute().fetchall(), [('ccc',)])
|
|
self.assertEquals(e.alias('foo').select().execute().fetchall(),
|
|
[('ccc',)])
|
|
|
|
@testing.crashes('firebird', 'Does not support intersect')
|
|
@testing.fails_on('mysql', 'FIXME: unknown')
|
|
def test_composite(self):
|
|
u = intersect(
|
|
select([t2.c.col3, t2.c.col4]),
|
|
union(
|
|
select([t1.c.col3, t1.c.col4]),
|
|
select([t2.c.col3, t2.c.col4]),
|
|
select([t3.c.col3, t3.c.col4]),
|
|
).alias('foo').select()
|
|
)
|
|
wanted = [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
|
|
found = self._fetchall_sorted(u.execute())
|
|
|
|
self.assertEquals(found, wanted)
|
|
|
|
@testing.crashes('firebird', 'Does not support intersect')
|
|
@testing.fails_on('mysql', 'FIXME: unknown')
|
|
def test_composite_alias(self):
|
|
ua = intersect(
|
|
select([t2.c.col3, t2.c.col4]),
|
|
union(
|
|
select([t1.c.col3, t1.c.col4]),
|
|
select([t2.c.col3, t2.c.col4]),
|
|
select([t3.c.col3, t3.c.col4]),
|
|
).alias('foo').select()
|
|
).alias('bar')
|
|
|
|
wanted = [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
|
|
found = self._fetchall_sorted(ua.select().execute())
|
|
self.assertEquals(found, wanted)
|
|
|
|
|
|
class JoinTest(TestBase):
|
|
"""Tests join execution.
|
|
|
|
The compiled SQL emitted by the dialect might be ANSI joins or
|
|
theta joins ('old oracle style', with (+) for OUTER). This test
|
|
tries to exercise join syntax and uncover any inconsistencies in
|
|
`JOIN rhs ON lhs.col=rhs.col` vs `rhs.col=lhs.col`. At least one
|
|
database seems to be sensitive to this.
|
|
"""
|
|
|
|
def setUpAll(self):
|
|
global metadata
|
|
global t1, t2, t3
|
|
|
|
metadata = MetaData(testing.db)
|
|
t1 = Table('t1', metadata,
|
|
Column('t1_id', Integer, primary_key=True),
|
|
Column('name', String(32)))
|
|
t2 = Table('t2', metadata,
|
|
Column('t2_id', Integer, primary_key=True),
|
|
Column('t1_id', Integer, ForeignKey('t1.t1_id')),
|
|
Column('name', String(32)))
|
|
t3 = Table('t3', metadata,
|
|
Column('t3_id', Integer, primary_key=True),
|
|
Column('t2_id', Integer, ForeignKey('t2.t2_id')),
|
|
Column('name', String(32)))
|
|
metadata.drop_all()
|
|
metadata.create_all()
|
|
|
|
# t1.10 -> t2.20 -> t3.30
|
|
# t1.11 -> t2.21
|
|
# t1.12
|
|
t1.insert().execute({'t1_id': 10, 'name': 't1 #10'},
|
|
{'t1_id': 11, 'name': 't1 #11'},
|
|
{'t1_id': 12, 'name': 't1 #12'})
|
|
t2.insert().execute({'t2_id': 20, 't1_id': 10, 'name': 't2 #20'},
|
|
{'t2_id': 21, 't1_id': 11, 'name': 't2 #21'})
|
|
t3.insert().execute({'t3_id': 30, 't2_id': 20, 'name': 't3 #30'})
|
|
|
|
def tearDownAll(self):
|
|
metadata.drop_all()
|
|
|
|
def assertRows(self, statement, expected):
|
|
"""Execute a statement and assert that rows returned equal expected."""
|
|
|
|
found = sorted([tuple(row)
|
|
for row in statement.execute().fetchall()])
|
|
|
|
self.assertEquals(found, sorted(expected))
|
|
|
|
def test_join_x1(self):
|
|
"""Joins t1->t2."""
|
|
|
|
for criteria in (t1.c.t1_id==t2.c.t1_id, t2.c.t1_id==t1.c.t1_id):
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id],
|
|
from_obj=[t1.join(t2, criteria)])
|
|
self.assertRows(expr, [(10, 20), (11, 21)])
|
|
|
|
def test_join_x2(self):
|
|
"""Joins t1->t2->t3."""
|
|
|
|
for criteria in (t1.c.t1_id==t2.c.t1_id, t2.c.t1_id==t1.c.t1_id):
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id],
|
|
from_obj=[t1.join(t2, criteria)])
|
|
self.assertRows(expr, [(10, 20), (11, 21)])
|
|
|
|
def test_outerjoin_x1(self):
|
|
"""Outer joins t1->t2."""
|
|
|
|
for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id],
|
|
from_obj=[t1.join(t2).join(t3, criteria)])
|
|
self.assertRows(expr, [(10, 20)])
|
|
|
|
def test_outerjoin_x2(self):
|
|
"""Outer joins t1->t2,t3."""
|
|
|
|
for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
from_obj=[t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). \
|
|
outerjoin(t3, criteria)])
|
|
self.assertRows(expr, [(10, 20, 30), (11, 21, None), (12, None, None)])
|
|
|
|
def test_outerjoin_where_x2_t1(self):
|
|
"""Outer joins t1->t2,t3, where on t1."""
|
|
|
|
for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
t1.c.name == 't1 #10',
|
|
from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
|
|
outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
t1.c.t1_id < 12,
|
|
from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
|
|
outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30), (11, 21, None)])
|
|
|
|
def test_outerjoin_where_x2_t2(self):
|
|
"""Outer joins t1->t2,t3, where on t2."""
|
|
|
|
for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
t2.c.name == 't2 #20',
|
|
from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
|
|
outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
t2.c.t2_id < 29,
|
|
from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
|
|
outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30), (11, 21, None)])
|
|
|
|
def test_outerjoin_where_x2_t1t2(self):
|
|
"""Outer joins t1->t2,t3, where on t1 and t2."""
|
|
|
|
for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
and_(t1.c.name == 't1 #10', t2.c.name == 't2 #20'),
|
|
from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
|
|
outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
and_(t1.c.t1_id < 19, 29 > t2.c.t2_id),
|
|
from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
|
|
outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30), (11, 21, None)])
|
|
|
|
def test_outerjoin_where_x2_t3(self):
|
|
"""Outer joins t1->t2,t3, where on t3."""
|
|
|
|
for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
t3.c.name == 't3 #30',
|
|
from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
|
|
outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
t3.c.t3_id < 39,
|
|
from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
|
|
outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
def test_outerjoin_where_x2_t1t3(self):
|
|
"""Outer joins t1->t2,t3, where on t1 and t3."""
|
|
|
|
for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
and_(t1.c.name == 't1 #10', t3.c.name == 't3 #30'),
|
|
from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
|
|
outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
and_(t1.c.t1_id < 19, t3.c.t3_id < 39),
|
|
from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
|
|
outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
def test_outerjoin_where_x2_t1t2(self):
|
|
"""Outer joins t1->t2,t3, where on t1 and t2."""
|
|
|
|
for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
and_(t1.c.name == 't1 #10', t2.c.name == 't2 #20'),
|
|
from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
|
|
outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
and_(t1.c.t1_id < 12, t2.c.t2_id < 39),
|
|
from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
|
|
outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30), (11, 21, None)])
|
|
|
|
def test_outerjoin_where_x2_t1t2t3(self):
|
|
"""Outer joins t1->t2,t3, where on t1, t2 and t3."""
|
|
|
|
for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
and_(t1.c.name == 't1 #10',
|
|
t2.c.name == 't2 #20',
|
|
t3.c.name == 't3 #30'),
|
|
from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
|
|
outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
and_(t1.c.t1_id < 19,
|
|
t2.c.t2_id < 29,
|
|
t3.c.t3_id < 39),
|
|
from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
|
|
outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
def test_mixed(self):
|
|
"""Joins t1->t2, outer t2->t3."""
|
|
|
|
for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
from_obj=[(t1.join(t2).outerjoin(t3, criteria))])
|
|
print expr
|
|
self.assertRows(expr, [(10, 20, 30), (11, 21, None)])
|
|
|
|
def test_mixed_where(self):
|
|
"""Joins t1->t2, outer t2->t3, plus a where on each table in turn."""
|
|
|
|
for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
t1.c.name == 't1 #10',
|
|
from_obj=[(t1.join(t2).outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
t2.c.name == 't2 #20',
|
|
from_obj=[(t1.join(t2).outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
t3.c.name == 't3 #30',
|
|
from_obj=[(t1.join(t2).outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
and_(t1.c.name == 't1 #10', t2.c.name == 't2 #20'),
|
|
from_obj=[(t1.join(t2).outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
and_(t2.c.name == 't2 #20', t3.c.name == 't3 #30'),
|
|
from_obj=[(t1.join(t2).outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
expr = select(
|
|
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
|
|
and_(t1.c.name == 't1 #10',
|
|
t2.c.name == 't2 #20',
|
|
t3.c.name == 't3 #30'),
|
|
from_obj=[(t1.join(t2).outerjoin(t3, criteria))])
|
|
self.assertRows(expr, [(10, 20, 30)])
|
|
|
|
|
|
class OperatorTest(TestBase):
|
|
def setUpAll(self):
|
|
global metadata, flds
|
|
metadata = MetaData(testing.db)
|
|
flds = Table('flds', metadata,
|
|
Column('idcol', Integer, Sequence('t1pkseq'), primary_key=True),
|
|
Column('intcol', Integer),
|
|
Column('strcol', String(50)),
|
|
)
|
|
metadata.create_all()
|
|
|
|
flds.insert().execute([
|
|
dict(intcol=5, strcol='foo'),
|
|
dict(intcol=13, strcol='bar')
|
|
])
|
|
|
|
def tearDownAll(self):
|
|
metadata.drop_all()
|
|
|
|
@testing.fails_on('maxdb', 'FIXME: unknown')
|
|
def test_modulo(self):
|
|
self.assertEquals(
|
|
select([flds.c.intcol % 3],
|
|
order_by=flds.c.idcol).execute().fetchall(),
|
|
[(2,),(1,)]
|
|
)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
testenv.main()
|