mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-13 12:17:28 -04:00
aee39b2847
are to work around glitchy SQLite behavior that doesnt understand "foo.id" as equivalent to "id", are now only generated in the case that those named columns are selected from (part of [ticket:513]) - MS-SQL better detects when a query is a subquery and knows not to generate ORDER BY phrases for those [ticket:513]
476 lines
21 KiB
Python
476 lines
21 KiB
Python
from testbase import PersistTest
|
|
import testbase
|
|
import unittest, sys, datetime
|
|
|
|
import sqlalchemy.databases.sqlite as sqllite
|
|
|
|
import tables
|
|
from sqlalchemy import *
|
|
from sqlalchemy.engine import ResultProxy, RowProxy
|
|
from sqlalchemy import exceptions
|
|
|
|
class QueryTest(PersistTest):
|
|
|
|
def setUpAll(self):
|
|
global users, addresses, metadata
|
|
metadata = BoundMetaData(testbase.db)
|
|
users = Table('query_users', metadata,
|
|
Column('user_id', INT, primary_key = True),
|
|
Column('user_name', VARCHAR(20)),
|
|
)
|
|
addresses = Table('query_addresses', metadata,
|
|
Column('address_id', Integer, primary_key=True),
|
|
Column('user_id', Integer, ForeignKey('query_users.user_id')),
|
|
Column('address', String(30)))
|
|
metadata.create_all()
|
|
|
|
def setUp(self):
|
|
self.users = users
|
|
def tearDown(self):
|
|
self.users.delete().execute()
|
|
|
|
def tearDownAll(self):
|
|
metadata.drop_all()
|
|
|
|
def testinsert(self):
|
|
self.users.insert().execute(user_id = 7, user_name = 'jack')
|
|
print repr(self.users.select().execute().fetchall())
|
|
|
|
def testupdate(self):
|
|
|
|
self.users.insert().execute(user_id = 7, user_name = 'jack')
|
|
print repr(self.users.select().execute().fetchall())
|
|
|
|
self.users.update(self.users.c.user_id == 7).execute(user_name = 'fred')
|
|
print repr(self.users.select().execute().fetchall())
|
|
|
|
def testrowiteration(self):
|
|
self.users.insert().execute(user_id = 7, user_name = 'jack')
|
|
self.users.insert().execute(user_id = 8, user_name = 'ed')
|
|
self.users.insert().execute(user_id = 9, user_name = 'fred')
|
|
r = self.users.select().execute()
|
|
l = []
|
|
for row in r:
|
|
l.append(row)
|
|
self.assert_(len(l) == 3)
|
|
|
|
def test_fetchmany(self):
|
|
self.users.insert().execute(user_id = 7, user_name = 'jack')
|
|
self.users.insert().execute(user_id = 8, user_name = 'ed')
|
|
self.users.insert().execute(user_id = 9, user_name = 'fred')
|
|
r = self.users.select().execute()
|
|
l = []
|
|
for row in r.fetchmany(size=2):
|
|
l.append(row)
|
|
self.assert_(len(l) == 2, "fetchmany(size=2) got %s rows" % len(l))
|
|
|
|
def test_compiled_execute(self):
|
|
s = select([self.users], self.users.c.user_id==bindparam('id')).compile()
|
|
c = testbase.db.connect()
|
|
print repr(c.execute(s, id=7).fetchall())
|
|
|
|
def test_global_metadata(self):
|
|
t1 = Table('table1', Column('col1', Integer, primary_key=True),
|
|
Column('col2', String(20)))
|
|
t2 = Table('table2', Column('col1', Integer, primary_key=True),
|
|
Column('col2', String(20)))
|
|
|
|
assert t1.c.col1
|
|
global_connect(testbase.db)
|
|
default_metadata.create_all()
|
|
try:
|
|
assert t1.count().scalar() == 0
|
|
finally:
|
|
default_metadata.drop_all()
|
|
default_metadata.clear()
|
|
|
|
@testbase.supported('postgres')
|
|
def testpassiveoverride(self):
|
|
"""primarily for postgres, tests that when we get a primary key column back
|
|
from reflecting a table which has a default value on it, we pre-execute
|
|
that PassiveDefault upon insert, even though PassiveDefault says
|
|
"let the database execute this", because in postgres we must have all the primary
|
|
key values in memory before insert; otherwise we cant locate the just inserted row."""
|
|
try:
|
|
meta = BoundMetaData(testbase.db)
|
|
testbase.db.execute("""
|
|
CREATE TABLE speedy_users
|
|
(
|
|
speedy_user_id SERIAL PRIMARY KEY,
|
|
|
|
user_name VARCHAR NOT NULL,
|
|
user_password VARCHAR NOT NULL
|
|
);
|
|
""", None)
|
|
|
|
t = Table("speedy_users", meta, autoload=True)
|
|
t.insert().execute(user_name='user', user_password='lala')
|
|
l = t.select().execute().fetchall()
|
|
self.assert_(l == [(1, 'user', 'lala')])
|
|
finally:
|
|
testbase.db.execute("drop table speedy_users", None)
|
|
|
|
@testbase.supported('postgres')
|
|
def testschema(self):
|
|
meta1 = BoundMetaData(testbase.db)
|
|
test_table = Table('my_table', meta1,
|
|
Column('id', Integer, primary_key=True),
|
|
Column('data', String(20), nullable=False),
|
|
schema='alt_schema'
|
|
)
|
|
test_table.create()
|
|
try:
|
|
# plain insert
|
|
test_table.insert().execute(data='test')
|
|
|
|
meta2 = BoundMetaData(testbase.db)
|
|
test_table = Table('my_table', meta2, autoload=True, schema='alt_schema')
|
|
test_table.insert().execute(data='test')
|
|
|
|
finally:
|
|
test_table.drop()
|
|
|
|
def test_repeated_bindparams(self):
|
|
"""test that a BindParam can be used more than once.
|
|
this should be run for dbs with both positional and named paramstyles."""
|
|
self.users.insert().execute(user_id = 7, user_name = 'jack')
|
|
self.users.insert().execute(user_id = 8, user_name = 'fred')
|
|
|
|
u = bindparam('uid')
|
|
s = self.users.select(or_(self.users.c.user_name==u, self.users.c.user_name==u))
|
|
r = s.execute(uid='fred').fetchall()
|
|
assert len(r) == 1
|
|
|
|
def test_bindparam_shortname(self):
|
|
"""test the 'shortname' field on BindParamClause."""
|
|
self.users.insert().execute(user_id = 7, user_name = 'jack')
|
|
self.users.insert().execute(user_id = 8, user_name = 'fred')
|
|
u = bindparam('uid', shortname='someshortname')
|
|
s = self.users.select(self.users.c.user_name==u)
|
|
r = s.execute(someshortname='fred').fetchall()
|
|
assert len(r) == 1
|
|
|
|
def testdelete(self):
|
|
self.users.insert().execute(user_id = 7, user_name = 'jack')
|
|
self.users.insert().execute(user_id = 8, user_name = 'fred')
|
|
print repr(self.users.select().execute().fetchall())
|
|
|
|
self.users.delete(self.users.c.user_name == 'fred').execute()
|
|
|
|
print repr(self.users.select().execute().fetchall())
|
|
|
|
def testselectlimit(self):
|
|
self.users.insert().execute(user_id=1, user_name='john')
|
|
self.users.insert().execute(user_id=2, user_name='jack')
|
|
self.users.insert().execute(user_id=3, user_name='ed')
|
|
self.users.insert().execute(user_id=4, user_name='wendy')
|
|
self.users.insert().execute(user_id=5, user_name='laura')
|
|
self.users.insert().execute(user_id=6, user_name='ralph')
|
|
self.users.insert().execute(user_id=7, user_name='fido')
|
|
r = self.users.select(limit=3, order_by=[self.users.c.user_id]).execute().fetchall()
|
|
self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))
|
|
|
|
@testbase.unsupported('mssql')
|
|
def testselectlimitoffset(self):
|
|
self.users.insert().execute(user_id=1, user_name='john')
|
|
self.users.insert().execute(user_id=2, user_name='jack')
|
|
self.users.insert().execute(user_id=3, user_name='ed')
|
|
self.users.insert().execute(user_id=4, user_name='wendy')
|
|
self.users.insert().execute(user_id=5, user_name='laura')
|
|
self.users.insert().execute(user_id=6, user_name='ralph')
|
|
self.users.insert().execute(user_id=7, user_name='fido')
|
|
r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall()
|
|
self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')])
|
|
r = self.users.select(offset=5, order_by=[self.users.c.user_id]).execute().fetchall()
|
|
self.assert_(r==[(6, 'ralph'), (7, 'fido')])
|
|
|
|
@testbase.supported('mssql')
|
|
def testselectlimitoffset_mssql(self):
|
|
try:
|
|
r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall()
|
|
assert False # InvalidRequestError should have been raised
|
|
except exceptions.InvalidRequestError:
|
|
pass
|
|
|
|
@testbase.unsupported('mysql')
|
|
def test_scalar_select(self):
|
|
"""test that scalar subqueries with labels get their type propigated to the result set."""
|
|
# mysql and/or mysqldb has a bug here, type isnt propigated for scalar subquery.
|
|
datetable = Table('datetable', metadata,
|
|
Column('id', Integer, primary_key=True),
|
|
Column('today', DateTime))
|
|
datetable.create()
|
|
try:
|
|
datetable.insert().execute(id=1, today=datetime.datetime(2006, 5, 12, 12, 0, 0))
|
|
s = select([datetable.alias('x').c.today], scalar=True)
|
|
s2 = select([datetable.c.id, s.label('somelabel')])
|
|
#print s2.c.somelabel.type
|
|
assert isinstance(s2.execute().fetchone()['somelabel'], datetime.datetime)
|
|
finally:
|
|
datetable.drop()
|
|
|
|
def test_column_accessor(self):
|
|
self.users.insert().execute(user_id=1, user_name='john')
|
|
self.users.insert().execute(user_id=2, user_name='jack')
|
|
r = self.users.select(self.users.c.user_id==2).execute().fetchone()
|
|
self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2)
|
|
self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack')
|
|
|
|
r = text("select * from query_users where user_id=2", engine=testbase.db).execute().fetchone()
|
|
self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2)
|
|
self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack')
|
|
|
|
def test_keys(self):
|
|
self.users.insert().execute(user_id=1, user_name='foo')
|
|
r = self.users.select().execute().fetchone()
|
|
self.assertEqual(r.keys(), ['user_id', 'user_name'])
|
|
|
|
def test_items(self):
|
|
self.users.insert().execute(user_id=1, user_name='foo')
|
|
r = self.users.select().execute().fetchone()
|
|
self.assertEqual(r.items(), [('user_id', 1), ('user_name', 'foo')])
|
|
|
|
def test_len(self):
|
|
self.users.insert().execute(user_id=1, user_name='foo')
|
|
r = self.users.select().execute().fetchone()
|
|
self.assertEqual(len(r), 2)
|
|
r.close()
|
|
r = testbase.db.execute('select user_name, user_id from query_users', {}).fetchone()
|
|
self.assertEqual(len(r), 2)
|
|
r.close()
|
|
r = testbase.db.execute('select user_name from query_users', {}).fetchone()
|
|
self.assertEqual(len(r), 1)
|
|
r.close()
|
|
|
|
def test_cant_execute_join(self):
|
|
try:
|
|
users.join(addresses).execute()
|
|
except exceptions.ArgumentError, e:
|
|
assert str(e) == """Not an executeable clause: query_users JOIN query_addresses ON query_users.user_id = query_addresses.user_id"""
|
|
|
|
def test_functions(self):
|
|
x = testbase.db.func.current_date().execute().scalar()
|
|
y = testbase.db.func.current_date().select().execute().scalar()
|
|
z = testbase.db.func.current_date().scalar()
|
|
assert x == y == z
|
|
|
|
def test_conn_functions(self):
|
|
conn = testbase.db.connect()
|
|
try:
|
|
x = conn.execute(func.current_date()).scalar()
|
|
y = conn.execute(func.current_date().select()).scalar()
|
|
z = conn.scalar(func.current_date())
|
|
finally:
|
|
conn.close()
|
|
assert x == y == z
|
|
|
|
def test_update_functions(self):
|
|
"""test sending functions and SQL expressions to the VALUES and SET clauses of INSERT/UPDATE instances,
|
|
and that column-level defaults get overridden"""
|
|
meta = BoundMetaData(testbase.db)
|
|
t = Table('t1', meta,
|
|
Column('id', Integer, primary_key=True),
|
|
Column('value', Integer)
|
|
)
|
|
t2 = Table('t2', meta,
|
|
Column('id', Integer, primary_key=True),
|
|
Column('value', Integer, default="7"),
|
|
Column('stuff', String(20), onupdate="thisisstuff")
|
|
)
|
|
meta.create_all()
|
|
try:
|
|
t.insert().execute(value=func.length("one"))
|
|
assert t.select().execute().fetchone()['value'] == 3
|
|
t.update().execute(value=func.length("asfda"))
|
|
assert t.select().execute().fetchone()['value'] == 5
|
|
|
|
r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute()
|
|
id = r.last_inserted_ids()[0]
|
|
assert t.select(t.c.id==id).execute().fetchone()['value'] == 9
|
|
t.update(values={t.c.value:func.length("asdf")}).execute()
|
|
assert t.select().execute().fetchone()['value'] == 4
|
|
|
|
t2.insert().execute()
|
|
t2.insert().execute(value=func.length("one"))
|
|
t2.insert().execute(value=func.length("asfda") + -19, stuff="hi")
|
|
|
|
assert select([t2.c.value, t2.c.stuff]).execute().fetchall() == [(7,None), (3,None), (-14,"hi")]
|
|
|
|
t2.update().execute(value=func.length("asdsafasd"), stuff="some stuff")
|
|
assert select([t2.c.value, t2.c.stuff]).execute().fetchall() == [(9,"some stuff"), (9,"some stuff"), (9,"some stuff")]
|
|
|
|
t2.delete().execute()
|
|
|
|
t2.insert(values=dict(value=func.length("one") + 8)).execute()
|
|
assert t2.select().execute().fetchone()['value'] == 11
|
|
|
|
t2.update(values=dict(value=func.length("asfda"))).execute()
|
|
assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (5, "thisisstuff")
|
|
|
|
t2.update(values={t2.c.value:func.length("asfdaasdf"), t2.c.stuff:"foo"}).execute()
|
|
print "HI", select([t2.c.value, t2.c.stuff]).execute().fetchone()
|
|
assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (9, "foo")
|
|
|
|
finally:
|
|
meta.drop_all()
|
|
|
|
@testbase.supported('postgres')
|
|
def test_functions_with_cols(self):
|
|
x = testbase.db.func.current_date().execute().scalar()
|
|
y = testbase.db.func.current_date().select().execute().scalar()
|
|
z = testbase.db.func.current_date().scalar()
|
|
w = select(['*'], from_obj=[testbase.db.func.current_date()]).scalar()
|
|
|
|
# construct a column-based FROM object out of a function, like in [ticket:172]
|
|
s = select([column('date', type=DateTime)], from_obj=[testbase.db.func.current_date()])
|
|
q = s.execute().fetchone()[s.c.date]
|
|
r = s.alias('datequery').select().scalar()
|
|
|
|
assert x == y == z == w == q == r
|
|
|
|
def test_column_order_with_simple_query(self):
|
|
# should return values in column definition order
|
|
self.users.insert().execute(user_id=1, user_name='foo')
|
|
r = self.users.select(self.users.c.user_id==1).execute().fetchone()
|
|
self.assertEqual(r[0], 1)
|
|
self.assertEqual(r[1], 'foo')
|
|
self.assertEqual(r.keys(), ['user_id', 'user_name'])
|
|
self.assertEqual(r.values(), [1, 'foo'])
|
|
|
|
def test_column_order_with_text_query(self):
|
|
# should return values in query order
|
|
self.users.insert().execute(user_id=1, user_name='foo')
|
|
r = testbase.db.execute('select user_name, user_id from query_users', {}).fetchone()
|
|
self.assertEqual(r[0], 'foo')
|
|
self.assertEqual(r[1], 1)
|
|
self.assertEqual(r.keys(), ['user_name', 'user_id'])
|
|
self.assertEqual(r.values(), ['foo', 1])
|
|
|
|
@testbase.unsupported('oracle', 'firebird')
|
|
def test_column_accessor_shadow(self):
|
|
meta = BoundMetaData(testbase.db)
|
|
shadowed = Table('test_shadowed', meta,
|
|
Column('shadow_id', INT, primary_key = True),
|
|
Column('shadow_name', VARCHAR(20)),
|
|
Column('parent', VARCHAR(20)),
|
|
Column('row', VARCHAR(40)),
|
|
Column('__parent', VARCHAR(20)),
|
|
Column('__row', VARCHAR(20)),
|
|
)
|
|
shadowed.create(checkfirst=True)
|
|
try:
|
|
shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row')
|
|
r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone()
|
|
self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
|
|
self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow')
|
|
self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')
|
|
self.assert_(r.row == r['row'] == r[shadowed.c.row] == 'Without light there is no shadow')
|
|
self.assert_(r['__parent'] == 'Hidden parent')
|
|
self.assert_(r['__row'] == 'Hidden row')
|
|
try:
|
|
print r.__parent, r.__row
|
|
self.fail('Should not allow access to private attributes')
|
|
except AttributeError:
|
|
pass # expected
|
|
r.close()
|
|
finally:
|
|
shadowed.drop(checkfirst=True)
|
|
|
|
|
|
class CompoundTest(PersistTest):
|
|
"""test compound statements like UNION, INTERSECT, particularly their ability to nest on
|
|
different databases."""
|
|
def setUpAll(self):
|
|
global metadata, t1, t2, t3
|
|
metadata = BoundMetaData(testbase.db)
|
|
t1 = Table('t1', metadata,
|
|
Column('col1', Integer, primary_key=True),
|
|
Column('col2', String(30)),
|
|
Column('col3', String(40)),
|
|
Column('col4', String(30))
|
|
)
|
|
t2 = Table('t2', metadata,
|
|
Column('col1', Integer, primary_key=True),
|
|
Column('col2', String(30)),
|
|
Column('col3', String(40)),
|
|
Column('col4', String(30)))
|
|
t3 = Table('t3', metadata,
|
|
Column('col1', Integer, primary_key=True),
|
|
Column('col2', String(30)),
|
|
Column('col3', String(40)),
|
|
Column('col4', String(30)))
|
|
metadata.create_all()
|
|
|
|
t1.insert().execute([
|
|
dict(col2="t1col2r1", col3="aaa", col4="aaa"),
|
|
dict(col2="t1col2r2", col3="bbb", col4="bbb"),
|
|
dict(col2="t1col2r3", col3="ccc", col4="ccc"),
|
|
])
|
|
t2.insert().execute([
|
|
dict(col2="t2col2r1", col3="aaa", col4="bbb"),
|
|
dict(col2="t2col2r2", col3="bbb", col4="ccc"),
|
|
dict(col2="t2col2r3", col3="ccc", col4="aaa"),
|
|
])
|
|
t3.insert().execute([
|
|
dict(col2="t3col2r1", col3="aaa", col4="ccc"),
|
|
dict(col2="t3col2r2", col3="bbb", col4="aaa"),
|
|
dict(col2="t3col2r3", col3="ccc", col4="bbb"),
|
|
])
|
|
|
|
def tearDownAll(self):
|
|
metadata.drop_all()
|
|
|
|
def test_union(self):
|
|
(s1, s2) = (
|
|
select([t1.c.col3, t1.c.col4], t1.c.col2.in_("t1col2r1", "t1col2r2")),
|
|
select([t2.c.col3, t2.c.col4], t2.c.col2.in_("t2col2r2", "t2col2r3"))
|
|
)
|
|
u = union(s1, s2)
|
|
assert u.execute().fetchall() == [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
|
|
assert u.alias('bar').select().execute().fetchall() == [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
|
|
|
|
@testbase.unsupported('mysql')
|
|
def test_intersect(self):
|
|
i = intersect(
|
|
select([t2.c.col3, t2.c.col4]),
|
|
select([t2.c.col3, t2.c.col4], t2.c.col4==t3.c.col3)
|
|
)
|
|
assert i.execute().fetchall() == [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
|
|
assert i.alias('bar').select().execute().fetchall() == [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
|
|
|
|
@testbase.unsupported('mysql')
|
|
def test_except_style1(self):
|
|
e = except_(union(
|
|
select([t1.c.col3, t1.c.col4]),
|
|
select([t2.c.col3, t2.c.col4]),
|
|
select([t3.c.col3, t3.c.col4]),
|
|
parens=True), select([t2.c.col3, t2.c.col4]))
|
|
assert e.alias('bar').select().execute().fetchall() == [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'), ('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')]
|
|
|
|
@testbase.unsupported('mysql')
|
|
def test_except_style2(self):
|
|
e = except_(union(
|
|
select([t1.c.col3, t1.c.col4]),
|
|
select([t2.c.col3, t2.c.col4]),
|
|
select([t3.c.col3, t3.c.col4]),
|
|
).alias('foo').select(), select([t2.c.col3, t2.c.col4]))
|
|
assert e.execute().fetchall() == [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'), ('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')]
|
|
assert e.alias('bar').select().execute().fetchall() == [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'), ('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')]
|
|
|
|
@testbase.unsupported('mysql')
|
|
def test_composite(self):
|
|
u = intersect(
|
|
select([t2.c.col3, t2.c.col4]),
|
|
union(
|
|
select([t1.c.col3, t1.c.col4]),
|
|
select([t2.c.col3, t2.c.col4]),
|
|
select([t3.c.col3, t3.c.col4]),
|
|
).alias('foo').select()
|
|
)
|
|
assert u.execute().fetchall() == [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
|
|
assert u.alias('foo').select().execute().fetchall() == [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
|
|
|
|
|
|
if __name__ == "__main__":
|
|
testbase.main()
|