mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-14 12:47:22 -04:00
364 lines
17 KiB
Python
364 lines
17 KiB
Python
|
|
import sqlalchemy.ansisql as ansisql
|
|
import sqlalchemy.databases.postgres as postgres
|
|
import sqlalchemy.databases.oracle as oracle
|
|
|
|
db = ansisql.engine()
|
|
|
|
from sqlalchemy.sql import *
|
|
from sqlalchemy.schema import *
|
|
|
|
from testbase import PersistTest
|
|
import unittest, re
|
|
|
|
|
|
table = Table('mytable', db,
|
|
Column('myid', 3, key = 'id'),
|
|
Column('name', 4, key = 'name'),
|
|
Column('description', 4, key = 'description'),
|
|
)
|
|
|
|
table2 = Table(
|
|
'myothertable', db,
|
|
Column('otherid',3, key='id'),
|
|
Column('othername', 4, key='name'),
|
|
)
|
|
|
|
table3 = Table(
|
|
'thirdtable', db,
|
|
Column('userid', 5, key='id'),
|
|
Column('otherstuff', 5),
|
|
)
|
|
|
|
class SQLTest(PersistTest):
|
|
def runtest(self, clause, result, engine = None, params = None):
|
|
c = clause.compile(engine, params)
|
|
print "\n" + str(c) + repr(c.get_params())
|
|
cc = re.sub(r'\n', '', str(c))
|
|
self.assert_(cc == result)
|
|
|
|
class SelectTest(SQLTest):
|
|
|
|
def testtext(self):
|
|
self.runtest(
|
|
text("select * from foo where lala = bar") ,
|
|
"select * from foo where lala = bar",
|
|
engine = db
|
|
)
|
|
|
|
def testtableselect(self):
|
|
self.runtest(table.select(), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable")
|
|
|
|
self.runtest(select([table, table2]), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, \
|
|
myothertable.othername FROM mytable, myothertable")
|
|
|
|
def testsubquery(self):
|
|
s = select([table], table.c.name == 'jack')
|
|
self.runtest(
|
|
select(
|
|
[s],
|
|
s.c.id == 7
|
|
)
|
|
,
|
|
"SELECT myid, name, description FROM (SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name = :mytable_name) WHERE myid = :myid")
|
|
|
|
sq = Select([table])
|
|
self.runtest(
|
|
sq.select(),
|
|
"SELECT myid, name, description FROM (SELECT mytable.myid, mytable.name, mytable.description FROM mytable)"
|
|
)
|
|
|
|
sq = subquery(
|
|
'sq',
|
|
[table],
|
|
)
|
|
|
|
self.runtest(
|
|
sq.select(sq.c.id == 7),
|
|
"SELECT sq.myid, sq.name, sq.description FROM \
|
|
(SELECT mytable.myid, mytable.name, mytable.description FROM mytable) sq WHERE sq.myid = :sq_myid"
|
|
)
|
|
|
|
sq = subquery(
|
|
'sq',
|
|
[table, table2],
|
|
and_(table.c.id ==7, table2.c.id==table.c.id),
|
|
use_labels = True
|
|
)
|
|
|
|
sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, \
|
|
mytable.description AS mytable_description, myothertable.otherid AS myothertable_otherid, \
|
|
myothertable.othername AS myothertable_othername FROM mytable, myothertable \
|
|
WHERE mytable.myid = :mytable_myid AND myothertable.otherid = mytable.myid"
|
|
|
|
self.runtest(sq.select(), "SELECT sq.mytable_myid, sq.mytable_name, sq.mytable_description, sq.myothertable_otherid, \
|
|
sq.myothertable_othername FROM (" + sqstring + ") sq")
|
|
|
|
sq2 = subquery(
|
|
'sq2',
|
|
[sq],
|
|
use_labels = True
|
|
)
|
|
|
|
self.runtest(sq2.select(), "SELECT sq2.sq_mytable_myid, sq2.sq_mytable_name, sq2.sq_mytable_description, \
|
|
sq2.sq_myothertable_otherid, sq2.sq_myothertable_othername FROM \
|
|
(SELECT sq.mytable_myid AS sq_mytable_myid, sq.mytable_name AS sq_mytable_name, \
|
|
sq.mytable_description AS sq_mytable_description, sq.myothertable_otherid AS sq_myothertable_otherid, \
|
|
sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") sq) sq2")
|
|
|
|
|
|
def testand(self):
|
|
self.runtest(
|
|
select(['*'], and_(table.c.id == 12, table.c.name=='asdf', table2.c.name == 'foo', "sysdate() = today()")),
|
|
"SELECT * FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name AND myothertable.othername = :myothertable_othername AND sysdate() = today()"
|
|
)
|
|
|
|
def testor(self):
|
|
self.runtest(
|
|
select([table], and_(
|
|
table.c.id == 12,
|
|
or_(table2.c.name=='asdf', table2.c.name == 'foo', table2.c.id == 9),
|
|
"sysdate() = today()",
|
|
)),
|
|
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND (myothertable.othername = :myothertable_othername OR myothertable.othername = :myothertable_othername_1 OR myothertable.otherid = :myothertable_otherid) AND sysdate() = today()"
|
|
)
|
|
|
|
|
|
def testmultiparam(self):
|
|
self.runtest(
|
|
select(["*"], or_(table.c.id == 12, table.c.id=='asdf', table.c.id == 'foo')),
|
|
"SELECT * FROM mytable WHERE mytable.myid = :mytable_myid OR mytable.myid = :mytable_myid_1 OR mytable.myid = :mytable_myid_2"
|
|
)
|
|
|
|
def testorderby(self):
|
|
self.runtest(
|
|
table2.select(order_by = [table2.c.id, asc(table2.c.name)]),
|
|
"SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername ASC"
|
|
)
|
|
def testalias(self):
|
|
# test the alias for a table. column names stay the same, table name "changes" to "foo".
|
|
self.runtest(
|
|
select([alias(table, 'foo')])
|
|
,"SELECT foo.myid, foo.name, foo.description FROM mytable foo")
|
|
|
|
# create a select for a join of two tables. use_labels means the column names will have
|
|
# labels tablename_columnname, which become the column keys accessible off the Selectable object.
|
|
# also, only use one column from the second table and all columns from the first table.
|
|
q = select([table, table2.c.id], table.c.id == table2.c.id, use_labels = True)
|
|
|
|
# make an alias of the "selectable". column names stay the same (i.e. the labels), table name "changes" to "t2view".
|
|
a = alias(q, 't2view')
|
|
|
|
# select from that alias, also using labels. two levels of labels should produce two underscores.
|
|
# also, reference the column "mytable_myid" off of the t2view alias.
|
|
self.runtest(
|
|
a.select(a.c.mytable_myid == 9, use_labels = True),
|
|
"SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name AS t2view_mytable_name, \
|
|
t2view.mytable_description AS t2view_mytable_description, t2view.myothertable_otherid AS t2view_myothertable_otherid FROM \
|
|
(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, mytable.description AS mytable_description, \
|
|
myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \
|
|
WHERE mytable.myid = myothertable.otherid) t2view WHERE t2view.mytable_myid = :t2view_mytable_myid"
|
|
)
|
|
|
|
def testliteral(self):
|
|
self.runtest(select(
|
|
["foobar(a)", "pk_foo_bar(syslaal)"],
|
|
"a = 12",
|
|
from_obj = ["foobar left outer join lala on foobar.foo = lala.foo"],
|
|
engine = db
|
|
),
|
|
"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12")
|
|
|
|
def testliteralmix(self):
|
|
self.runtest(select(
|
|
[table, table2.c.id, "sysdate()", "foo, bar, lala"],
|
|
and_(
|
|
"foo.id = foofoo(lala)",
|
|
"datetime(foo) = Today",
|
|
table.c.id == table2.c.id,
|
|
)
|
|
),
|
|
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, sysdate(), foo, bar, lala \
|
|
FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today AND mytable.myid = myothertable.otherid")
|
|
|
|
def testliteralsubquery(self):
|
|
self.runtest(select(
|
|
[alias(table, 't'), "foo.f"],
|
|
"foo.f = t.id",
|
|
from_obj = ["(select f from bar where lala=heyhey) foo"]
|
|
),
|
|
"SELECT t.myid, t.name, t.description, foo.f FROM mytable t, (select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
|
|
|
|
def testjoin(self):
|
|
self.runtest(
|
|
join(table2, table, table.c.id == table2.c.id).select(),
|
|
"SELECT myothertable.otherid, myothertable.othername, mytable.myid, mytable.name, mytable.description \
|
|
FROM myothertable, mytable WHERE mytable.myid = myothertable.otherid"
|
|
)
|
|
|
|
self.runtest(
|
|
select(
|
|
[table],
|
|
from_obj = [join(table, table2, table.c.id == table2.c.id)]
|
|
),
|
|
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid")
|
|
|
|
self.runtest(
|
|
select(
|
|
[join(join(table, table2, table.c.id == table2.c.id), table3, table.c.id == table3.c.id)
|
|
]),
|
|
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid JOIN thirdtable ON mytable.myid = thirdtable.userid"
|
|
)
|
|
|
|
def testmultijoin(self):
|
|
self.runtest(
|
|
select([table, table2, table3],
|
|
from_obj = [outerjoin(join(table, table2, table.c.id == table2.c.id), table3, table.c.id==table3.c.id)]
|
|
)
|
|
,"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid LEFT OUTER JOIN thirdtable ON mytable.myid = thirdtable.userid"
|
|
)
|
|
|
|
def testunion(self):
|
|
x = union(
|
|
select([table], table.c.id == 5),
|
|
select([table], table.c.id == 12),
|
|
order_by = [table.c.id],
|
|
)
|
|
|
|
self.runtest(x, "SELECT mytable.myid, mytable.name, mytable.description \
|
|
FROM mytable WHERE mytable.myid = :mytable_myid UNION \
|
|
SELECT mytable.myid, mytable.name, mytable.description \
|
|
FROM mytable WHERE mytable.myid = :mytable_myid_1 ORDER BY mytable.myid")
|
|
|
|
self.runtest(
|
|
union(
|
|
select([table]),
|
|
select([table2]),
|
|
select([table3])
|
|
)
|
|
,
|
|
"SELECT mytable.myid, mytable.name, mytable.description \
|
|
FROM mytable UNION SELECT myothertable.otherid, myothertable.othername \
|
|
FROM myothertable UNION SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable")
|
|
|
|
|
|
def testouterjoin(self):
|
|
# test an outer join. the oracle module should take the ON clause of the join and
|
|
# move it up to the WHERE clause of its parent select, and append (+) to all right-hand-side columns
|
|
# within the original onclause, but leave right-hand-side columns unchanged outside of the onclause
|
|
# parameters.
|
|
|
|
query = select(
|
|
[table, table2],
|
|
and_(
|
|
table.c.name == 'fred',
|
|
table.c.id == 10,
|
|
table2.c.name != 'jack',
|
|
"EXISTS (select yay from foo where boo = lar)"
|
|
),
|
|
from_obj = [ outerjoin(table, table2, table.c.id == table2.c.id) ]
|
|
)
|
|
|
|
self.runtest(query,
|
|
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
|
|
FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid \
|
|
WHERE mytable.name = :mytable_name AND mytable.myid = :mytable_myid AND \
|
|
myothertable.othername != :myothertable_othername AND \
|
|
EXISTS (select yay from foo where boo = lar)",
|
|
engine = postgres.engine())
|
|
|
|
|
|
# self.runtest(query,
|
|
# "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
|
|
#FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid(+) AND \
|
|
#mytable.name = :mytable_name AND mytable.myid = :mytable_myid AND \
|
|
#myothertable.othername != :myothertable_othername AND EXISTS (select yay from foo where boo = lar)",
|
|
# engine = oracle.engine(use_ansi = False))
|
|
|
|
def testbindparam(self):
|
|
self.runtest(select(
|
|
[table, table2],
|
|
and_(table.c.id == table2.c.id,
|
|
table.c.name == bindparam('mytablename'),
|
|
)
|
|
),
|
|
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
|
|
FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid AND mytable.name = :mytablename"
|
|
)
|
|
|
|
# check that the bind params sent along with a compile() call
|
|
# get preserved when the params are retreived later
|
|
s = select([table], table.c.id == bindparam('test'))
|
|
c = s.compile(bindparams = {'test' : 7})
|
|
self.assert_(c.get_params() == {'test' : 7})
|
|
|
|
def testcorrelatedsubquery(self):
|
|
self.runtest(
|
|
select([table], table.c.id == select([table2.c.id], table.c.name == table2.c.name)),
|
|
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT myothertable.otherid FROM myothertable WHERE mytable.name = myothertable.othername)"
|
|
)
|
|
|
|
self.runtest(
|
|
select([table], exists([1], table2.c.id == table.c.id)),
|
|
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)"
|
|
)
|
|
|
|
s = subquery('sq2', [table], exists([1], table2.c.id == table.c.id))
|
|
self.runtest(
|
|
select([s, table])
|
|
,"SELECT sq2.myid, sq2.name, sq2.description, mytable.myid, mytable.name, mytable.description FROM (SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)) sq2, mytable")
|
|
|
|
def testin(self):
|
|
self.runtest(select([table], table.c.id.in_(1, 2, 3)),
|
|
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (1, 2, 3)")
|
|
|
|
self.runtest(select([table], table.c.id.in_(select([table2.c.id]))),
|
|
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (SELECT myothertable.otherid FROM myothertable)")
|
|
|
|
class CRUDTest(SQLTest):
|
|
def testinsert(self):
|
|
# generic insert, will create bind params for all columns
|
|
self.runtest(insert(table), "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)")
|
|
|
|
# insert with user-supplied bind params for specific columns,
|
|
# cols provided literally
|
|
self.runtest(
|
|
insert(table, {table.c.id : bindparam('userid'), table.c.name : bindparam('username')}),
|
|
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)")
|
|
|
|
# insert with user-supplied bind params for specific columns, cols
|
|
# provided as strings
|
|
self.runtest(
|
|
insert(table, dict(id = 3, name = 'jack')),
|
|
"INSERT INTO mytable (myid, name) VALUES (:myid, :name)"
|
|
)
|
|
|
|
# insert with a subselect provided
|
|
#self.runtest(
|
|
# insert(table, select([table2])),
|
|
# ""
|
|
#)
|
|
|
|
def testupdate(self):
|
|
self.runtest(update(table, table.c.id == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {table.c.name:'fred'})
|
|
self.runtest(update(table, table.c.id == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {'name':'fred'})
|
|
self.runtest(update(table, values = {table.c.name : table.c.id}), "UPDATE mytable SET name=mytable.myid")
|
|
self.runtest(update(table, table.c.id == 12, values = {table.c.name : table.c.id}), "UPDATE mytable SET name=mytable.myid, description=:description WHERE mytable.myid = :mytable_myid", params = {'description':'test'})
|
|
|
|
def testcorrelatedupdate(self):
|
|
# test against a straight text subquery
|
|
u = update(table, values = {table.c.name : text("select name from mytable where id=mytable.id")})
|
|
self.runtest(u, "UPDATE mytable SET name=(select name from mytable where id=mytable.id)")
|
|
|
|
# test against a regular constructed subquery
|
|
s = select([table2], table2.c.id == table.c.id)
|
|
u = update(table, table.c.name == 'jack', values = {table.c.name : s})
|
|
self.runtest(u, "UPDATE mytable SET name=(SELECT myothertable.otherid, myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid) WHERE mytable.name = :mytable_name")
|
|
|
|
def testdelete(self):
|
|
self.runtest(delete(table, table.c.id == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|