mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-07 01:10:52 -04:00
cf62005639
one particular vector test won't run on oracle 23c free, so just disable it. added better skips for the rest of the vector tests and fixed a deprecation issue. this will be the first run on the new oracle23 on CI so we'll have to see how this goes. Also adjust for mariadb12 being overly helpful with regards to stale row updates. as we are having trouble getting 23c to pass through transaction tests, i noted we have an explosion of tests due to the multiple drivers, so this patch introduces __sparse_driver_backend__ for all tests where we want variety of database server but there's no need to test every driver. This should dramatically reduce the size of the test suite run. Change-Id: Ic8d3eb0a60e76b4c54c6bb4a721f90c81ede782b
1806 lines
57 KiB
Python
1806 lines
57 KiB
Python
from sqlalchemy import and_
|
|
from sqlalchemy import asc
|
|
from sqlalchemy import bindparam
|
|
from sqlalchemy import cast
|
|
from sqlalchemy import desc
|
|
from sqlalchemy import exc
|
|
from sqlalchemy import except_
|
|
from sqlalchemy import ForeignKey
|
|
from sqlalchemy import func
|
|
from sqlalchemy import INT
|
|
from sqlalchemy import Integer
|
|
from sqlalchemy import intersect
|
|
from sqlalchemy import literal
|
|
from sqlalchemy import literal_column
|
|
from sqlalchemy import not_
|
|
from sqlalchemy import or_
|
|
from sqlalchemy import select
|
|
from sqlalchemy import sql
|
|
from sqlalchemy import String
|
|
from sqlalchemy import testing
|
|
from sqlalchemy import text
|
|
from sqlalchemy import tuple_
|
|
from sqlalchemy import TypeDecorator
|
|
from sqlalchemy import union
|
|
from sqlalchemy import union_all
|
|
from sqlalchemy import VARCHAR
|
|
from sqlalchemy.engine import default
|
|
from sqlalchemy.sql import LABEL_STYLE_TABLENAME_PLUS_COL
|
|
from sqlalchemy.sql.selectable import LABEL_STYLE_NONE
|
|
from sqlalchemy.testing import assert_raises_message
|
|
from sqlalchemy.testing import eq_
|
|
from sqlalchemy.testing import fixtures
|
|
from sqlalchemy.testing import is_
|
|
from sqlalchemy.testing.schema import Column
|
|
from sqlalchemy.testing.schema import Table
|
|
from sqlalchemy.testing.util import resolve_lambda
|
|
|
|
|
|
class QueryTest(fixtures.TablesTest):
    """Tests of basic SELECT behavior: ORDER BY, LIKE/ILIKE, IN and bind
    parameters, run mostly against the ``users`` table."""

    __sparse_driver_backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Define the ``users``, ``addresses`` and ``u2`` tables."""
        Table(
            "users",
            metadata,
            Column(
                "user_id", INT, primary_key=True, test_needs_autoincrement=True
            ),
            Column("user_name", VARCHAR(20)),
            test_needs_acid=True,
        )
        Table(
            "addresses",
            metadata,
            Column(
                "address_id",
                Integer,
                primary_key=True,
                test_needs_autoincrement=True,
            ),
            Column("user_id", Integer, ForeignKey("users.user_id")),
            Column("address", String(30)),
            test_needs_acid=True,
        )

        Table(
            "u2",
            metadata,
            Column("user_id", INT, primary_key=True),
            Column("user_name", VARCHAR(20)),
            test_needs_acid=True,
        )

    def test_order_by_label(self, connection):
        """test that a label within an ORDER BY works on each backend.

        This test should be modified to support [ticket:1068] when that ticket
        is implemented.  For now, you need to put the actual string in the
        ORDER BY.

        """

        users = self.tables.users

        connection.execute(
            users.insert(),
            [
                {"user_id": 7, "user_name": "jack"},
                {"user_id": 8, "user_name": "ed"},
                {"user_id": 9, "user_name": "fred"},
            ],
        )

        concat = ("test: " + users.c.user_name).label("thedata")

        # ascending: the label is referenced by its plain string name
        eq_(
            connection.execute(select(concat).order_by("thedata")).fetchall(),
            [("test: ed",), ("test: fred",), ("test: jack",)],
        )

        # descending: the same string name wrapped in desc()
        eq_(
            connection.execute(
                select(concat).order_by(desc("thedata"))
            ).fetchall(),
            [("test: jack",), ("test: fred",), ("test: ed",)],
        )

    @testing.requires.order_by_label_with_expression
    def test_order_by_label_compound(self, connection):
        """ORDER BY an expression built on top of a label name."""
        users = self.tables.users
        connection.execute(
            users.insert(),
            [
                {"user_id": 7, "user_name": "jack"},
                {"user_id": 8, "user_name": "ed"},
                {"user_id": 9, "user_name": "fred"},
            ],
        )

        concat = ("test: " + users.c.user_name).label("thedata")
        eq_(
            connection.execute(
                select(concat).order_by(literal_column("thedata") + "x")
            ).fetchall(),
            [("test: ed",), ("test: fred",), ("test: jack",)],
        )

    @testing.requires.boolean_col_expressions
    def test_or_and_as_columns(self, connection):
        """and_() / or_() / not_() can be SELECTed as column expressions."""
        true, false = literal(True), literal(False)

        eq_(connection.execute(select(and_(true, false))).scalar(), False)
        eq_(connection.execute(select(and_(true, true))).scalar(), True)
        eq_(connection.execute(select(or_(true, false))).scalar(), True)
        eq_(connection.execute(select(or_(false, false))).scalar(), False)
        eq_(
            connection.execute(select(not_(or_(false, false)))).scalar(),
            True,
        )

        # eq_() compares with ==, same as the labeled-row checks always did
        row = connection.execute(
            select(or_(false, false).label("x"), and_(true, false).label("y"))
        ).first()
        eq_(row.x, False)
        eq_(row.y, False)

        row = connection.execute(
            select(or_(true, false).label("x"), and_(true, false).label("y"))
        ).first()
        eq_(row.x, True)
        eq_(row.y, False)

    def test_select_tuple(self, connection):
        """SELECTing a tuple_() directly raises CompileError."""
        users = self.tables.users
        connection.execute(
            users.insert(),
            {"user_id": 1, "user_name": "apples"},
        )

        assert_raises_message(
            exc.CompileError,
            r"Most backends don't support SELECTing from a tuple\(\) object.",
            connection.execute,
            select(tuple_(users.c.user_id, users.c.user_name)),
        )

    @testing.combinations(
        (
            lambda users: select(users.c.user_id).where(
                users.c.user_name.startswith("apple")
            ),
            [(1,)],
        ),
        (
            lambda users: select(users.c.user_id).where(
                users.c.user_name.contains("i % t")
            ),
            [(5,)],
        ),
        (
            lambda users: select(users.c.user_id).where(
                users.c.user_name.endswith("anas")
            ),
            [(3,)],
        ),
        (
            lambda users: select(users.c.user_id).where(
                users.c.user_name.contains("i % t", escape="&")
            ),
            [(5,)],
        ),
        argnames="expr,result",
    )
    def test_like_ops(self, connection, expr, result):
        """startswith() / contains() / endswith() select the expected ids."""
        users = self.tables.users
        connection.execute(
            users.insert(),
            [
                {"user_id": 1, "user_name": "apples"},
                {"user_id": 2, "user_name": "oranges"},
                {"user_id": 3, "user_name": "bananas"},
                {"user_id": 4, "user_name": "legumes"},
                {"user_id": 5, "user_name": "hi % there"},
            ],
        )

        expr = resolve_lambda(expr, users=users)
        eq_(connection.execute(expr).fetchall(), result)

    @testing.requires.mod_operator_as_percent_sign
    @testing.emits_warning(".*now automatically escapes.*")
    def test_percents_in_text(self, connection):
        """Percent signs inside text() constructs come back unchanged."""
        for expr, result in (
            (text("select 6 % 10"), 6),
            (text("select 17 % 10"), 7),
            (text("select '%'"), "%"),
            (text("select '%%'"), "%%"),
            (text("select '%%%'"), "%%%"),
            (text("select 'hello % world'"), "hello % world"),
        ):
            eq_(connection.scalar(expr), result)

    def test_ilike(self, connection):
        """ilike() matches regardless of case; on PostgreSQL, like() is also
        checked to be case sensitive."""
        users = self.tables.users
        connection.execute(
            users.insert(),
            [
                {"user_id": 1, "user_name": "one"},
                {"user_id": 2, "user_name": "TwO"},
                {"user_id": 3, "user_name": "ONE"},
                {"user_id": 4, "user_name": "OnE"},
            ],
        )

        eq_(
            connection.execute(
                select(users.c.user_id).where(users.c.user_name.ilike("one"))
            ).fetchall(),
            [(1,), (3,), (4,)],
        )

        eq_(
            connection.execute(
                select(users.c.user_id).where(users.c.user_name.ilike("TWO"))
            ).fetchall(),
            [(2,)],
        )

        if testing.against("postgresql"):
            eq_(
                connection.execute(
                    select(users.c.user_id).where(
                        users.c.user_name.like("one")
                    )
                ).fetchall(),
                [(1,)],
            )
            eq_(
                connection.execute(
                    select(users.c.user_id).where(
                        users.c.user_name.like("TWO")
                    )
                ).fetchall(),
                [],
            )

    def test_repeated_bindparams(self, connection):
        """Tests that a BindParam can be used more than once.

        This should be run for DB-APIs with both positional and named
        paramstyles.
        """
        users = self.tables.users

        connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
        connection.execute(users.insert(), dict(user_id=8, user_name="fred"))

        u = bindparam("userid")
        s = users.select().where(
            and_(users.c.user_name == u, users.c.user_name == u)
        )
        r = connection.execute(s, dict(userid="fred")).fetchall()
        eq_(len(r), 1)

    def test_bindparam_detection(self):
        """Check which ``:name`` tokens in text() compile to ``?`` under the
        qmark paramstyle, including backslash-escaped colons."""
        dialect = default.DefaultDialect(paramstyle="qmark")

        def prep(q):
            return str(sql.text(q).compile(dialect=dialect))

        # (text passed to sql.text(), expected compiled string)
        cases = [
            ("select foo", "select foo"),
            ("time='12:30:00'", "time='12:30:00'"),
            (":this:that", ":this:that"),
            (":this :that", "? ?"),
            ("(:this),(:that :other)", "(?),(? ?)"),
            ("(:this),(:that:other)", "(?),(:that:other)"),
            ("(:this),(:that,:other)", "(?),(?,?)"),
            ("(:that_:other)", "(:that_:other)"),
            ("(:that_ :other)", "(? ?)"),
            ("(:that_other)", "(?)"),
            ("(:that$other)", "(?)"),
            ("(:that$:other)", "(:that$:other)"),
            (".:that$ :other.", ".? ?."),
            (r"select \foo", r"select \foo"),
            (r"time='12\:30:00'", r"time='12\:30:00'"),
            (r":this \:that", "? :that"),
            (r"(\:that$other)", "(:that$other)"),
            (r".\:that$ :other.", ".:that$ ?."),
        ]
        for raw, wanted in cases:
            # eq_() reports both values on failure
            eq_(prep(raw), wanted)

    @testing.requires.standalone_binds
    def test_select_from_bindparam(self, connection):
        """Test result row processing when selecting from a plain bind
        param."""

        class MyInteger(TypeDecorator):
            impl = Integer
            cache_ok = True

            def process_bind_param(self, value, dialect):
                # strip the "INT_" prefix on the way in
                return int(value[4:])

            def process_result_value(self, value, dialect):
                # put the prefix back on the way out
                return "INT_%d" % value

        eq_(
            connection.scalar(select(cast("INT_5", type_=MyInteger))),
            "INT_5",
        )
        eq_(
            connection.scalar(
                select(cast("INT_5", type_=MyInteger).label("foo"))
            ),
            "INT_5",
        )

    def test_order_by(self, connection):
        """Exercises ORDER BY clause generation.

        Tests simple, compound, aliased and DESC clauses.
        """

        users = self.tables.users

        connection.execute(users.insert(), dict(user_id=1, user_name="c"))
        connection.execute(users.insert(), dict(user_id=2, user_name="b"))
        connection.execute(users.insert(), dict(user_id=3, user_name="a"))

        def a_eq(executable, wanted):
            got = list(connection.execute(executable))
            eq_(got, wanted)

        for labels in False, True:
            label_style = (
                LABEL_STYLE_NONE
                if labels is False
                else LABEL_STYLE_TABLENAME_PLUS_COL
            )

            def go(stmt):
                # unlike the table-level selects below, these statements
                # keep their default label style when ``labels`` is False
                if labels:
                    stmt = stmt.set_label_style(label_style)
                return stmt

            a_eq(
                users.select()
                .order_by(users.c.user_id)
                .set_label_style(label_style),
                [(1, "c"), (2, "b"), (3, "a")],
            )

            a_eq(
                users.select()
                .order_by(users.c.user_name, users.c.user_id)
                .set_label_style(label_style),
                [(3, "a"), (2, "b"), (1, "c")],
            )

            a_eq(
                go(
                    select(users.c.user_id.label("foo")).order_by(
                        users.c.user_id
                    )
                ),
                [(1,), (2,), (3,)],
            )

            a_eq(
                go(
                    select(
                        users.c.user_id.label("foo"), users.c.user_name
                    ).order_by(users.c.user_name, users.c.user_id),
                ),
                [(3, "a"), (2, "b"), (1, "c")],
            )

            a_eq(
                users.select()
                .distinct()
                .order_by(users.c.user_id)
                .set_label_style(label_style),
                [(1, "c"), (2, "b"), (3, "a")],
            )

            a_eq(
                go(
                    select(users.c.user_id.label("foo"))
                    .distinct()
                    .order_by(users.c.user_id),
                ),
                [(1,), (2,), (3,)],
            )

            a_eq(
                go(
                    select(
                        users.c.user_id.label("a"),
                        users.c.user_id.label("b"),
                        users.c.user_name,
                    ).order_by(users.c.user_id),
                ),
                [(1, 1, "c"), (2, 2, "b"), (3, 3, "a")],
            )

            a_eq(
                users.select()
                .distinct()
                .order_by(desc(users.c.user_id))
                .set_label_style(label_style),
                [(3, "a"), (2, "b"), (1, "c")],
            )

            a_eq(
                go(
                    select(users.c.user_id.label("foo"))
                    .distinct()
                    .order_by(users.c.user_id.desc()),
                ),
                [(3,), (2,), (1,)],
            )

    @testing.requires.nullsordering
    def test_order_by_nulls(self, connection):
        """Exercises ORDER BY with nulls_first() / nulls_last().

        Tests the modifiers on plain, asc() and desc() orderings, and in
        combination with a second ORDER BY column, where one row has a
        NULL ``user_name``.
        """

        users = self.tables.users

        # user_id=1 is inserted without a user_name, so it is NULL
        connection.execute(users.insert(), dict(user_id=1))
        connection.execute(users.insert(), dict(user_id=2, user_name="b"))
        connection.execute(users.insert(), dict(user_id=3, user_name="a"))

        def a_eq(executable, wanted):
            got = list(connection.execute(executable))
            eq_(got, wanted)

        for labels in False, True:
            label_style = (
                LABEL_STYLE_NONE
                if labels is False
                else LABEL_STYLE_TABLENAME_PLUS_COL
            )
            a_eq(
                users.select()
                .order_by(users.c.user_name.nulls_first())
                .set_label_style(label_style),
                [(1, None), (3, "a"), (2, "b")],
            )

            a_eq(
                users.select()
                .order_by(users.c.user_name.nulls_last())
                .set_label_style(label_style),
                [(3, "a"), (2, "b"), (1, None)],
            )

            a_eq(
                users.select()
                .order_by(asc(users.c.user_name).nulls_first())
                .set_label_style(label_style),
                [(1, None), (3, "a"), (2, "b")],
            )

            a_eq(
                users.select()
                .order_by(asc(users.c.user_name).nulls_last())
                .set_label_style(label_style),
                [(3, "a"), (2, "b"), (1, None)],
            )

            a_eq(
                users.select()
                .order_by(users.c.user_name.desc().nulls_first())
                .set_label_style(label_style),
                [(1, None), (2, "b"), (3, "a")],
            )

            a_eq(
                users.select()
                .order_by(users.c.user_name.desc().nulls_last())
                .set_label_style(label_style),
                [(2, "b"), (3, "a"), (1, None)],
            )

            a_eq(
                users.select()
                .order_by(desc(users.c.user_name).nulls_first())
                .set_label_style(label_style),
                [(1, None), (2, "b"), (3, "a")],
            )

            a_eq(
                users.select()
                .order_by(desc(users.c.user_name).nulls_last())
                .set_label_style(label_style),
                [(2, "b"), (3, "a"), (1, None)],
            )

            a_eq(
                users.select()
                .order_by(
                    users.c.user_name.nulls_first(),
                    users.c.user_id,
                )
                .set_label_style(label_style),
                [(1, None), (3, "a"), (2, "b")],
            )

            a_eq(
                users.select()
                .order_by(users.c.user_name.nulls_last(), users.c.user_id)
                .set_label_style(label_style),
                [(3, "a"), (2, "b"), (1, None)],
            )

    def test_in_filtering(self, connection):
        """test the behavior of the in_() function."""
        users = self.tables.users

        connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
        connection.execute(users.insert(), dict(user_id=8, user_name="fred"))
        connection.execute(users.insert(), dict(user_id=9, user_name=None))

        s = users.select().where(users.c.user_name.in_([]))
        r = connection.execute(s).fetchall()
        # No username is in empty set
        eq_(len(r), 0)

        s = users.select().where(not_(users.c.user_name.in_([])))
        r = connection.execute(s).fetchall()
        eq_(len(r), 3)

        s = users.select().where(users.c.user_name.in_(["jack", "fred"]))
        r = connection.execute(s).fetchall()
        eq_(len(r), 2)

        s = users.select().where(not_(users.c.user_name.in_(["jack", "fred"])))
        r = connection.execute(s).fetchall()
        # Null values are not outside any set
        eq_(len(r), 0)

    def test_expanding_in(self, connection):
        """An expanding bind parameter in IN accepts lists of one, several
        and zero values, and is rejected with executemany()."""
        users = self.tables.users
        connection.execute(
            users.insert(),
            [
                dict(user_id=7, user_name="jack"),
                dict(user_id=8, user_name="fred"),
                dict(user_id=9, user_name=None),
            ],
        )

        stmt = (
            select(users)
            .where(users.c.user_name.in_(bindparam("uname", expanding=True)))
            .order_by(users.c.user_id)
        )

        eq_(
            connection.execute(stmt, {"uname": ["jack"]}).fetchall(),
            [(7, "jack")],
        )

        eq_(
            connection.execute(stmt, {"uname": ["jack", "fred"]}).fetchall(),
            [(7, "jack"), (8, "fred")],
        )

        eq_(connection.execute(stmt, {"uname": []}).fetchall(), [])

        assert_raises_message(
            exc.StatementError,
            "'expanding' parameters can't be used with executemany()",
            connection.execute,
            users.update().where(
                users.c.user_name.in_(bindparam("uname", expanding=True))
            ),
            [{"uname": ["fred"]}, {"uname": ["ed"]}],
        )

    @testing.requires.no_quoting_special_bind_names
    def test_expanding_in_special_chars(self, connection):
        """Expanding parameters work next to a plain parameter when the
        names contain digits or a dot."""
        users = self.tables.users
        connection.execute(
            users.insert(),
            [
                dict(user_id=7, user_name="jack"),
                dict(user_id=8, user_name="fred"),
            ],
        )

        stmt = (
            select(users)
            .where(users.c.user_name.in_(bindparam("u35", expanding=True)))
            .where(users.c.user_id == bindparam("u46"))
            .order_by(users.c.user_id)
        )

        eq_(
            connection.execute(
                stmt, {"u35": ["jack", "fred"], "u46": 7}
            ).fetchall(),
            [(7, "jack")],
        )

        stmt = (
            select(users)
            .where(users.c.user_name.in_(bindparam("u.35", expanding=True)))
            .where(users.c.user_id == bindparam("u.46"))
            .order_by(users.c.user_id)
        )

        eq_(
            connection.execute(
                stmt, {"u.35": ["jack", "fred"], "u.46": 7}
            ).fetchall(),
            [(7, "jack")],
        )

    def test_expanding_in_multiple(self, connection):
        """Two different expanding parameters in one statement."""
        users = self.tables.users

        connection.execute(
            users.insert(),
            [
                dict(user_id=7, user_name="jack"),
                dict(user_id=8, user_name="fred"),
                dict(user_id=9, user_name="ed"),
            ],
        )

        stmt = (
            select(users)
            .where(users.c.user_name.in_(bindparam("uname", expanding=True)))
            .where(users.c.user_id.in_(bindparam("userid", expanding=True)))
            .order_by(users.c.user_id)
        )

        eq_(
            connection.execute(
                stmt, {"uname": ["jack", "fred", "ed"], "userid": [8, 9]}
            ).fetchall(),
            [(8, "fred"), (9, "ed")],
        )

    def test_expanding_in_repeated(self, connection):
        """The same expanding parameter names used in both halves of a
        UNION."""
        users = self.tables.users

        connection.execute(
            users.insert(),
            [
                dict(user_id=7, user_name="jack"),
                dict(user_id=8, user_name="fred"),
                dict(user_id=9, user_name="ed"),
            ],
        )

        def name_matches():
            # builds fresh bindparam objects on every call, so each SELECT
            # carries its own parameters that merely share names
            return users.c.user_name.in_(
                bindparam("uname", expanding=True)
            ) | users.c.user_name.in_(bindparam("uname2", expanding=True))

        stmt = select(users).where(name_matches()).where(users.c.user_id == 8)
        stmt = stmt.union(
            select(users).where(name_matches()).where(users.c.user_id == 9)
        ).order_by("user_id")

        # only the names the statement actually binds are passed
        eq_(
            connection.execute(
                stmt,
                {
                    "uname": ["jack", "fred"],
                    "uname2": ["ed"],
                },
            ).fetchall(),
            [(8, "fred"), (9, "ed")],
        )

    @testing.requires.tuple_in
    def test_expanding_in_composite(self, connection):
        """An expanding parameter used with a tuple_() IN comparison."""
        users = self.tables.users

        connection.execute(
            users.insert(),
            [
                dict(user_id=7, user_name="jack"),
                dict(user_id=8, user_name="fred"),
                dict(user_id=9, user_name=None),
            ],
        )

        stmt = (
            select(users)
            .where(
                tuple_(users.c.user_id, users.c.user_name).in_(
                    bindparam("uname", expanding=True)
                )
            )
            .order_by(users.c.user_id)
        )

        eq_(
            connection.execute(stmt, {"uname": [(7, "jack")]}).fetchall(),
            [(7, "jack")],
        )

        eq_(
            connection.execute(
                stmt, {"uname": [(7, "jack"), (8, "fred")]}
            ).fetchall(),
            [(7, "jack"), (8, "fred")],
        )

    @testing.skip_if(["mssql"])
    def test_bind_in(self, connection):
        """test calling IN against a bind parameter.

        this isn't allowed on several platforms since we
        generate ? = ?.

        """

        users = self.tables.users

        connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
        connection.execute(users.insert(), dict(user_id=8, user_name="fred"))
        connection.execute(users.insert(), dict(user_id=9, user_name=None))

        u = bindparam("search_key", type_=String)

        s = users.select().where(not_(u.in_([])))
        r = connection.execute(s, dict(search_key="john")).fetchall()
        eq_(len(r), 3)
        r = connection.execute(s, dict(search_key=None)).fetchall()
        eq_(len(r), 3)

    def test_literal_in(self, connection):
        """similar to test_bind_in but use a bind with a value."""

        users = self.tables.users

        connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
        connection.execute(users.insert(), dict(user_id=8, user_name="fred"))
        connection.execute(users.insert(), dict(user_id=9, user_name=None))

        s = users.select().where(not_(literal("john").in_([])))
        r = connection.execute(s).fetchall()
        eq_(len(r), 3)

    @testing.requires.boolean_col_expressions
    def test_empty_in_filtering_static(self, connection):
        """test the behavior of the in_() function when
        comparing against an empty collection, specifically
        that a proper boolean value is generated.

        """
        users = self.tables.users

        connection.execute(
            users.insert(),
            [
                {"user_id": 7, "user_name": "jack"},
                {"user_id": 8, "user_name": "ed"},
                {"user_id": 9, "user_name": None},
            ],
        )

        # the == comparisons build SQL expressions, hence the noqa markers
        s = users.select().where(users.c.user_name.in_([]) == True)  # noqa
        r = connection.execute(s).fetchall()
        eq_(len(r), 0)
        s = users.select().where(users.c.user_name.in_([]) == False)  # noqa
        r = connection.execute(s).fetchall()
        eq_(len(r), 3)
        s = users.select().where(users.c.user_name.in_([]) == None)  # noqa
        r = connection.execute(s).fetchall()
        eq_(len(r), 0)
|
|
|
|
|
|
class RequiredBindTest(fixtures.TablesTest):
    """Executing a statement without a value for bind parameter ``x``
    raises StatementError; also checks the ``required`` flag itself."""

    run_create_tables = None
    run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        """Define the single ``foo`` table the statements refer to."""
        foo_columns = (
            Column("id", Integer, primary_key=True),
            Column("data", String(50)),
            Column("x", Integer),
        )
        Table("foo", metadata, *foo_columns)

    def _assert_raises(self, stmt, params):
        """Execute ``stmt`` with ``params`` on a fresh connection and expect
        the missing-value error for parameter ``x``."""
        expected_message = "A value is required for bind parameter 'x'"
        with testing.db.connect() as conn:
            assert_raises_message(
                exc.StatementError,
                expected_message,
                conn.execute,
                stmt,
                params,
            )

    def test_insert(self):
        """INSERT whose VALUES use binds ``x`` and ``data``; ``x`` is not
        supplied."""
        foo = self.tables.foo
        insert_stmt = foo.insert().values(
            x=bindparam("x"), data=bindparam("data")
        )
        self._assert_raises(insert_stmt, {"data": "data"})

    def test_select_where(self):
        """SELECT whose WHERE criteria use binds ``data`` and ``x``; ``x``
        is not supplied."""
        foo = self.tables.foo
        data_matches = foo.c.data == bindparam("data")
        x_matches = foo.c.x == bindparam("x")
        select_stmt = select(foo).where(data_matches).where(x_matches)
        self._assert_raises(select_stmt, {"data": "data"})

    @testing.requires.standalone_binds
    def test_select_columns(self):
        """SELECT of two bare bind parameters; ``x`` is not supplied."""
        data_param = bindparam("data")
        x_param = bindparam("x")
        self._assert_raises(select(data_param, x_param), {"data": "data"})

    def test_text(self):
        """text() statement with ``:x`` and ``:data1``; ``x`` is not
        supplied."""
        text_stmt = text("select * from foo where x=:x and data=:data1")
        self._assert_raises(text_stmt, {"data1": "data"})

    def test_required_flag(self):
        """``required`` defaults to True only when neither a value nor a
        callable is given; an explicit flag always wins."""

        def c():
            return None

        # (bind parameter, expected value of .required)
        expectations = (
            (bindparam("foo"), True),
            (bindparam("foo", required=False), False),
            (bindparam("foo", "bar"), False),
            (bindparam("foo", "bar", required=True), True),
            (bindparam("foo", callable_=c, required=True), True),
            (bindparam("foo", callable_=c), False),
            (bindparam("foo", callable_=c, required=False), False),
        )
        for param, expected in expectations:
            is_(param.required, expected)
|
|
|
|
|
|
class LimitTest(fixtures.TablesTest):
    """Tests of LIMIT and OFFSET, alone and combined with DISTINCT."""

    __sparse_driver_backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Define ``users`` and ``addresses`` with explicit primary keys."""
        Table(
            "users",
            metadata,
            Column("user_id", INT, primary_key=True),
            Column("user_name", VARCHAR(20)),
        )
        Table(
            "addresses",
            metadata,
            Column("address_id", Integer, primary_key=True),
            Column("user_id", Integer, ForeignKey("users.user_id")),
            Column("address", String(30)),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert seven users, each with one address row.

        The address values contain duplicates ("addr1" and "addr5" appear
        twice), leaving five distinct addresses for the DISTINCT tests.
        """
        users, addresses = cls.tables("users", "addresses")

        # (user_id, user_name, address); the address row reuses the user's
        # id as its own address_id
        fixture_rows = (
            (1, "john", "addr1"),
            (2, "jack", "addr1"),
            (3, "ed", "addr2"),
            (4, "wendy", "addr3"),
            (5, "laura", "addr4"),
            (6, "ralph", "addr5"),
            (7, "fido", "addr5"),
        )
        for user_id, user_name, address in fixture_rows:
            # each user goes in right before the address that references it
            connection.execute(
                users.insert(), dict(user_id=user_id, user_name=user_name)
            )
            connection.execute(
                addresses.insert(),
                dict(address_id=user_id, user_id=user_id, address=address),
            )

    def test_select_limit(self, connection):
        """LIMIT returns the first rows of the ordered result."""
        users = self.tables.users
        r = connection.execute(
            users.select().limit(3).order_by(users.c.user_id)
        ).fetchall()
        eq_(r, [(1, "john"), (2, "jack"), (3, "ed")])

    @testing.requires.offset
    def test_select_limit_offset(self, connection):
        """Test the interaction between limit and offset"""

        users = self.tables.users

        r = connection.execute(
            users.select().limit(3).offset(2).order_by(users.c.user_id)
        ).fetchall()
        eq_(r, [(3, "ed"), (4, "wendy"), (5, "laura")])
        r = connection.execute(
            users.select().offset(5).order_by(users.c.user_id)
        ).fetchall()
        eq_(r, [(6, "ralph"), (7, "fido")])

    def test_select_distinct_limit(self, connection):
        """Test the interaction between limit and distinct"""

        addresses = self.tables.addresses

        r = sorted(
            [
                x[0]
                for x in connection.execute(
                    select(addresses.c.address).distinct().limit(3)
                )
            ]
        )
        self.assert_(len(r) == 3, repr(r))
        # r is sorted, so comparing neighbours is enough to spot duplicates
        self.assert_(r[0] != r[1] and r[1] != r[2], repr(r))

    @testing.requires.offset
    def test_select_distinct_offset(self, connection):
        """Test the interaction between distinct and offset"""

        addresses = self.tables.addresses

        r = sorted(
            [
                x[0]
                for x in connection.execute(
                    select(addresses.c.address)
                    .distinct()
                    .offset(1)
                    .order_by(addresses.c.address)
                ).fetchall()
            ]
        )
        eq_(len(r), 4)
        # r is sorted, so comparing neighbours is enough to spot duplicates.
        # The last comparison used to read ``r[2] != [3]`` (a string against
        # a list literal), which could never fail.
        self.assert_(
            r[0] != r[1] and r[1] != r[2] and r[2] != r[3], repr(r)
        )

    @testing.requires.offset
    def test_select_distinct_limit_offset(self, connection):
        """Test the interaction between distinct and limit/offset"""

        addresses = self.tables.addresses

        r = connection.execute(
            select(addresses.c.address)
            .order_by(addresses.c.address)
            .distinct()
            .offset(2)
            .limit(3)
        ).fetchall()
        self.assert_(len(r) == 3, repr(r))
        # rows arrive ordered by address, so neighbours are compared
        self.assert_(r[0] != r[1] and r[1] != r[2], repr(r))
|
|
|
|
|
|
class CompoundTest(fixtures.TablesTest):
|
|
"""test compound statements like UNION, INTERSECT, particularly their
|
|
ability to nest on different databases."""
|
|
|
|
__sparse_driver_backend__ = True
|
|
|
|
run_inserts = "each"
|
|
|
|
@classmethod
def define_tables(cls, metadata):
    """Define ``t1``, ``t2`` and ``t3``, three tables with the same four
    columns."""
    for table_name in ("t1", "t2", "t3"):
        # fresh Column objects are built for every table
        Table(
            table_name,
            metadata,
            Column(
                "col1",
                Integer,
                test_needs_autoincrement=True,
                primary_key=True,
            ),
            Column("col2", String(30)),
            Column("col3", String(40)),
            Column("col4", String(30)),
        )
|
|
|
|
@classmethod
def insert_data(cls, connection):
    """Insert three rows into each of ``t1``, ``t2`` and ``t3``.

    ``col3`` holds aaa/bbb/ccc in every table; ``col4`` holds the same
    three values in a different order per table.
    """
    t1, t2, t3 = cls.tables("t1", "t2", "t3")

    # (table, rows), executed in this order with one execute() per table
    fixture_rows = (
        (
            t1,
            [
                dict(col2="t1col2r1", col3="aaa", col4="aaa"),
                dict(col2="t1col2r2", col3="bbb", col4="bbb"),
                dict(col2="t1col2r3", col3="ccc", col4="ccc"),
            ],
        ),
        (
            t2,
            [
                dict(col2="t2col2r1", col3="aaa", col4="bbb"),
                dict(col2="t2col2r2", col3="bbb", col4="ccc"),
                dict(col2="t2col2r3", col3="ccc", col4="aaa"),
            ],
        ),
        (
            t3,
            [
                dict(col2="t3col2r1", col3="aaa", col4="ccc"),
                dict(col2="t3col2r2", col3="bbb", col4="aaa"),
                dict(col2="t3col2r3", col3="ccc", col4="bbb"),
            ],
        ),
    )
    for table, rows in fixture_rows:
        connection.execute(table.insert(), rows)
|
|
|
|
def _fetchall_sorted(self, executed):
    """Fetch every row from ``executed`` and return them as a sorted list
    of plain tuples."""
    rows_as_tuples = map(tuple, executed.fetchall())
    return sorted(rows_as_tuples)
|
|
|
|
@testing.requires.subqueries
def test_union(self, connection):
    """UNION of two filtered selects, executed directly and again as a
    SELECT from an alias of the union."""
    t1 = self.tables.t1
    t2 = self.tables.t2

    from_t1 = select(t1.c.col3.label("col3"), t1.c.col4.label("col4")).where(
        t1.c.col2.in_(["t1col2r1", "t1col2r2"]),
    )
    from_t2 = select(t2.c.col3.label("col3"), t2.c.col4.label("col4")).where(
        t2.c.col2.in_(["t2col2r2", "t2col2r3"]),
    )
    u = union(from_t1, from_t2)

    wanted = [
        ("aaa", "aaa"),
        ("bbb", "bbb"),
        ("bbb", "ccc"),
        ("ccc", "aaa"),
    ]

    # the union has no ORDER BY, so rows are sorted before comparing
    direct_rows = self._fetchall_sorted(connection.execute(u))
    eq_(direct_rows, wanted)

    aliased_rows = self._fetchall_sorted(
        connection.execute(u.alias("bar").select())
    )
    eq_(aliased_rows, wanted)
|
|
|
|
def test_union_ordered(self, connection):
    """UNION with ORDER BY on the label names returns rows in that
    order."""
    t1 = self.tables.t1
    t2 = self.tables.t2

    from_t1 = select(t1.c.col3.label("col3"), t1.c.col4.label("col4")).where(
        t1.c.col2.in_(["t1col2r1", "t1col2r2"]),
    )
    from_t2 = select(t2.c.col3.label("col3"), t2.c.col4.label("col4")).where(
        t2.c.col2.in_(["t2col2r2", "t2col2r3"]),
    )
    ordered_union = union(from_t1, from_t2).order_by("col3", "col4")

    wanted = [
        ("aaa", "aaa"),
        ("bbb", "bbb"),
        ("bbb", "ccc"),
        ("ccc", "aaa"),
    ]
    # compared unsorted: the statement's own ORDER BY must give this order
    eq_(connection.execute(ordered_union).fetchall(), wanted)
|
|
|
|
@testing.requires.subqueries
def test_union_ordered_alias(self, connection):
    """SELECT from an alias of an ordered UNION returns rows in the
    union's order."""
    t1 = self.tables.t1
    t2 = self.tables.t2

    from_t1 = select(t1.c.col3.label("col3"), t1.c.col4.label("col4")).where(
        t1.c.col2.in_(["t1col2r1", "t1col2r2"]),
    )
    from_t2 = select(t2.c.col3.label("col3"), t2.c.col4.label("col4")).where(
        t2.c.col2.in_(["t2col2r2", "t2col2r3"]),
    )
    ordered_union = union(from_t1, from_t2).order_by("col3", "col4")

    wanted = [
        ("aaa", "aaa"),
        ("bbb", "bbb"),
        ("bbb", "ccc"),
        ("ccc", "aaa"),
    ]
    outer_select = ordered_union.alias("bar").select()
    eq_(connection.execute(outer_select).fetchall(), wanted)
|
|
|
|
@testing.crashes("oracle", "FIXME: unknown, verify not fails_on")
@testing.fails_on(
    testing.requires._mysql_not_mariadb_104_not_mysql8031, "FIXME: unknown"
)
@testing.fails_on("sqlite", "FIXME: unknown")
def test_union_all(self, connection):
    """UNION ALL of a plain SELECT with a nested UNION of the same column."""
    t1, *_ = self.tables("t1", "t2", "t3")

    stmt = union_all(
        select(t1.c.col3),
        union(select(t1.c.col3), select(t1.c.col3)),
    )

    expected = [
        ("aaa",),
        ("aaa",),
        ("bbb",),
        ("bbb",),
        ("ccc",),
        ("ccc",),
    ]

    direct = connection.execute(stmt)
    eq_(self._fetchall_sorted(direct), expected)

    via_alias = connection.execute(stmt.alias("foo").select())
    eq_(self._fetchall_sorted(via_alias), expected)
|
|
|
|
def test_union_all_lightweight(self, connection):
    """Variant of test_union_all that avoids a directly nested UNION.

    The inner UNION is turned into a subquery and its column is selected
    explicitly on the outside, which is more palatable to a wider variety
    of engines.

    """
    t1, *_ = self.tables("t1", "t2", "t3")

    inner = union(select(t1.c.col3), select(t1.c.col3)).alias()
    stmt = union_all(select(t1.c.col3), select(inner.c.col3))

    expected = [
        ("aaa",),
        ("aaa",),
        ("bbb",),
        ("bbb",),
        ("ccc",),
        ("ccc",),
    ]

    direct = connection.execute(stmt)
    eq_(self._fetchall_sorted(direct), expected)

    via_alias = connection.execute(stmt.alias("foo").select())
    eq_(self._fetchall_sorted(via_alias), expected)
|
|
|
|
@testing.requires.intersect
def test_intersect(self, connection):
    """INTERSECT of all t2 rows with the t2 rows whose col4 equals a t3.col3."""
    _, t2, t3 = self.tables("t1", "t2", "t3")

    every_t2_row = select(t2.c.col3, t2.c.col4)
    matched_t2_rows = select(t2.c.col3, t2.c.col4).where(
        t2.c.col4 == t3.c.col3
    )
    stmt = intersect(every_t2_row, matched_t2_rows)

    expected = [("aaa", "bbb"), ("bbb", "ccc"), ("ccc", "aaa")]

    direct = connection.execute(stmt)
    eq_(self._fetchall_sorted(direct), expected)

    via_alias = connection.execute(stmt.alias("bar").select())
    eq_(self._fetchall_sorted(via_alias), expected)
|
|
|
|
@testing.requires.except_
@testing.fails_on("sqlite", "Can't handle this style of nesting")
def test_except_style1(self, connection):
    """EXCEPT whose left side is a bare three-way UNION, run via an alias."""
    t1, t2, t3 = self.tables("t1", "t2", "t3")

    all_rows = union(
        select(t1.c.col3, t1.c.col4),
        select(t2.c.col3, t2.c.col4),
        select(t3.c.col3, t3.c.col4),
    )
    stmt = except_(all_rows, select(t2.c.col3, t2.c.col4))

    expected = [
        ("aaa", "aaa"),
        ("aaa", "ccc"),
        ("bbb", "aaa"),
        ("bbb", "bbb"),
        ("ccc", "bbb"),
        ("ccc", "ccc"),
    ]

    result = connection.execute(stmt.alias().select())
    eq_(self._fetchall_sorted(result), expected)
|
|
|
|
@testing.requires.except_
def test_except_style2(self, connection):
    """EXCEPT whose left side is a UNION wrapped in ``alias().select()``."""
    # Same statement as test_except_style1 apart from that wrapping;
    # sqlite can handle it now.
    t1, t2, t3 = self.tables("t1", "t2", "t3")

    all_rows = union(
        select(t1.c.col3, t1.c.col4),
        select(t2.c.col3, t2.c.col4),
        select(t3.c.col3, t3.c.col4),
    )
    stmt = except_(
        all_rows.alias().select(),
        select(t2.c.col3, t2.c.col4),
    )

    expected = [
        ("aaa", "aaa"),
        ("aaa", "ccc"),
        ("bbb", "aaa"),
        ("bbb", "bbb"),
        ("ccc", "bbb"),
        ("ccc", "ccc"),
    ]

    direct = connection.execute(stmt)
    eq_(self._fetchall_sorted(direct), expected)

    via_alias = connection.execute(stmt.alias().select())
    eq_(self._fetchall_sorted(via_alias), expected)
|
|
|
|
@testing.fails_on(
    ["sqlite", testing.requires._mysql_not_mariadb_104_not_mysql8031],
    "Can't handle this style of nesting",
)
@testing.requires.except_
def test_except_style3(self, connection):
    """An EXCEPT passed directly as the right side of another EXCEPT."""
    # aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc
    t1, t2, t3 = self.tables("t1", "t2", "t3")

    subtrahend = except_(
        select(t2.c.col3),  # aaa, bbb, ccc
        select(t3.c.col3).where(t3.c.col3 == "ccc"),  # ccc
    )
    stmt = except_(
        select(t1.c.col3),  # aaa, bbb, ccc
        subtrahend,
    )

    expected = [("ccc",)]
    eq_(connection.execute(stmt).fetchall(), expected)

    via_alias = connection.execute(stmt.alias("foo").select())
    eq_(via_alias.fetchall(), expected)
|
|
|
|
@testing.requires.except_
def test_except_style4(self, connection):
    """Nested EXCEPT wrapped in ``alias().select()`` inside an outer EXCEPT."""
    # aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc
    t1, t2, t3 = self.tables("t1", "t2", "t3")

    subtrahend = except_(
        select(t2.c.col3),  # aaa, bbb, ccc
        select(t3.c.col3).where(t3.c.col3 == "ccc"),  # ccc
    )
    stmt = except_(
        select(t1.c.col3),  # aaa, bbb, ccc
        subtrahend.alias().select(),
    )

    expected = [("ccc",)]
    eq_(connection.execute(stmt).fetchall(), expected)

    via_alias = connection.execute(stmt.alias().select())
    eq_(via_alias.fetchall(), expected)
|
|
|
|
@testing.requires.intersect
@testing.fails_on(
    ["sqlite", testing.requires._mysql_not_mariadb_104_not_mysql8031],
    "sqlite can't handle leading parenthesis",
)
def test_intersect_unions(self, connection):
    """INTERSECT of a bare UNION with a UNION wrapped in alias().select()."""
    t1, t2, t3 = self.tables("t1", "t2", "t3")

    bare_union = union(
        select(t1.c.col3, t1.c.col4),
        select(t3.c.col3, t3.c.col4),
    )
    wrapped_union = (
        union(
            select(t2.c.col3, t2.c.col4),
            select(t3.c.col3, t3.c.col4),
        )
        .alias()
        .select()
    )
    stmt = intersect(bare_union, wrapped_union)

    expected = [("aaa", "ccc"), ("bbb", "aaa"), ("ccc", "bbb")]
    eq_(self._fetchall_sorted(connection.execute(stmt)), expected)
|
|
|
|
@testing.requires.intersect
def test_intersect_unions_2(self, connection):
    """INTERSECT of two UNIONs, each wrapped in alias().select()."""
    t1, t2, t3 = self.tables("t1", "t2", "t3")

    def union_with_t3(table):
        # UNION of ``table`` with t3, as a SELECT from an anonymous alias
        combined = union(
            select(table.c.col3, table.c.col4),
            select(t3.c.col3, t3.c.col4),
        )
        return combined.alias().select()

    stmt = intersect(union_with_t3(t1), union_with_t3(t2))

    expected = [("aaa", "ccc"), ("bbb", "aaa"), ("ccc", "bbb")]
    eq_(self._fetchall_sorted(connection.execute(stmt)), expected)
|
|
|
|
@testing.requires.intersect
def test_intersect_unions_3(self, connection):
    """INTERSECT of a plain t2 SELECT with a wrapped three-way UNION."""
    t1, t2, t3 = self.tables("t1", "t2", "t3")

    all_rows = union(
        select(t1.c.col3, t1.c.col4),
        select(t2.c.col3, t2.c.col4),
        select(t3.c.col3, t3.c.col4),
    )
    stmt = intersect(
        select(t2.c.col3, t2.c.col4),
        all_rows.alias().select(),
    )

    expected = [("aaa", "bbb"), ("bbb", "ccc"), ("ccc", "aaa")]
    eq_(self._fetchall_sorted(connection.execute(stmt)), expected)
|
|
|
|
@testing.requires.intersect
def test_composite_alias(self, connection):
    """An INTERSECT over a wrapped UNION can itself be aliased and selected."""
    t1, t2, t3 = self.tables("t1", "t2", "t3")

    all_rows = union(
        select(t1.c.col3, t1.c.col4),
        select(t2.c.col3, t2.c.col4),
        select(t3.c.col3, t3.c.col4),
    )
    aliased = intersect(
        select(t2.c.col3, t2.c.col4),
        all_rows.alias().select(),
    ).alias()

    expected = [("aaa", "bbb"), ("bbb", "ccc"), ("ccc", "aaa")]
    result = connection.execute(aliased.select())
    eq_(self._fetchall_sorted(result), expected)
|
|
|
|
|
|
class JoinTest(fixtures.TablesTest):
    """Tests join execution.

    The compiled SQL emitted by the dialect might be ANSI joins or
    theta joins ('old oracle style', with (+) for OUTER).  This test
    tries to exercise join syntax and uncover any inconsistencies in
    `JOIN rhs ON lhs.col=rhs.col` vs `rhs.col=lhs.col`.  At least one
    database seems to be sensitive to this.

    Every test therefore runs its statement once for each ordering of
    the ON criteria.
    """

    __sparse_driver_backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Create t1, t2 (FK to t1) and t3 (FK to t2)."""
        Table(
            "t1",
            metadata,
            Column("t1_id", Integer, primary_key=True),
            Column("name", String(32)),
        )
        Table(
            "t2",
            metadata,
            Column("t2_id", Integer, primary_key=True),
            Column("t1_id", Integer, ForeignKey("t1.t1_id")),
            Column("name", String(32)),
        )
        Table(
            "t3",
            metadata,
            Column("t3_id", Integer, primary_key=True),
            Column("t2_id", Integer, ForeignKey("t2.t2_id")),
            Column("name", String(32)),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert three t1 rows, two t2 rows and one t3 row."""
        # t1.10 -> t2.20 -> t3.30
        # t1.11 -> t2.21
        # t1.12
        t1, t2, t3 = cls.tables("t1", "t2", "t3")

        connection.execute(
            t1.insert(),
            [
                {"t1_id": 10, "name": "t1 #10"},
                {"t1_id": 11, "name": "t1 #11"},
                {"t1_id": 12, "name": "t1 #12"},
            ],
        )
        connection.execute(
            t2.insert(),
            [
                {"t2_id": 20, "t1_id": 10, "name": "t2 #20"},
                {"t2_id": 21, "t1_id": 11, "name": "t2 #21"},
            ],
        )
        connection.execute(
            t3.insert(), [{"t3_id": 30, "t2_id": 20, "name": "t3 #30"}]
        )

    def assertRows(self, statement, expected):
        """Execute a statement and assert that rows returned equal expected.

        Both the fetched rows and ``expected`` are sorted before being
        compared, so the order rows are returned in does not matter.
        """
        with testing.db.connect() as conn:
            found = sorted(
                tuple(row) for row in conn.execute(statement).fetchall()
            )
            eq_(found, sorted(expected))

    def _t1_t2_criteria(self):
        """Return the t1/t2 ON clause in both operand orderings."""
        t1, t2 = self.tables("t1", "t2")
        return (t1.c.t1_id == t2.c.t1_id, t2.c.t1_id == t1.c.t1_id)

    def _t2_t3_criteria(self):
        """Return the t2/t3 ON clause in both operand orderings."""
        t2, t3 = self.tables("t2", "t3")
        return (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id)

    def _outerjoin_x2(self, criteria):
        """Return t1 outer joined to t2, outer joined to t3 on ``criteria``."""
        t1, t2, t3 = self.tables("t1", "t2", "t3")
        return t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).outerjoin(
            t3, criteria
        )

    def _mixed_join(self, criteria):
        """Return t1 joined to t2, outer joined to t3 on ``criteria``."""
        t1, t2, t3 = self.tables("t1", "t2", "t3")
        return t1.join(t2).outerjoin(t3, criteria)

    def _assert_where_cases(self, make_from, cases):
        """Check ``(whereclause, expected rows)`` cases for both ON orderings.

        ``make_from`` is called with the t2/t3 ON criteria and returns the
        FROM clause that the three id columns are selected from.
        """
        t1, t2, t3 = self.tables("t1", "t2", "t3")

        for criteria in self._t2_t3_criteria():
            for whereclause, expected in cases:
                expr = (
                    select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
                    .where(whereclause)
                    .select_from(make_from(criteria))
                )
                self.assertRows(expr, expected)

    def test_join_x1(self):
        """Joins t1->t2."""
        t1, t2 = self.tables("t1", "t2")

        for criteria in self._t1_t2_criteria():
            expr = select(t1.c.t1_id, t2.c.t2_id).select_from(
                t1.join(t2, criteria)
            )
            self.assertRows(expr, [(10, 20), (11, 21)])

    def test_join_x2(self):
        """Joins t1->t2->t3."""
        t1, t2, t3 = self.tables("t1", "t2", "t3")

        for criteria in self._t2_t3_criteria():
            expr = select(t1.c.t1_id, t2.c.t2_id).select_from(
                t1.join(t2).join(t3, criteria)
            )
            # only t1.10 has both a t2 and a t3 row
            self.assertRows(expr, [(10, 20)])

    def test_outerjoin_x1(self):
        """Outer joins t1->t2."""
        t1, t2 = self.tables("t1", "t2")

        for criteria in self._t1_t2_criteria():
            expr = select(t1.c.t1_id, t2.c.t2_id).select_from(
                t1.outerjoin(t2, criteria)
            )
            # t1.12 has no t2 row, so it comes back padded with NULL
            self.assertRows(expr, [(10, 20), (11, 21), (12, None)])

    def test_outerjoin_x2(self):
        """Outer joins t1->t2,t3."""
        t1, t2, t3 = self.tables("t1", "t2", "t3")

        for criteria in self._t2_t3_criteria():
            expr = select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id).select_from(
                self._outerjoin_x2(criteria)
            )
            self.assertRows(
                expr, [(10, 20, 30), (11, 21, None), (12, None, None)]
            )

    def test_outerjoin_where_x2_t1(self):
        """Outer joins t1->t2,t3, where on t1."""
        t1 = self.tables.t1

        self._assert_where_cases(
            self._outerjoin_x2,
            [
                (t1.c.name == "t1 #10", [(10, 20, 30)]),
                (t1.c.t1_id < 12, [(10, 20, 30), (11, 21, None)]),
            ],
        )

    def test_outerjoin_where_x2_t2(self):
        """Outer joins t1->t2,t3, where on t2."""
        t2 = self.tables.t2

        self._assert_where_cases(
            self._outerjoin_x2,
            [
                (t2.c.name == "t2 #20", [(10, 20, 30)]),
                (t2.c.t2_id < 29, [(10, 20, 30), (11, 21, None)]),
            ],
        )

    def test_outerjoin_where_x2_t3(self):
        """Outer joins t1->t2,t3, where on t3."""
        t3 = self.tables.t3

        self._assert_where_cases(
            self._outerjoin_x2,
            [
                (t3.c.name == "t3 #30", [(10, 20, 30)]),
                (t3.c.t3_id < 39, [(10, 20, 30)]),
            ],
        )

    def test_outerjoin_where_x2_t1t3(self):
        """Outer joins t1->t2,t3, where on t1 and t3."""
        t1, t3 = self.tables("t1", "t3")

        self._assert_where_cases(
            self._outerjoin_x2,
            [
                (
                    and_(t1.c.name == "t1 #10", t3.c.name == "t3 #30"),
                    [(10, 20, 30)],
                ),
                (
                    and_(t1.c.t1_id < 19, t3.c.t3_id < 39),
                    [(10, 20, 30)],
                ),
            ],
        )

    def test_outerjoin_where_x2_t1t2(self):
        """Outer joins t1->t2,t3, where on t1 and t2."""
        t1, t2 = self.tables("t1", "t2")

        self._assert_where_cases(
            self._outerjoin_x2,
            [
                (
                    and_(t1.c.name == "t1 #10", t2.c.name == "t2 #20"),
                    [(10, 20, 30)],
                ),
                (
                    and_(t1.c.t1_id < 12, t2.c.t2_id < 39),
                    [(10, 20, 30), (11, 21, None)],
                ),
            ],
        )

    def test_outerjoin_where_x2_t1t2t3(self):
        """Outer joins t1->t2,t3, where on t1, t2 and t3."""
        t1, t2, t3 = self.tables("t1", "t2", "t3")

        self._assert_where_cases(
            self._outerjoin_x2,
            [
                (
                    and_(
                        t1.c.name == "t1 #10",
                        t2.c.name == "t2 #20",
                        t3.c.name == "t3 #30",
                    ),
                    [(10, 20, 30)],
                ),
                (
                    and_(t1.c.t1_id < 19, t2.c.t2_id < 29, t3.c.t3_id < 39),
                    [(10, 20, 30)],
                ),
            ],
        )

    def test_mixed(self):
        """Joins t1->t2, outer t2->t3."""
        t1, t2, t3 = self.tables("t1", "t2", "t3")

        for criteria in self._t2_t3_criteria():
            expr = select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id).select_from(
                self._mixed_join(criteria)
            )
            self.assertRows(expr, [(10, 20, 30), (11, 21, None)])

    def test_mixed_where(self):
        """Joins t1->t2, outer t2->t3, plus a where on each table in turn."""
        t1, t2, t3 = self.tables("t1", "t2", "t3")

        t1_named = t1.c.name == "t1 #10"
        t2_named = t2.c.name == "t2 #20"
        t3_named = t3.c.name == "t3 #30"

        # every filter narrows the result to the single fully linked chain
        only_full_chain = [(10, 20, 30)]

        self._assert_where_cases(
            self._mixed_join,
            [
                (t1_named, only_full_chain),
                (t2_named, only_full_chain),
                (t3_named, only_full_chain),
                (and_(t1_named, t2_named), only_full_chain),
                (and_(t2_named, t3_named), only_full_chain),
                (and_(t1_named, t2_named, t3_named), only_full_chain),
            ],
        )
|
|
|
|
|
|
class OperatorTest(fixtures.TablesTest):
|
|
__sparse_driver_backend__ = True
|
|
|
|
@classmethod
def define_tables(cls, metadata):
    """Define ``flds``: an autoincrement PK plus one int and one string column."""
    id_column = Column(
        "idcol",
        Integer,
        primary_key=True,
        test_needs_autoincrement=True,
    )
    Table(
        "flds",
        metadata,
        id_column,
        Column("intcol", Integer),
        Column("strcol", String(50)),
    )
|
|
|
|
@classmethod
def insert_data(cls, connection):
    """Insert the two ``flds`` rows used by the tests: (5, foo), (13, bar)."""
    rows = [
        {"intcol": 5, "strcol": "foo"},
        {"intcol": 13, "strcol": "bar"},
    ]
    connection.execute(cls.tables.flds.insert(), rows)
|
|
|
|
# TODO: seems like more tests warranted for this setup.
|
|
def test_modulo(self, connection):
    """``intcol % 3`` is computed per row and returned in ``idcol`` order."""
    flds = self.tables.flds

    stmt = select(flds.c.intcol % 3).order_by(flds.c.idcol)
    remainders = connection.execute(stmt).fetchall()

    # 5 % 3 == 2 and 13 % 3 == 1
    eq_(remainders, [(2,), (1,)])
|
|
|
|
@testing.requires.window_functions
def test_over(self, connection):
    """``row_number()`` over ``strcol`` numbers "bar" before "foo".

    The expected rows are compared in order, so the statement carries an
    explicit ORDER BY on the same column as the window.
    """
    flds = self.tables.flds

    # The window's ORDER BY only decides how rows are numbered; without an
    # ORDER BY on the statement itself the order in which the database
    # returns the rows is not defined.
    stmt = select(
        flds.c.intcol,
        func.row_number().over(order_by=flds.c.strcol),
    ).order_by(flds.c.strcol)

    eq_(
        connection.execute(stmt).fetchall(),
        [(13, 1), (5, 2)],
    )
|