Files
sqlalchemy/test/sql/test_case_statement.py
Mike Bayer 1e278de4cc Post black reformatting
Applied on top of a pure run of black -l 79 in
I7eda77fed3d8e73df84b3651fd6cfcfe858d4dc9,  this set of changes
resolves all remaining flake8 conditions for those codes
we have enabled in setup.cfg.

Included are resolutions for all remaining flake8 issues
including shadowed builtins, long lines, import order, unused
imports, duplicate imports, and docstring issues.

Change-Id: I4f72d3ba1380dd601610ff80b8fb06a2aff8b0fe
2019-01-06 18:23:11 -05:00

243 lines
7.0 KiB
Python

from sqlalchemy import and_
from sqlalchemy import case
from sqlalchemy import cast
from sqlalchemy import Column
from sqlalchemy import exc
from sqlalchemy import Integer
from sqlalchemy import literal_column
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy.sql import column
from sqlalchemy.sql import table
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
info_table = None
class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
@classmethod
def setup_class(cls):
metadata = MetaData(testing.db)
global info_table
info_table = Table(
"infos",
metadata,
Column("pk", Integer, primary_key=True),
Column("info", String(30)),
)
info_table.create()
info_table.insert().execute(
{"pk": 1, "info": "pk_1_data"},
{"pk": 2, "info": "pk_2_data"},
{"pk": 3, "info": "pk_3_data"},
{"pk": 4, "info": "pk_4_data"},
{"pk": 5, "info": "pk_5_data"},
{"pk": 6, "info": "pk_6_data"},
)
@classmethod
def teardown_class(cls):
info_table.drop()
@testing.fails_on("firebird", "FIXME: unknown")
@testing.requires.subqueries
def test_case(self):
inner = select(
[
case(
[
[info_table.c.pk < 3, "lessthan3"],
[
and_(info_table.c.pk >= 3, info_table.c.pk < 7),
"gt3",
],
]
).label("x"),
info_table.c.pk,
info_table.c.info,
],
from_obj=[info_table],
)
inner_result = inner.execute().fetchall()
# Outputs:
# lessthan3 1 pk_1_data
# lessthan3 2 pk_2_data
# gt3 3 pk_3_data
# gt3 4 pk_4_data
# gt3 5 pk_5_data
# gt3 6 pk_6_data
assert inner_result == [
("lessthan3", 1, "pk_1_data"),
("lessthan3", 2, "pk_2_data"),
("gt3", 3, "pk_3_data"),
("gt3", 4, "pk_4_data"),
("gt3", 5, "pk_5_data"),
("gt3", 6, "pk_6_data"),
]
outer = select([inner.alias("q_inner")])
outer_result = outer.execute().fetchall()
assert outer_result == [
("lessthan3", 1, "pk_1_data"),
("lessthan3", 2, "pk_2_data"),
("gt3", 3, "pk_3_data"),
("gt3", 4, "pk_4_data"),
("gt3", 5, "pk_5_data"),
("gt3", 6, "pk_6_data"),
]
w_else = select(
[
case(
[
[info_table.c.pk < 3, cast(3, Integer)],
[and_(info_table.c.pk >= 3, info_table.c.pk < 6), 6],
],
else_=0,
).label("x"),
info_table.c.pk,
info_table.c.info,
],
from_obj=[info_table],
)
else_result = w_else.execute().fetchall()
assert else_result == [
(3, 1, "pk_1_data"),
(3, 2, "pk_2_data"),
(6, 3, "pk_3_data"),
(6, 4, "pk_4_data"),
(6, 5, "pk_5_data"),
(0, 6, "pk_6_data"),
]
def test_literal_interpretation_ambiguous(self):
assert_raises_message(
exc.ArgumentError,
r"Ambiguous literal: 'x'. Use the 'text\(\)' function",
case,
[("x", "y")],
)
def test_literal_interpretation_ambiguous_tuple(self):
assert_raises_message(
exc.ArgumentError,
r"Ambiguous literal: \('x', 'y'\). Use the 'text\(\)' function",
case,
[(("x", "y"), "z")],
)
def test_literal_interpretation(self):
t = table("test", column("col1"))
self.assert_compile(
case([("x", "y")], value=t.c.col1),
"CASE test.col1 WHEN :param_1 THEN :param_2 END",
)
self.assert_compile(
case([(t.c.col1 == 7, "y")], else_="z"),
"CASE WHEN (test.col1 = :col1_1) THEN :param_1 ELSE :param_2 END",
)
def test_text_doesnt_explode(self):
for s in [
select(
[
case(
[(info_table.c.info == "pk_4_data", text("'yes'"))],
else_=text("'no'"),
)
]
).order_by(info_table.c.info),
select(
[
case(
[
(
info_table.c.info == "pk_4_data",
literal_column("'yes'"),
)
],
else_=literal_column("'no'"),
)
]
).order_by(info_table.c.info),
]:
if testing.against("firebird"):
eq_(
s.execute().fetchall(),
[
("no ",),
("no ",),
("no ",),
("yes",),
("no ",),
("no ",),
],
)
else:
eq_(
s.execute().fetchall(),
[("no",), ("no",), ("no",), ("yes",), ("no",), ("no",)],
)
@testing.fails_on("firebird", "FIXME: unknown")
def testcase_with_dict(self):
query = select(
[
case(
{
info_table.c.pk < 3: "lessthan3",
info_table.c.pk >= 3: "gt3",
},
else_="other",
),
info_table.c.pk,
info_table.c.info,
],
from_obj=[info_table],
)
assert query.execute().fetchall() == [
("lessthan3", 1, "pk_1_data"),
("lessthan3", 2, "pk_2_data"),
("gt3", 3, "pk_3_data"),
("gt3", 4, "pk_4_data"),
("gt3", 5, "pk_5_data"),
("gt3", 6, "pk_6_data"),
]
simple_query = select(
[
case(
{1: "one", 2: "two"}, value=info_table.c.pk, else_="other"
),
info_table.c.pk,
],
whereclause=info_table.c.pk < 4,
from_obj=[info_table],
)
assert simple_query.execute().fetchall() == [
("one", 1),
("two", 2),
("other", 3),
]