mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-18 14:42:01 -04:00
91f376692d
Several weeks of using the future_select() construct has led to the proposal there be just one select() construct again which features the new join() method, and otherwise accepts both the 1.x and 2.x argument styles. This would make migration simpler and reduce confusion. However, confusion may be increased by the fact that select().join() is different Current thinking is we may be better off with a few hard behavioral changes to old and relatively unknown APIs rather than trying to play both sides within two extremely similar but subtly different APIs. At the moment, the .join() thing seems to be the only behavioral change that occurs without the user taking any explicit steps. Session.execute() will still behave the old way as we are adding a future flag. This change also adds the "future" flag to Session() and session.execute(), so that interpretation of the incoming statement, as well as that the new style result is returned, does not occur for existing applications unless they add the use of this flag. The change in general is moving the "removed in 2.0" system further along where we want the test suite to fully pass even if the SQLALCHEMY_WARN_20 flag is set. Get many tests to pass when SQLALCHEMY_WARN_20 is set; this should be ongoing after this patch merges. Improve the RemovedIn20 warning; these are all deprecated "since" 1.4, so ensure that's what the messages read. Make sure the inforamtion link is on all warnings. Add deprecation warnings for parameters present and add warnings to all FromClause.select() types of methods. Fixes: #5379 Fixes: #5284 Change-Id: I765a0b912b3dcd0e995426427d8bb7997cbffd51 References: #5159
462 lines
14 KiB
Python
462 lines
14 KiB
Python
from sqlalchemy import bindparam
|
|
from sqlalchemy import Column
|
|
from sqlalchemy import column
|
|
from sqlalchemy import exc
|
|
from sqlalchemy import func
|
|
from sqlalchemy import Integer
|
|
from sqlalchemy import MetaData
|
|
from sqlalchemy import select
|
|
from sqlalchemy import String
|
|
from sqlalchemy import Table
|
|
from sqlalchemy import table
|
|
from sqlalchemy import testing
|
|
from sqlalchemy import text
|
|
from sqlalchemy import update
|
|
from sqlalchemy.schema import DDL
|
|
from sqlalchemy.schema import Sequence
|
|
from sqlalchemy.sql import ClauseElement
|
|
from sqlalchemy.sql import coercions
|
|
from sqlalchemy.sql import false
|
|
from sqlalchemy.sql import False_
|
|
from sqlalchemy.sql import literal
|
|
from sqlalchemy.sql import roles
|
|
from sqlalchemy.sql import true
|
|
from sqlalchemy.sql import True_
|
|
from sqlalchemy.sql.coercions import expect
|
|
from sqlalchemy.sql.elements import _truncated_label
|
|
from sqlalchemy.sql.elements import Null
|
|
from sqlalchemy.sql.selectable import FromGrouping
|
|
from sqlalchemy.sql.selectable import SelectStatementGrouping
|
|
from sqlalchemy.testing import assert_raises
|
|
from sqlalchemy.testing import assert_raises_message
|
|
from sqlalchemy.testing import AssertsCompiledSQL
|
|
from sqlalchemy.testing import fixtures
|
|
from sqlalchemy.testing import is_
|
|
from sqlalchemy.testing import is_instance_of
|
|
from sqlalchemy.testing import is_true
|
|
|
|
m = MetaData()
|
|
|
|
t = Table("t", m, Column("q", Integer))
|
|
|
|
|
|
class NotAThing1(object):
|
|
pass
|
|
|
|
|
|
not_a_thing1 = NotAThing1()
|
|
|
|
|
|
class NotAThing2(ClauseElement):
|
|
pass
|
|
|
|
|
|
not_a_thing2 = NotAThing2()
|
|
|
|
|
|
class NotAThing3(object):
|
|
def __clause_element__(self):
|
|
return not_a_thing2
|
|
|
|
|
|
not_a_thing3 = NotAThing3()
|
|
|
|
|
|
class RoleTest(fixtures.TestBase):
|
|
# TODO: the individual role tests here are incomplete. The functionality
|
|
# of each role is covered by other tests in the sql testing suite however
|
|
# ideally they'd all have direct tests here as well.
|
|
|
|
def _test_role_neg_comparisons(self, role):
|
|
impl = coercions._impl_lookup[role]
|
|
role_name = impl.name
|
|
|
|
assert_raises_message(
|
|
exc.ArgumentError,
|
|
r"%s expected, got .*NotAThing1" % role_name,
|
|
expect,
|
|
role,
|
|
not_a_thing1,
|
|
)
|
|
|
|
assert_raises_message(
|
|
exc.ArgumentError,
|
|
r"%s expected, got .*NotAThing2" % role_name,
|
|
expect,
|
|
role,
|
|
not_a_thing2,
|
|
)
|
|
|
|
assert_raises_message(
|
|
exc.ArgumentError,
|
|
r"%s expected, got .*NotAThing3" % role_name,
|
|
expect,
|
|
role,
|
|
not_a_thing3,
|
|
)
|
|
|
|
assert_raises_message(
|
|
exc.ArgumentError,
|
|
r"%s expected for argument 'foo'; got .*NotAThing3" % role_name,
|
|
expect,
|
|
role,
|
|
not_a_thing3,
|
|
argname="foo",
|
|
)
|
|
|
|
def test_const_expr_role(self):
|
|
t = true()
|
|
is_(expect(roles.ConstExprRole, t), t)
|
|
|
|
f = false()
|
|
is_(expect(roles.ConstExprRole, f), f)
|
|
|
|
is_instance_of(expect(roles.ConstExprRole, True), True_)
|
|
|
|
is_instance_of(expect(roles.ConstExprRole, False), False_)
|
|
|
|
is_instance_of(expect(roles.ConstExprRole, None), Null)
|
|
|
|
def test_truncated_label_role(self):
|
|
is_instance_of(
|
|
expect(roles.TruncatedLabelRole, "foobar"), _truncated_label
|
|
)
|
|
|
|
def test_labeled_column_expr_role(self):
|
|
c = column("q")
|
|
is_true(expect(roles.LabeledColumnExprRole, c).compare(c))
|
|
|
|
is_true(
|
|
expect(roles.LabeledColumnExprRole, c.label("foo")).compare(
|
|
c.label("foo")
|
|
)
|
|
)
|
|
|
|
is_true(
|
|
expect(
|
|
roles.LabeledColumnExprRole,
|
|
select(column("q")).scalar_subquery(),
|
|
).compare(select(column("q")).label(None))
|
|
)
|
|
|
|
is_true(
|
|
expect(roles.LabeledColumnExprRole, not_a_thing1).compare(
|
|
literal(not_a_thing1).label(None)
|
|
)
|
|
)
|
|
|
|
def test_scalar_select_no_coercion(self):
|
|
with testing.expect_warnings(
|
|
"implicitly coercing SELECT object to scalar subquery"
|
|
):
|
|
expect(
|
|
roles.LabeledColumnExprRole, select(column("q")),
|
|
)
|
|
|
|
with testing.expect_warnings(
|
|
"implicitly coercing SELECT object to scalar subquery"
|
|
):
|
|
expect(
|
|
roles.LabeledColumnExprRole, select(column("q")).alias(),
|
|
)
|
|
|
|
def test_statement_no_text_coercion(self):
|
|
assert_raises_message(
|
|
exc.ArgumentError,
|
|
r"Textual SQL expression 'select \* from table' should be "
|
|
r"explicitly declared",
|
|
expect,
|
|
roles.StatementRole,
|
|
"select * from table",
|
|
)
|
|
|
|
def test_statement_text_coercion(self):
|
|
is_true(
|
|
expect(
|
|
roles.CoerceTextStatementRole, "select * from table"
|
|
).compare(text("select * from table"))
|
|
)
|
|
|
|
def test_select_statement_no_text_coercion(self):
|
|
assert_raises_message(
|
|
exc.ArgumentError,
|
|
r"Textual SQL expression 'select \* from table' should be "
|
|
r"explicitly declared",
|
|
expect,
|
|
roles.SelectStatementRole,
|
|
"select * from table",
|
|
)
|
|
|
|
def test_select_is_coerced_into_fromclause_w_deprecation(self):
|
|
with testing.expect_deprecated(
|
|
"Implicit coercion of SELECT and textual SELECT "
|
|
"constructs into FROM clauses is deprecated;"
|
|
):
|
|
element = expect(
|
|
roles.FromClauseRole, SelectStatementGrouping(select(t))
|
|
)
|
|
is_true(
|
|
element.compare(SelectStatementGrouping(select(t)).subquery())
|
|
)
|
|
|
|
def test_offset_or_limit_role_only_ints_or_clauseelement(self):
|
|
assert_raises(ValueError, select(t).limit, "some limit")
|
|
|
|
assert_raises(ValueError, select(t).offset, "some offset")
|
|
|
|
def test_offset_or_limit_role_clauseelement(self):
|
|
bind = bindparam("x")
|
|
stmt = select(t).limit(bind)
|
|
is_(stmt._limit_clause, bind)
|
|
|
|
stmt = select(t).offset(bind)
|
|
is_(stmt._offset_clause, bind)
|
|
|
|
def test_from_clause_is_not_a_select(self):
|
|
assert_raises_message(
|
|
exc.ArgumentError,
|
|
r"SELECT construct or equivalent text\(\) construct expected,",
|
|
expect,
|
|
roles.SelectStatementRole,
|
|
FromGrouping(t),
|
|
)
|
|
|
|
def test_text_as_from_select_statement(self):
|
|
is_true(
|
|
expect(
|
|
roles.SelectStatementRole,
|
|
text("select * from table").columns(t.c.q),
|
|
).compare(text("select * from table").columns(t.c.q))
|
|
)
|
|
|
|
def test_statement_coercion_select(self):
|
|
is_true(
|
|
expect(roles.CoerceTextStatementRole, select(t)).compare(select(t))
|
|
)
|
|
|
|
def test_statement_coercion_ddl(self):
|
|
d1 = DDL("hi")
|
|
is_(expect(roles.CoerceTextStatementRole, d1), d1)
|
|
|
|
def test_strict_from_clause_role(self):
|
|
stmt = select(t).subquery()
|
|
is_true(
|
|
expect(roles.StrictFromClauseRole, stmt).compare(
|
|
select(t).subquery()
|
|
)
|
|
)
|
|
|
|
def test_strict_from_clause_role_disallow_select(self):
|
|
stmt = select(t)
|
|
assert_raises_message(
|
|
exc.ArgumentError,
|
|
r"FROM expression, such as a Table or alias\(\) "
|
|
"object expected, got .*Select",
|
|
expect,
|
|
roles.StrictFromClauseRole,
|
|
stmt,
|
|
)
|
|
|
|
def test_anonymized_from_clause_role(self):
|
|
is_true(expect(roles.AnonymizedFromClauseRole, t).compare(t.alias()))
|
|
|
|
# note the compare for subquery().alias(), even if it is two
|
|
# plain Alias objects (which it won't be once we introduce the
|
|
# Subquery class), still compares based on alias() being present
|
|
# twice, that is, alias().alias() builds an alias of an alias, rather
|
|
# than just replacing the outer alias.
|
|
is_true(
|
|
expect(
|
|
roles.AnonymizedFromClauseRole, select(t).subquery()
|
|
).compare(select(t).subquery().alias())
|
|
)
|
|
|
|
def test_statement_coercion_sequence(self):
|
|
s1 = Sequence("hi")
|
|
is_(expect(roles.CoerceTextStatementRole, s1), s1)
|
|
|
|
def test_columns_clause_role(self):
|
|
is_(expect(roles.ColumnsClauseRole, t.c.q), t.c.q)
|
|
|
|
def test_truncated_label_role_neg(self):
|
|
self._test_role_neg_comparisons(roles.TruncatedLabelRole)
|
|
|
|
def test_where_having_role_neg(self):
|
|
self._test_role_neg_comparisons(roles.WhereHavingRole)
|
|
|
|
def test_by_of_role_neg(self):
|
|
self._test_role_neg_comparisons(roles.ByOfRole)
|
|
|
|
def test_const_expr_role_neg(self):
|
|
self._test_role_neg_comparisons(roles.ConstExprRole)
|
|
|
|
def test_columns_clause_role_neg(self):
|
|
self._test_role_neg_comparisons(roles.ColumnsClauseRole)
|
|
|
|
|
|
class SubqueryCoercionsTest(fixtures.TestBase, AssertsCompiledSQL):
|
|
__dialect__ = "default"
|
|
|
|
table1 = table(
|
|
"mytable",
|
|
column("myid", Integer),
|
|
column("name", String),
|
|
column("description", String),
|
|
)
|
|
|
|
table2 = table(
|
|
"myothertable", column("otherid", Integer), column("othername", String)
|
|
)
|
|
|
|
def test_column_roles(self):
|
|
stmt = select(self.table1.c.myid)
|
|
|
|
for role in [
|
|
roles.WhereHavingRole,
|
|
roles.ExpressionElementRole,
|
|
roles.ByOfRole,
|
|
roles.OrderByRole,
|
|
# roles.LabeledColumnExprRole
|
|
]:
|
|
with testing.expect_warnings(
|
|
"implicitly coercing SELECT object to scalar subquery"
|
|
):
|
|
coerced = coercions.expect(role, stmt)
|
|
is_true(coerced.compare(stmt.scalar_subquery()))
|
|
|
|
with testing.expect_warnings(
|
|
"implicitly coercing SELECT object to scalar subquery"
|
|
):
|
|
coerced = coercions.expect(role, stmt.alias())
|
|
is_true(coerced.compare(stmt.scalar_subquery()))
|
|
|
|
def test_labeled_role(self):
|
|
stmt = select(self.table1.c.myid)
|
|
|
|
with testing.expect_warnings(
|
|
"implicitly coercing SELECT object to scalar subquery"
|
|
):
|
|
coerced = coercions.expect(roles.LabeledColumnExprRole, stmt)
|
|
is_true(coerced.compare(stmt.scalar_subquery().label(None)))
|
|
|
|
with testing.expect_warnings(
|
|
"implicitly coercing SELECT object to scalar subquery"
|
|
):
|
|
coerced = coercions.expect(
|
|
roles.LabeledColumnExprRole, stmt.alias()
|
|
)
|
|
is_true(coerced.compare(stmt.scalar_subquery().label(None)))
|
|
|
|
def test_scalar_select(self):
|
|
|
|
with testing.expect_warnings(
|
|
"implicitly coercing SELECT object to scalar subquery"
|
|
):
|
|
self.assert_compile(
|
|
func.coalesce(select(self.table1.c.myid)),
|
|
"coalesce((SELECT mytable.myid FROM mytable))",
|
|
)
|
|
|
|
with testing.expect_warnings(
|
|
"implicitly coercing SELECT object to scalar subquery"
|
|
):
|
|
s = select(self.table1.c.myid).alias()
|
|
self.assert_compile(
|
|
select(self.table1.c.myid).where(self.table1.c.myid == s),
|
|
"SELECT mytable.myid FROM mytable WHERE "
|
|
"mytable.myid = (SELECT mytable.myid FROM "
|
|
"mytable)",
|
|
)
|
|
|
|
with testing.expect_warnings(
|
|
"implicitly coercing SELECT object to scalar subquery"
|
|
):
|
|
self.assert_compile(
|
|
select(self.table1.c.myid).where(s > self.table1.c.myid),
|
|
"SELECT mytable.myid FROM mytable WHERE "
|
|
"mytable.myid < (SELECT mytable.myid FROM "
|
|
"mytable)",
|
|
)
|
|
|
|
with testing.expect_warnings(
|
|
"implicitly coercing SELECT object to scalar subquery"
|
|
):
|
|
s = select(self.table1.c.myid).alias()
|
|
self.assert_compile(
|
|
select(self.table1.c.myid).where(self.table1.c.myid == s),
|
|
"SELECT mytable.myid FROM mytable WHERE "
|
|
"mytable.myid = (SELECT mytable.myid FROM "
|
|
"mytable)",
|
|
)
|
|
|
|
with testing.expect_warnings(
|
|
"implicitly coercing SELECT object to scalar subquery"
|
|
):
|
|
self.assert_compile(
|
|
select(self.table1.c.myid).where(s > self.table1.c.myid),
|
|
"SELECT mytable.myid FROM mytable WHERE "
|
|
"mytable.myid < (SELECT mytable.myid FROM "
|
|
"mytable)",
|
|
)
|
|
|
|
@testing.fixture()
|
|
def update_from_fixture(self):
|
|
metadata = MetaData()
|
|
|
|
mytable = Table(
|
|
"mytable",
|
|
metadata,
|
|
Column("myid", Integer),
|
|
Column("name", String(30)),
|
|
Column("description", String(50)),
|
|
)
|
|
myothertable = Table(
|
|
"myothertable",
|
|
metadata,
|
|
Column("otherid", Integer),
|
|
Column("othername", String(30)),
|
|
)
|
|
return mytable, myothertable
|
|
|
|
def test_correlated_update_two(self, update_from_fixture):
|
|
table1, t2 = update_from_fixture
|
|
|
|
mt = table1.alias()
|
|
with testing.expect_warnings(
|
|
"implicitly coercing SELECT object to scalar subquery"
|
|
):
|
|
u = update(table1).values(
|
|
{
|
|
table1.c.name: select(mt.c.name).where(
|
|
mt.c.myid == table1.c.myid
|
|
)
|
|
},
|
|
)
|
|
self.assert_compile(
|
|
u,
|
|
"UPDATE mytable SET name=(SELECT mytable_1.name FROM "
|
|
"mytable AS mytable_1 WHERE "
|
|
"mytable_1.myid = mytable.myid)",
|
|
)
|
|
|
|
def test_correlated_update_three(self, update_from_fixture):
|
|
table1, table2 = update_from_fixture
|
|
|
|
# test against a regular constructed subquery
|
|
s = select([table2], table2.c.otherid == table1.c.myid)
|
|
with testing.expect_warnings(
|
|
"implicitly coercing SELECT object to scalar subquery"
|
|
):
|
|
u = (
|
|
update(table1)
|
|
.where(table1.c.name == "jack")
|
|
.values({table1.c.name: s})
|
|
)
|
|
self.assert_compile(
|
|
u,
|
|
"UPDATE mytable SET name=(SELECT myothertable.otherid, "
|
|
"myothertable.othername FROM myothertable WHERE "
|
|
"myothertable.otherid = mytable.myid) "
|
|
"WHERE mytable.name = :name_1",
|
|
)
|