Files
Mike Bayer cf62005639 test oracle 23c, mariadb12; reduce backend use
one particular vector test wont run on oracle 23c free, so
just disable it.

added better skips for the rest of the vector tests and
fixed a deprecation issue.

this will be the first run on the new oracle23 on CI so we'll have to
see how this goes.

Also adjust for mariabdb12 being overly helpful with regards
to stale row updates.

as we are having trouble getting 23c to pass throug transaction
tests, i noted we have an explosion of tests due to the multiple
drivers, so this patch introduces __sparse_driver_backend__
for all tests where we want variety of
database server but there's no need to test every driver.
This should dramatically reduce the size of the test suite run

Change-Id: Ic8d3eb0a60e76b4c54c6bb4a721f90c81ede782b
2025-11-25 14:50:56 -05:00

1806 lines
57 KiB
Python

from sqlalchemy import and_
from sqlalchemy import asc
from sqlalchemy import bindparam
from sqlalchemy import cast
from sqlalchemy import desc
from sqlalchemy import exc
from sqlalchemy import except_
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import INT
from sqlalchemy import Integer
from sqlalchemy import intersect
from sqlalchemy import literal
from sqlalchemy import literal_column
from sqlalchemy import not_
from sqlalchemy import or_
from sqlalchemy import select
from sqlalchemy import sql
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import tuple_
from sqlalchemy import TypeDecorator
from sqlalchemy import union
from sqlalchemy import union_all
from sqlalchemy import VARCHAR
from sqlalchemy.engine import default
from sqlalchemy.sql import LABEL_STYLE_TABLENAME_PLUS_COL
from sqlalchemy.sql.selectable import LABEL_STYLE_NONE
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy.testing.util import resolve_lambda
class QueryTest(fixtures.TablesTest):
__sparse_driver_backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"users",
metadata,
Column(
"user_id", INT, primary_key=True, test_needs_autoincrement=True
),
Column("user_name", VARCHAR(20)),
test_needs_acid=True,
)
Table(
"addresses",
metadata,
Column(
"address_id",
Integer,
primary_key=True,
test_needs_autoincrement=True,
),
Column("user_id", Integer, ForeignKey("users.user_id")),
Column("address", String(30)),
test_needs_acid=True,
)
Table(
"u2",
metadata,
Column("user_id", INT, primary_key=True),
Column("user_name", VARCHAR(20)),
test_needs_acid=True,
)
def test_order_by_label(self, connection):
"""test that a label within an ORDER BY works on each backend.
This test should be modified to support [ticket:1068] when that ticket
is implemented. For now, you need to put the actual string in the
ORDER BY.
"""
users = self.tables.users
connection.execute(
users.insert(),
[
{"user_id": 7, "user_name": "jack"},
{"user_id": 8, "user_name": "ed"},
{"user_id": 9, "user_name": "fred"},
],
)
concat = ("test: " + users.c.user_name).label("thedata")
eq_(
connection.execute(select(concat).order_by("thedata")).fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)],
)
eq_(
connection.execute(select(concat).order_by("thedata")).fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)],
)
concat = ("test: " + users.c.user_name).label("thedata")
eq_(
connection.execute(
select(concat).order_by(desc("thedata"))
).fetchall(),
[("test: jack",), ("test: fred",), ("test: ed",)],
)
@testing.requires.order_by_label_with_expression
def test_order_by_label_compound(self, connection):
users = self.tables.users
connection.execute(
users.insert(),
[
{"user_id": 7, "user_name": "jack"},
{"user_id": 8, "user_name": "ed"},
{"user_id": 9, "user_name": "fred"},
],
)
concat = ("test: " + users.c.user_name).label("thedata")
eq_(
connection.execute(
select(concat).order_by(literal_column("thedata") + "x")
).fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)],
)
@testing.requires.boolean_col_expressions
def test_or_and_as_columns(self, connection):
true, false = literal(True), literal(False)
eq_(connection.execute(select(and_(true, false))).scalar(), False)
eq_(connection.execute(select(and_(true, true))).scalar(), True)
eq_(connection.execute(select(or_(true, false))).scalar(), True)
eq_(connection.execute(select(or_(false, false))).scalar(), False)
eq_(
connection.execute(select(not_(or_(false, false)))).scalar(),
True,
)
row = connection.execute(
select(or_(false, false).label("x"), and_(true, false).label("y"))
).first()
assert row.x == False # noqa
assert row.y == False # noqa
row = connection.execute(
select(or_(true, false).label("x"), and_(true, false).label("y"))
).first()
assert row.x == True # noqa
assert row.y == False # noqa
def test_select_tuple(self, connection):
users = self.tables.users
connection.execute(
users.insert(),
{"user_id": 1, "user_name": "apples"},
)
assert_raises_message(
exc.CompileError,
r"Most backends don't support SELECTing from a tuple\(\) object.",
connection.execute,
select(tuple_(users.c.user_id, users.c.user_name)),
)
@testing.combinations(
(
lambda users: select(users.c.user_id).where(
users.c.user_name.startswith("apple")
),
[(1,)],
),
(
lambda users: select(users.c.user_id).where(
users.c.user_name.contains("i % t")
),
[(5,)],
),
(
lambda users: select(users.c.user_id).where(
users.c.user_name.endswith("anas")
),
[(3,)],
),
(
lambda users: select(users.c.user_id).where(
users.c.user_name.contains("i % t", escape="&")
),
[(5,)],
),
argnames="expr,result",
)
def test_like_ops(self, connection, expr, result):
users = self.tables.users
connection.execute(
users.insert(),
[
{"user_id": 1, "user_name": "apples"},
{"user_id": 2, "user_name": "oranges"},
{"user_id": 3, "user_name": "bananas"},
{"user_id": 4, "user_name": "legumes"},
{"user_id": 5, "user_name": "hi % there"},
],
)
expr = resolve_lambda(expr, users=users)
eq_(connection.execute(expr).fetchall(), result)
@testing.requires.mod_operator_as_percent_sign
@testing.emits_warning(".*now automatically escapes.*")
def test_percents_in_text(self, connection):
for expr, result in (
(text("select 6 % 10"), 6),
(text("select 17 % 10"), 7),
(text("select '%'"), "%"),
(text("select '%%'"), "%%"),
(text("select '%%%'"), "%%%"),
(text("select 'hello % world'"), "hello % world"),
):
eq_(connection.scalar(expr), result)
def test_ilike(self, connection):
users = self.tables.users
connection.execute(
users.insert(),
[
{"user_id": 1, "user_name": "one"},
{"user_id": 2, "user_name": "TwO"},
{"user_id": 3, "user_name": "ONE"},
{"user_id": 4, "user_name": "OnE"},
],
)
eq_(
connection.execute(
select(users.c.user_id).where(users.c.user_name.ilike("one"))
).fetchall(),
[(1,), (3,), (4,)],
)
eq_(
connection.execute(
select(users.c.user_id).where(users.c.user_name.ilike("TWO"))
).fetchall(),
[(2,)],
)
if testing.against("postgresql"):
eq_(
connection.execute(
select(users.c.user_id).where(
users.c.user_name.like("one")
)
).fetchall(),
[(1,)],
)
eq_(
connection.execute(
select(users.c.user_id).where(
users.c.user_name.like("TWO")
)
).fetchall(),
[],
)
def test_repeated_bindparams(self, connection):
"""Tests that a BindParam can be used more than once.
This should be run for DB-APIs with both positional and named
paramstyles.
"""
users = self.tables.users
connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
connection.execute(users.insert(), dict(user_id=8, user_name="fred"))
u = bindparam("userid")
s = users.select().where(
and_(users.c.user_name == u, users.c.user_name == u)
)
r = connection.execute(s, dict(userid="fred")).fetchall()
assert len(r) == 1
def test_bindparam_detection(self):
dialect = default.DefaultDialect(paramstyle="qmark")
def prep(q):
return str(sql.text(q).compile(dialect=dialect))
def a_eq(got, wanted):
if got != wanted:
print("Wanted %s" % wanted)
print("Received %s" % got)
self.assert_(got == wanted, got)
a_eq(prep("select foo"), "select foo")
a_eq(prep("time='12:30:00'"), "time='12:30:00'")
a_eq(prep("time='12:30:00'"), "time='12:30:00'")
a_eq(prep(":this:that"), ":this:that")
a_eq(prep(":this :that"), "? ?")
a_eq(prep("(:this),(:that :other)"), "(?),(? ?)")
a_eq(prep("(:this),(:that:other)"), "(?),(:that:other)")
a_eq(prep("(:this),(:that,:other)"), "(?),(?,?)")
a_eq(prep("(:that_:other)"), "(:that_:other)")
a_eq(prep("(:that_ :other)"), "(? ?)")
a_eq(prep("(:that_other)"), "(?)")
a_eq(prep("(:that$other)"), "(?)")
a_eq(prep("(:that$:other)"), "(:that$:other)")
a_eq(prep(".:that$ :other."), ".? ?.")
a_eq(prep(r"select \foo"), r"select \foo")
a_eq(prep(r"time='12\:30:00'"), r"time='12\:30:00'")
a_eq(prep(r":this \:that"), "? :that")
a_eq(prep(r"(\:that$other)"), "(:that$other)")
a_eq(prep(r".\:that$ :other."), ".:that$ ?.")
@testing.requires.standalone_binds
def test_select_from_bindparam(self, connection):
"""Test result row processing when selecting from a plain bind
param."""
class MyInteger(TypeDecorator):
impl = Integer
cache_ok = True
def process_bind_param(self, value, dialect):
return int(value[4:])
def process_result_value(self, value, dialect):
return "INT_%d" % value
eq_(
connection.scalar(select(cast("INT_5", type_=MyInteger))),
"INT_5",
)
eq_(
connection.scalar(
select(cast("INT_5", type_=MyInteger).label("foo"))
),
"INT_5",
)
def test_order_by(self, connection):
"""Exercises ORDER BY clause generation.
Tests simple, compound, aliased and DESC clauses.
"""
users = self.tables.users
connection.execute(users.insert(), dict(user_id=1, user_name="c"))
connection.execute(users.insert(), dict(user_id=2, user_name="b"))
connection.execute(users.insert(), dict(user_id=3, user_name="a"))
def a_eq(executable, wanted):
got = list(connection.execute(executable))
eq_(got, wanted)
for labels in False, True:
label_style = (
LABEL_STYLE_NONE
if labels is False
else LABEL_STYLE_TABLENAME_PLUS_COL
)
def go(stmt):
if labels:
stmt = stmt.set_label_style(label_style)
return stmt
a_eq(
users.select()
.order_by(users.c.user_id)
.set_label_style(label_style),
[(1, "c"), (2, "b"), (3, "a")],
)
a_eq(
users.select()
.order_by(users.c.user_name, users.c.user_id)
.set_label_style(label_style),
[(3, "a"), (2, "b"), (1, "c")],
)
a_eq(
go(
select(users.c.user_id.label("foo")).order_by(
users.c.user_id
)
),
[(1,), (2,), (3,)],
)
a_eq(
go(
select(
users.c.user_id.label("foo"), users.c.user_name
).order_by(users.c.user_name, users.c.user_id),
),
[(3, "a"), (2, "b"), (1, "c")],
)
a_eq(
users.select()
.distinct()
.order_by(users.c.user_id)
.set_label_style(label_style),
[(1, "c"), (2, "b"), (3, "a")],
)
a_eq(
go(
select(users.c.user_id.label("foo"))
.distinct()
.order_by(users.c.user_id),
),
[(1,), (2,), (3,)],
)
a_eq(
go(
select(
users.c.user_id.label("a"),
users.c.user_id.label("b"),
users.c.user_name,
).order_by(users.c.user_id),
),
[(1, 1, "c"), (2, 2, "b"), (3, 3, "a")],
)
a_eq(
users.select()
.distinct()
.order_by(desc(users.c.user_id))
.set_label_style(label_style),
[(3, "a"), (2, "b"), (1, "c")],
)
a_eq(
go(
select(users.c.user_id.label("foo"))
.distinct()
.order_by(users.c.user_id.desc()),
),
[(3,), (2,), (1,)],
)
@testing.requires.nullsordering
def test_order_by_nulls(self, connection):
"""Exercises ORDER BY clause generation.
Tests simple, compound, aliased and DESC clauses.
"""
users = self.tables.users
connection.execute(users.insert(), dict(user_id=1))
connection.execute(users.insert(), dict(user_id=2, user_name="b"))
connection.execute(users.insert(), dict(user_id=3, user_name="a"))
def a_eq(executable, wanted):
got = list(connection.execute(executable))
eq_(got, wanted)
for labels in False, True:
label_style = (
LABEL_STYLE_NONE
if labels is False
else LABEL_STYLE_TABLENAME_PLUS_COL
)
a_eq(
users.select()
.order_by(users.c.user_name.nulls_first())
.set_label_style(label_style),
[(1, None), (3, "a"), (2, "b")],
)
a_eq(
users.select()
.order_by(users.c.user_name.nulls_last())
.set_label_style(label_style),
[(3, "a"), (2, "b"), (1, None)],
)
a_eq(
users.select()
.order_by(asc(users.c.user_name).nulls_first())
.set_label_style(label_style),
[(1, None), (3, "a"), (2, "b")],
)
a_eq(
users.select()
.order_by(asc(users.c.user_name).nulls_last())
.set_label_style(label_style),
[(3, "a"), (2, "b"), (1, None)],
)
a_eq(
users.select()
.order_by(users.c.user_name.desc().nulls_first())
.set_label_style(label_style),
[(1, None), (2, "b"), (3, "a")],
)
a_eq(
users.select()
.order_by(users.c.user_name.desc().nulls_last())
.set_label_style(label_style),
[(2, "b"), (3, "a"), (1, None)],
)
a_eq(
users.select()
.order_by(desc(users.c.user_name).nulls_first())
.set_label_style(label_style),
[(1, None), (2, "b"), (3, "a")],
)
a_eq(
users.select()
.order_by(desc(users.c.user_name).nulls_last())
.set_label_style(label_style),
[(2, "b"), (3, "a"), (1, None)],
)
a_eq(
users.select()
.order_by(
users.c.user_name.nulls_first(),
users.c.user_id,
)
.set_label_style(label_style),
[(1, None), (3, "a"), (2, "b")],
)
a_eq(
users.select()
.order_by(users.c.user_name.nulls_last(), users.c.user_id)
.set_label_style(label_style),
[(3, "a"), (2, "b"), (1, None)],
)
def test_in_filtering(self, connection):
"""test the behavior of the in_() function."""
users = self.tables.users
connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
connection.execute(users.insert(), dict(user_id=8, user_name="fred"))
connection.execute(users.insert(), dict(user_id=9, user_name=None))
s = users.select().where(users.c.user_name.in_([]))
r = connection.execute(s).fetchall()
# No username is in empty set
assert len(r) == 0
s = users.select().where(not_(users.c.user_name.in_([])))
r = connection.execute(s).fetchall()
assert len(r) == 3
s = users.select().where(users.c.user_name.in_(["jack", "fred"]))
r = connection.execute(s).fetchall()
assert len(r) == 2
s = users.select().where(not_(users.c.user_name.in_(["jack", "fred"])))
r = connection.execute(s).fetchall()
# Null values are not outside any set
assert len(r) == 0
def test_expanding_in(self, connection):
users = self.tables.users
connection.execute(
users.insert(),
[
dict(user_id=7, user_name="jack"),
dict(user_id=8, user_name="fred"),
dict(user_id=9, user_name=None),
],
)
stmt = (
select(users)
.where(users.c.user_name.in_(bindparam("uname", expanding=True)))
.order_by(users.c.user_id)
)
eq_(
connection.execute(stmt, {"uname": ["jack"]}).fetchall(),
[(7, "jack")],
)
eq_(
connection.execute(stmt, {"uname": ["jack", "fred"]}).fetchall(),
[(7, "jack"), (8, "fred")],
)
eq_(connection.execute(stmt, {"uname": []}).fetchall(), [])
assert_raises_message(
exc.StatementError,
"'expanding' parameters can't be used with executemany()",
connection.execute,
users.update().where(
users.c.user_name.in_(bindparam("uname", expanding=True))
),
[{"uname": ["fred"]}, {"uname": ["ed"]}],
)
@testing.requires.no_quoting_special_bind_names
def test_expanding_in_special_chars(self, connection):
users = self.tables.users
connection.execute(
users.insert(),
[
dict(user_id=7, user_name="jack"),
dict(user_id=8, user_name="fred"),
],
)
stmt = (
select(users)
.where(users.c.user_name.in_(bindparam("u35", expanding=True)))
.where(users.c.user_id == bindparam("u46"))
.order_by(users.c.user_id)
)
eq_(
connection.execute(
stmt, {"u35": ["jack", "fred"], "u46": 7}
).fetchall(),
[(7, "jack")],
)
stmt = (
select(users)
.where(users.c.user_name.in_(bindparam("u.35", expanding=True)))
.where(users.c.user_id == bindparam("u.46"))
.order_by(users.c.user_id)
)
eq_(
connection.execute(
stmt, {"u.35": ["jack", "fred"], "u.46": 7}
).fetchall(),
[(7, "jack")],
)
def test_expanding_in_multiple(self, connection):
users = self.tables.users
connection.execute(
users.insert(),
[
dict(user_id=7, user_name="jack"),
dict(user_id=8, user_name="fred"),
dict(user_id=9, user_name="ed"),
],
)
stmt = (
select(users)
.where(users.c.user_name.in_(bindparam("uname", expanding=True)))
.where(users.c.user_id.in_(bindparam("userid", expanding=True)))
.order_by(users.c.user_id)
)
eq_(
connection.execute(
stmt, {"uname": ["jack", "fred", "ed"], "userid": [8, 9]}
).fetchall(),
[(8, "fred"), (9, "ed")],
)
def test_expanding_in_repeated(self, connection):
users = self.tables.users
connection.execute(
users.insert(),
[
dict(user_id=7, user_name="jack"),
dict(user_id=8, user_name="fred"),
dict(user_id=9, user_name="ed"),
],
)
stmt = (
select(users)
.where(
users.c.user_name.in_(bindparam("uname", expanding=True))
| users.c.user_name.in_(bindparam("uname2", expanding=True))
)
.where(users.c.user_id == 8)
)
stmt = stmt.union(
select(users)
.where(
users.c.user_name.in_(bindparam("uname", expanding=True))
| users.c.user_name.in_(bindparam("uname2", expanding=True))
)
.where(users.c.user_id == 9)
).order_by("user_id")
eq_(
connection.execute(
stmt,
{
"uname": ["jack", "fred"],
"uname2": ["ed"],
"userid": [8, 9],
},
).fetchall(),
[(8, "fred"), (9, "ed")],
)
@testing.requires.tuple_in
def test_expanding_in_composite(self, connection):
users = self.tables.users
connection.execute(
users.insert(),
[
dict(user_id=7, user_name="jack"),
dict(user_id=8, user_name="fred"),
dict(user_id=9, user_name=None),
],
)
stmt = (
select(users)
.where(
tuple_(users.c.user_id, users.c.user_name).in_(
bindparam("uname", expanding=True)
)
)
.order_by(users.c.user_id)
)
eq_(
connection.execute(stmt, {"uname": [(7, "jack")]}).fetchall(),
[(7, "jack")],
)
eq_(
connection.execute(
stmt, {"uname": [(7, "jack"), (8, "fred")]}
).fetchall(),
[(7, "jack"), (8, "fred")],
)
@testing.skip_if(["mssql"])
def test_bind_in(self, connection):
"""test calling IN against a bind parameter.
this isn't allowed on several platforms since we
generate ? = ?.
"""
users = self.tables.users
connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
connection.execute(users.insert(), dict(user_id=8, user_name="fred"))
connection.execute(users.insert(), dict(user_id=9, user_name=None))
u = bindparam("search_key", type_=String)
s = users.select().where(not_(u.in_([])))
r = connection.execute(s, dict(search_key="john")).fetchall()
assert len(r) == 3
r = connection.execute(s, dict(search_key=None)).fetchall()
assert len(r) == 3
def test_literal_in(self, connection):
"""similar to test_bind_in but use a bind with a value."""
users = self.tables.users
connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
connection.execute(users.insert(), dict(user_id=8, user_name="fred"))
connection.execute(users.insert(), dict(user_id=9, user_name=None))
s = users.select().where(not_(literal("john").in_([])))
r = connection.execute(s).fetchall()
assert len(r) == 3
@testing.requires.boolean_col_expressions
def test_empty_in_filtering_static(self, connection):
"""test the behavior of the in_() function when
comparing against an empty collection, specifically
that a proper boolean value is generated.
"""
users = self.tables.users
connection.execute(
users.insert(),
[
{"user_id": 7, "user_name": "jack"},
{"user_id": 8, "user_name": "ed"},
{"user_id": 9, "user_name": None},
],
)
s = users.select().where(users.c.user_name.in_([]) == True) # noqa
r = connection.execute(s).fetchall()
assert len(r) == 0
s = users.select().where(users.c.user_name.in_([]) == False) # noqa
r = connection.execute(s).fetchall()
assert len(r) == 3
s = users.select().where(users.c.user_name.in_([]) == None) # noqa
r = connection.execute(s).fetchall()
assert len(r) == 0
class RequiredBindTest(fixtures.TablesTest):
run_create_tables = None
run_deletes = None
@classmethod
def define_tables(cls, metadata):
Table(
"foo",
metadata,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
Column("x", Integer),
)
def _assert_raises(self, stmt, params):
with testing.db.connect() as conn:
assert_raises_message(
exc.StatementError,
"A value is required for bind parameter 'x'",
conn.execute,
stmt,
params,
)
def test_insert(self):
stmt = self.tables.foo.insert().values(
x=bindparam("x"), data=bindparam("data")
)
self._assert_raises(stmt, {"data": "data"})
def test_select_where(self):
stmt = (
select(self.tables.foo)
.where(self.tables.foo.c.data == bindparam("data"))
.where(self.tables.foo.c.x == bindparam("x"))
)
self._assert_raises(stmt, {"data": "data"})
@testing.requires.standalone_binds
def test_select_columns(self):
stmt = select(bindparam("data"), bindparam("x"))
self._assert_raises(stmt, {"data": "data"})
def test_text(self):
stmt = text("select * from foo where x=:x and data=:data1")
self._assert_raises(stmt, {"data1": "data"})
def test_required_flag(self):
is_(bindparam("foo").required, True)
is_(bindparam("foo", required=False).required, False)
is_(bindparam("foo", "bar").required, False)
is_(bindparam("foo", "bar", required=True).required, True)
def c():
return None
is_(bindparam("foo", callable_=c, required=True).required, True)
is_(bindparam("foo", callable_=c).required, False)
is_(bindparam("foo", callable_=c, required=False).required, False)
class LimitTest(fixtures.TablesTest):
__sparse_driver_backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"users",
metadata,
Column("user_id", INT, primary_key=True),
Column("user_name", VARCHAR(20)),
)
Table(
"addresses",
metadata,
Column("address_id", Integer, primary_key=True),
Column("user_id", Integer, ForeignKey("users.user_id")),
Column("address", String(30)),
)
@classmethod
def insert_data(cls, connection):
users, addresses = cls.tables("users", "addresses")
conn = connection
conn.execute(users.insert(), dict(user_id=1, user_name="john"))
conn.execute(
addresses.insert(), dict(address_id=1, user_id=1, address="addr1")
)
conn.execute(users.insert(), dict(user_id=2, user_name="jack"))
conn.execute(
addresses.insert(), dict(address_id=2, user_id=2, address="addr1")
)
conn.execute(users.insert(), dict(user_id=3, user_name="ed"))
conn.execute(
addresses.insert(), dict(address_id=3, user_id=3, address="addr2")
)
conn.execute(users.insert(), dict(user_id=4, user_name="wendy"))
conn.execute(
addresses.insert(), dict(address_id=4, user_id=4, address="addr3")
)
conn.execute(users.insert(), dict(user_id=5, user_name="laura"))
conn.execute(
addresses.insert(), dict(address_id=5, user_id=5, address="addr4")
)
conn.execute(users.insert(), dict(user_id=6, user_name="ralph"))
conn.execute(
addresses.insert(), dict(address_id=6, user_id=6, address="addr5")
)
conn.execute(users.insert(), dict(user_id=7, user_name="fido"))
conn.execute(
addresses.insert(), dict(address_id=7, user_id=7, address="addr5")
)
def test_select_limit(self, connection):
users, addresses = self.tables("users", "addresses")
r = connection.execute(
users.select().limit(3).order_by(users.c.user_id)
).fetchall()
self.assert_(r == [(1, "john"), (2, "jack"), (3, "ed")], repr(r))
@testing.requires.offset
def test_select_limit_offset(self, connection):
"""Test the interaction between limit and offset"""
users, addresses = self.tables("users", "addresses")
r = connection.execute(
users.select().limit(3).offset(2).order_by(users.c.user_id)
).fetchall()
self.assert_(r == [(3, "ed"), (4, "wendy"), (5, "laura")])
r = connection.execute(
users.select().offset(5).order_by(users.c.user_id)
).fetchall()
self.assert_(r == [(6, "ralph"), (7, "fido")])
def test_select_distinct_limit(self, connection):
"""Test the interaction between limit and distinct"""
users, addresses = self.tables("users", "addresses")
r = sorted(
[
x[0]
for x in connection.execute(
select(addresses.c.address).distinct().limit(3)
)
]
)
self.assert_(len(r) == 3, repr(r))
self.assert_(r[0] != r[1] and r[1] != r[2], repr(r))
@testing.requires.offset
def test_select_distinct_offset(self, connection):
"""Test the interaction between distinct and offset"""
users, addresses = self.tables("users", "addresses")
r = sorted(
[
x[0]
for x in connection.execute(
select(addresses.c.address)
.distinct()
.offset(1)
.order_by(addresses.c.address)
).fetchall()
]
)
eq_(len(r), 4)
self.assert_(r[0] != r[1] and r[1] != r[2] and r[2] != [3], repr(r))
@testing.requires.offset
def test_select_distinct_limit_offset(self, connection):
"""Test the interaction between limit and limit/offset"""
users, addresses = self.tables("users", "addresses")
r = connection.execute(
select(addresses.c.address)
.order_by(addresses.c.address)
.distinct()
.offset(2)
.limit(3)
).fetchall()
self.assert_(len(r) == 3, repr(r))
self.assert_(r[0] != r[1] and r[1] != r[2], repr(r))
class CompoundTest(fixtures.TablesTest):
"""test compound statements like UNION, INTERSECT, particularly their
ability to nest on different databases."""
__sparse_driver_backend__ = True
run_inserts = "each"
@classmethod
def define_tables(cls, metadata):
Table(
"t1",
metadata,
Column(
"col1",
Integer,
test_needs_autoincrement=True,
primary_key=True,
),
Column("col2", String(30)),
Column("col3", String(40)),
Column("col4", String(30)),
)
Table(
"t2",
metadata,
Column(
"col1",
Integer,
test_needs_autoincrement=True,
primary_key=True,
),
Column("col2", String(30)),
Column("col3", String(40)),
Column("col4", String(30)),
)
Table(
"t3",
metadata,
Column(
"col1",
Integer,
test_needs_autoincrement=True,
primary_key=True,
),
Column("col2", String(30)),
Column("col3", String(40)),
Column("col4", String(30)),
)
@classmethod
def insert_data(cls, connection):
t1, t2, t3 = cls.tables("t1", "t2", "t3")
conn = connection
conn.execute(
t1.insert(),
[
dict(col2="t1col2r1", col3="aaa", col4="aaa"),
dict(col2="t1col2r2", col3="bbb", col4="bbb"),
dict(col2="t1col2r3", col3="ccc", col4="ccc"),
],
)
conn.execute(
t2.insert(),
[
dict(col2="t2col2r1", col3="aaa", col4="bbb"),
dict(col2="t2col2r2", col3="bbb", col4="ccc"),
dict(col2="t2col2r3", col3="ccc", col4="aaa"),
],
)
conn.execute(
t3.insert(),
[
dict(col2="t3col2r1", col3="aaa", col4="ccc"),
dict(col2="t3col2r2", col3="bbb", col4="aaa"),
dict(col2="t3col2r3", col3="ccc", col4="bbb"),
],
)
def _fetchall_sorted(self, executed):
return sorted([tuple(row) for row in executed.fetchall()])
@testing.requires.subqueries
def test_union(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
(s1, s2) = (
select(t1.c.col3.label("col3"), t1.c.col4.label("col4")).where(
t1.c.col2.in_(["t1col2r1", "t1col2r2"]),
),
select(t2.c.col3.label("col3"), t2.c.col4.label("col4")).where(
t2.c.col2.in_(["t2col2r2", "t2col2r3"]),
),
)
u = union(s1, s2)
wanted = [
("aaa", "aaa"),
("bbb", "bbb"),
("bbb", "ccc"),
("ccc", "aaa"),
]
found1 = self._fetchall_sorted(connection.execute(u))
eq_(found1, wanted)
found2 = self._fetchall_sorted(
connection.execute(u.alias("bar").select())
)
eq_(found2, wanted)
def test_union_ordered(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
(s1, s2) = (
select(t1.c.col3.label("col3"), t1.c.col4.label("col4")).where(
t1.c.col2.in_(["t1col2r1", "t1col2r2"]),
),
select(t2.c.col3.label("col3"), t2.c.col4.label("col4")).where(
t2.c.col2.in_(["t2col2r2", "t2col2r3"]),
),
)
u = union(s1, s2).order_by("col3", "col4")
wanted = [
("aaa", "aaa"),
("bbb", "bbb"),
("bbb", "ccc"),
("ccc", "aaa"),
]
eq_(connection.execute(u).fetchall(), wanted)
@testing.requires.subqueries
def test_union_ordered_alias(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
(s1, s2) = (
select(t1.c.col3.label("col3"), t1.c.col4.label("col4")).where(
t1.c.col2.in_(["t1col2r1", "t1col2r2"]),
),
select(t2.c.col3.label("col3"), t2.c.col4.label("col4")).where(
t2.c.col2.in_(["t2col2r2", "t2col2r3"]),
),
)
u = union(s1, s2).order_by("col3", "col4")
wanted = [
("aaa", "aaa"),
("bbb", "bbb"),
("bbb", "ccc"),
("ccc", "aaa"),
]
eq_(connection.execute(u.alias("bar").select()).fetchall(), wanted)
@testing.crashes("oracle", "FIXME: unknown, verify not fails_on")
@testing.fails_on(
testing.requires._mysql_not_mariadb_104_not_mysql8031, "FIXME: unknown"
)
@testing.fails_on("sqlite", "FIXME: unknown")
def test_union_all(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
e = union_all(
select(t1.c.col3),
union(select(t1.c.col3), select(t1.c.col3)),
)
wanted = [("aaa",), ("aaa",), ("bbb",), ("bbb",), ("ccc",), ("ccc",)]
found1 = self._fetchall_sorted(connection.execute(e))
eq_(found1, wanted)
found2 = self._fetchall_sorted(
connection.execute(e.alias("foo").select())
)
eq_(found2, wanted)
def test_union_all_lightweight(self, connection):
"""like test_union_all, but breaks the sub-union into
a subquery with an explicit column reference on the outside,
more palatable to a wider variety of engines.
"""
t1, t2, t3 = self.tables("t1", "t2", "t3")
u = union(select(t1.c.col3), select(t1.c.col3)).alias()
e = union_all(select(t1.c.col3), select(u.c.col3))
wanted = [("aaa",), ("aaa",), ("bbb",), ("bbb",), ("ccc",), ("ccc",)]
found1 = self._fetchall_sorted(connection.execute(e))
eq_(found1, wanted)
found2 = self._fetchall_sorted(
connection.execute(e.alias("foo").select())
)
eq_(found2, wanted)
@testing.requires.intersect
def test_intersect(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
i = intersect(
select(t2.c.col3, t2.c.col4),
select(t2.c.col3, t2.c.col4).where(t2.c.col4 == t3.c.col3),
)
wanted = [("aaa", "bbb"), ("bbb", "ccc"), ("ccc", "aaa")]
found1 = self._fetchall_sorted(connection.execute(i))
eq_(found1, wanted)
found2 = self._fetchall_sorted(
connection.execute(i.alias("bar").select())
)
eq_(found2, wanted)
@testing.requires.except_
@testing.fails_on("sqlite", "Can't handle this style of nesting")
def test_except_style1(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
e = except_(
union(
select(t1.c.col3, t1.c.col4),
select(t2.c.col3, t2.c.col4),
select(t3.c.col3, t3.c.col4),
),
select(t2.c.col3, t2.c.col4),
)
wanted = [
("aaa", "aaa"),
("aaa", "ccc"),
("bbb", "aaa"),
("bbb", "bbb"),
("ccc", "bbb"),
("ccc", "ccc"),
]
found = self._fetchall_sorted(connection.execute(e.alias().select()))
eq_(found, wanted)
@testing.requires.except_
def test_except_style2(self, connection):
# same as style1, but add alias().select() to the except_().
# sqlite can handle it now.
t1, t2, t3 = self.tables("t1", "t2", "t3")
e = except_(
union(
select(t1.c.col3, t1.c.col4),
select(t2.c.col3, t2.c.col4),
select(t3.c.col3, t3.c.col4),
)
.alias()
.select(),
select(t2.c.col3, t2.c.col4),
)
wanted = [
("aaa", "aaa"),
("aaa", "ccc"),
("bbb", "aaa"),
("bbb", "bbb"),
("ccc", "bbb"),
("ccc", "ccc"),
]
found1 = self._fetchall_sorted(connection.execute(e))
eq_(found1, wanted)
found2 = self._fetchall_sorted(connection.execute(e.alias().select()))
eq_(found2, wanted)
@testing.fails_on(
["sqlite", testing.requires._mysql_not_mariadb_104_not_mysql8031],
"Can't handle this style of nesting",
)
@testing.requires.except_
def test_except_style3(self, connection):
# aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc
t1, t2, t3 = self.tables("t1", "t2", "t3")
e = except_(
select(t1.c.col3), # aaa, bbb, ccc
except_(
select(t2.c.col3), # aaa, bbb, ccc
select(t3.c.col3).where(t3.c.col3 == "ccc"), # ccc
),
)
eq_(connection.execute(e).fetchall(), [("ccc",)])
eq_(connection.execute(e.alias("foo").select()).fetchall(), [("ccc",)])
@testing.requires.except_
def test_except_style4(self, connection):
# aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc
t1, t2, t3 = self.tables("t1", "t2", "t3")
e = except_(
select(t1.c.col3), # aaa, bbb, ccc
except_(
select(t2.c.col3), # aaa, bbb, ccc
select(t3.c.col3).where(t3.c.col3 == "ccc"), # ccc
)
.alias()
.select(),
)
eq_(connection.execute(e).fetchall(), [("ccc",)])
eq_(connection.execute(e.alias().select()).fetchall(), [("ccc",)])
@testing.requires.intersect
@testing.fails_on(
["sqlite", testing.requires._mysql_not_mariadb_104_not_mysql8031],
"sqlite can't handle leading parenthesis",
)
def test_intersect_unions(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
u = intersect(
union(select(t1.c.col3, t1.c.col4), select(t3.c.col3, t3.c.col4)),
union(select(t2.c.col3, t2.c.col4), select(t3.c.col3, t3.c.col4))
.alias()
.select(),
)
wanted = [("aaa", "ccc"), ("bbb", "aaa"), ("ccc", "bbb")]
found = self._fetchall_sorted(connection.execute(u))
eq_(found, wanted)
@testing.requires.intersect
def test_intersect_unions_2(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
u = intersect(
union(select(t1.c.col3, t1.c.col4), select(t3.c.col3, t3.c.col4))
.alias()
.select(),
union(select(t2.c.col3, t2.c.col4), select(t3.c.col3, t3.c.col4))
.alias()
.select(),
)
wanted = [("aaa", "ccc"), ("bbb", "aaa"), ("ccc", "bbb")]
found = self._fetchall_sorted(connection.execute(u))
eq_(found, wanted)
@testing.requires.intersect
def test_intersect_unions_3(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
u = intersect(
select(t2.c.col3, t2.c.col4),
union(
select(t1.c.col3, t1.c.col4),
select(t2.c.col3, t2.c.col4),
select(t3.c.col3, t3.c.col4),
)
.alias()
.select(),
)
wanted = [("aaa", "bbb"), ("bbb", "ccc"), ("ccc", "aaa")]
found = self._fetchall_sorted(connection.execute(u))
eq_(found, wanted)
@testing.requires.intersect
def test_composite_alias(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
ua = intersect(
select(t2.c.col3, t2.c.col4),
union(
select(t1.c.col3, t1.c.col4),
select(t2.c.col3, t2.c.col4),
select(t3.c.col3, t3.c.col4),
)
.alias()
.select(),
).alias()
wanted = [("aaa", "bbb"), ("bbb", "ccc"), ("ccc", "aaa")]
found = self._fetchall_sorted(connection.execute(ua.select()))
eq_(found, wanted)
class JoinTest(fixtures.TablesTest):
"""Tests join execution.
The compiled SQL emitted by the dialect might be ANSI joins or
theta joins ('old oracle style', with (+) for OUTER). This test
tries to exercise join syntax and uncover any inconsistencies in
`JOIN rhs ON lhs.col=rhs.col` vs `rhs.col=lhs.col`. At least one
database seems to be sensitive to this.
"""
__sparse_driver_backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"t1",
metadata,
Column("t1_id", Integer, primary_key=True),
Column("name", String(32)),
)
Table(
"t2",
metadata,
Column("t2_id", Integer, primary_key=True),
Column("t1_id", Integer, ForeignKey("t1.t1_id")),
Column("name", String(32)),
)
Table(
"t3",
metadata,
Column("t3_id", Integer, primary_key=True),
Column("t2_id", Integer, ForeignKey("t2.t2_id")),
Column("name", String(32)),
)
@classmethod
def insert_data(cls, connection):
conn = connection
# t1.10 -> t2.20 -> t3.30
# t1.11 -> t2.21
# t1.12
t1, t2, t3 = cls.tables("t1", "t2", "t3")
conn.execute(
t1.insert(),
[
{"t1_id": 10, "name": "t1 #10"},
{"t1_id": 11, "name": "t1 #11"},
{"t1_id": 12, "name": "t1 #12"},
],
)
conn.execute(
t2.insert(),
[
{"t2_id": 20, "t1_id": 10, "name": "t2 #20"},
{"t2_id": 21, "t1_id": 11, "name": "t2 #21"},
],
)
conn.execute(
t3.insert(), [{"t3_id": 30, "t2_id": 20, "name": "t3 #30"}]
)
def assertRows(self, statement, expected):
"""Execute a statement and assert that rows returned equal expected."""
with testing.db.connect() as conn:
found = sorted(
[tuple(row) for row in conn.execute(statement).fetchall()]
)
eq_(found, sorted(expected))
def test_join_x1(self):
"""Joins t1->t2."""
t1, t2, t3 = self.tables("t1", "t2", "t3")
for criteria in (t1.c.t1_id == t2.c.t1_id, t2.c.t1_id == t1.c.t1_id):
expr = select(t1.c.t1_id, t2.c.t2_id).select_from(
t1.join(t2, criteria)
)
self.assertRows(expr, [(10, 20), (11, 21)])
def test_join_x2(self):
"""Joins t1->t2->t3."""
t1, t2, t3 = self.tables("t1", "t2", "t3")
for criteria in (t1.c.t1_id == t2.c.t1_id, t2.c.t1_id == t1.c.t1_id):
expr = select(t1.c.t1_id, t2.c.t2_id).select_from(
t1.join(t2, criteria)
)
self.assertRows(expr, [(10, 20), (11, 21)])
def test_outerjoin_x1(self):
"""Outer joins t1->t2."""
t1, t2, t3 = self.tables("t1", "t2", "t3")
for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = select(t1.c.t1_id, t2.c.t2_id).select_from(
t1.join(t2).join(t3, criteria)
)
self.assertRows(expr, [(10, 20)])
def test_outerjoin_x2(self):
"""Outer joins t1->t2,t3."""
t1, t2, t3 = self.tables("t1", "t2", "t3")
for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id).select_from(
t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).outerjoin(
t3, criteria
)
)
self.assertRows(
expr, [(10, 20, 30), (11, 21, None), (12, None, None)]
)
def test_outerjoin_where_x2_t1(self):
"""Outer joins t1->t2,t3, where on t1."""
t1, t2, t3 = self.tables("t1", "t2", "t3")
for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(t1.c.name == "t1 #10")
.select_from(
t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).outerjoin(
t3, criteria
)
)
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(t1.c.t1_id < 12)
.select_from(
t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).outerjoin(
t3, criteria
)
)
)
self.assertRows(expr, [(10, 20, 30), (11, 21, None)])
def test_outerjoin_where_x2_t2(self):
"""Outer joins t1->t2,t3, where on t2."""
t1, t2, t3 = self.tables("t1", "t2", "t3")
for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(t2.c.name == "t2 #20")
.select_from(
t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).outerjoin(
t3, criteria
)
)
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(t2.c.t2_id < 29)
.select_from(
t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).outerjoin(
t3, criteria
)
)
)
self.assertRows(expr, [(10, 20, 30), (11, 21, None)])
def test_outerjoin_where_x2_t3(self):
"""Outer joins t1->t2,t3, where on t3."""
t1, t2, t3 = self.tables("t1", "t2", "t3")
for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(t3.c.name == "t3 #30")
.select_from(
t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).outerjoin(
t3, criteria
)
)
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(t3.c.t3_id < 39)
.select_from(
t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).outerjoin(
t3, criteria
)
)
)
self.assertRows(expr, [(10, 20, 30)])
def test_outerjoin_where_x2_t1t3(self):
"""Outer joins t1->t2,t3, where on t1 and t3."""
t1, t2, t3 = self.tables("t1", "t2", "t3")
for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(and_(t1.c.name == "t1 #10", t3.c.name == "t3 #30"))
.select_from(
t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).outerjoin(
t3, criteria
)
)
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(and_(t1.c.t1_id < 19, t3.c.t3_id < 39))
.select_from(
t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).outerjoin(
t3, criteria
)
)
)
self.assertRows(expr, [(10, 20, 30)])
def test_outerjoin_where_x2_t1t2(self):
"""Outer joins t1->t2,t3, where on t1 and t2."""
t1, t2, t3 = self.tables("t1", "t2", "t3")
for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(and_(t1.c.name == "t1 #10", t2.c.name == "t2 #20"))
.select_from(
t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).outerjoin(
t3, criteria
)
)
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(and_(t1.c.t1_id < 12, t2.c.t2_id < 39))
.select_from(
t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).outerjoin(
t3, criteria
)
)
)
self.assertRows(expr, [(10, 20, 30), (11, 21, None)])
def test_outerjoin_where_x2_t1t2t3(self):
"""Outer joins t1->t2,t3, where on t1, t2 and t3."""
t1, t2, t3 = self.tables("t1", "t2", "t3")
for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(
and_(
t1.c.name == "t1 #10",
t2.c.name == "t2 #20",
t3.c.name == "t3 #30",
)
)
.select_from(
t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).outerjoin(
t3, criteria
)
)
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(and_(t1.c.t1_id < 19, t2.c.t2_id < 29, t3.c.t3_id < 39))
.select_from(
t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).outerjoin(
t3, criteria
)
)
)
self.assertRows(expr, [(10, 20, 30)])
def test_mixed(self):
"""Joins t1->t2, outer t2->t3."""
t1, t2, t3 = self.tables("t1", "t2", "t3")
for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id).select_from(
(t1.join(t2).outerjoin(t3, criteria)),
)
print(expr)
self.assertRows(expr, [(10, 20, 30), (11, 21, None)])
def test_mixed_where(self):
"""Joins t1->t2, outer t2->t3, plus a where on each table in turn."""
t1, t2, t3 = self.tables("t1", "t2", "t3")
for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(
t1.c.name == "t1 #10",
)
.select_from(t1.join(t2).outerjoin(t3, criteria))
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(
t2.c.name == "t2 #20",
)
.select_from(t1.join(t2).outerjoin(t3, criteria))
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(
t3.c.name == "t3 #30",
)
.select_from(t1.join(t2).outerjoin(t3, criteria))
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(
and_(t1.c.name == "t1 #10", t2.c.name == "t2 #20"),
)
.select_from(t1.join(t2).outerjoin(t3, criteria))
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(
and_(t2.c.name == "t2 #20", t3.c.name == "t3 #30"),
)
.select_from(t1.join(t2).outerjoin(t3, criteria))
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
.where(
and_(
t1.c.name == "t1 #10",
t2.c.name == "t2 #20",
t3.c.name == "t3 #30",
),
)
.select_from(t1.join(t2).outerjoin(t3, criteria))
)
self.assertRows(expr, [(10, 20, 30)])
class OperatorTest(fixtures.TablesTest):
__sparse_driver_backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"flds",
metadata,
Column(
"idcol",
Integer,
primary_key=True,
test_needs_autoincrement=True,
),
Column("intcol", Integer),
Column("strcol", String(50)),
)
@classmethod
def insert_data(cls, connection):
flds = cls.tables.flds
connection.execute(
flds.insert(),
[dict(intcol=5, strcol="foo"), dict(intcol=13, strcol="bar")],
)
# TODO: seems like more tests warranted for this setup.
def test_modulo(self, connection):
flds = self.tables.flds
eq_(
connection.execute(
select(flds.c.intcol % 3).order_by(flds.c.idcol)
).fetchall(),
[(2,), (1,)],
)
@testing.requires.window_functions
def test_over(self, connection):
flds = self.tables.flds
eq_(
connection.execute(
select(
flds.c.intcol,
func.row_number().over(order_by=flds.c.strcol),
)
).fetchall(),
[(13, 1), (5, 2)],
)