mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-07 01:10:52 -04:00
34740d33ab
Added a new concept of "operator classes" to the SQL operators supported by SQLAlchemy, represented within the enum :class:`.OperatorClass`. The purpose of this structure is to provide an extra layer of validation when a particular kind of SQL operation is used with a particular datatype, to catch early the use of an operator that does not have any relevance to the datatype in use; a simple example is an integer or numeric column used with a "string match" operator. Fixes: #12736 Change-Id: I44f46d7326aef6847dbf0cf7a325833f8e347da6
2224 lines
66 KiB
Python
2224 lines
66 KiB
Python
from __future__ import annotations
|
|
|
|
import threading
|
|
import time
|
|
from typing import List
|
|
from typing import Optional
|
|
|
|
from sqlalchemy import exc
|
|
from sqlalchemy import testing
|
|
from sqlalchemy.future import select as future_select
|
|
from sqlalchemy.schema import Column
|
|
from sqlalchemy.schema import ForeignKey
|
|
from sqlalchemy.schema import Table
|
|
from sqlalchemy.sql import and_
|
|
from sqlalchemy.sql import bindparam
|
|
from sqlalchemy.sql import coercions
|
|
from sqlalchemy.sql import column
|
|
from sqlalchemy.sql import func
|
|
from sqlalchemy.sql import join
|
|
from sqlalchemy.sql import lambda_stmt
|
|
from sqlalchemy.sql import lambdas
|
|
from sqlalchemy.sql import literal
|
|
from sqlalchemy.sql import null
|
|
from sqlalchemy.sql import roles
|
|
from sqlalchemy.sql import select
|
|
from sqlalchemy.sql import table
|
|
from sqlalchemy.sql import util as sql_util
|
|
from sqlalchemy.sql.base import ExecutableOption
|
|
from sqlalchemy.sql.cache_key import HasCacheKey
|
|
from sqlalchemy.testing import assert_raises_message
|
|
from sqlalchemy.testing import AssertsCompiledSQL
|
|
from sqlalchemy.testing import eq_
|
|
from sqlalchemy.testing import fixtures
|
|
from sqlalchemy.testing import is_
|
|
from sqlalchemy.testing import ne_
|
|
from sqlalchemy.testing.assertions import expect_raises_message
|
|
from sqlalchemy.testing.assertsql import CompiledSQL
|
|
from sqlalchemy.types import ARRAY
|
|
from sqlalchemy.types import Boolean
|
|
from sqlalchemy.types import Integer
|
|
from sqlalchemy.types import JSON
|
|
from sqlalchemy.types import String
|
|
|
|
|
|
class LambdaElementTest(
|
|
fixtures.TestBase, testing.AssertsExecutionResults, AssertsCompiledSQL
|
|
):
|
|
__dialect__ = "default"
|
|
|
|
def test_reject_methods(self):
|
|
"""test #7032"""
|
|
|
|
t1 = table("t1", column("q"), column("p"))
|
|
|
|
subq = select(t1).subquery
|
|
|
|
with expect_raises_message(
|
|
exc.ArgumentError,
|
|
"Method <bound method SelectBase.subquery .* may not be "
|
|
"passed as a SQL expression",
|
|
):
|
|
select(func.count()).select_from(subq)
|
|
|
|
self.assert_compile(
|
|
select(func.count()).select_from(subq()),
|
|
"SELECT count(*) AS count_1 FROM "
|
|
"(SELECT t1.q AS q, t1.p AS p FROM t1) AS anon_1",
|
|
)
|
|
|
|
def test_select_whereclause(self):
|
|
t1 = table("t1", column("q"), column("p"))
|
|
|
|
x = 10
|
|
y = 5
|
|
|
|
def go():
|
|
return select(t1).where(lambda: and_(t1.c.q == x, t1.c.p == y))
|
|
|
|
self.assert_compile(
|
|
go(), "SELECT t1.q, t1.p FROM t1 WHERE t1.q = :x_1 AND t1.p = :y_1"
|
|
)
|
|
|
|
self.assert_compile(
|
|
go(), "SELECT t1.q, t1.p FROM t1 WHERE t1.q = :x_1 AND t1.p = :y_1"
|
|
)
|
|
|
|
def test_global_tracking(self):
|
|
t1 = table("t1", column("q"), column("p"))
|
|
|
|
global global_x, global_y
|
|
|
|
global_x = 10
|
|
global_y = 17
|
|
|
|
def go():
|
|
return select(t1).where(
|
|
lambda: and_(t1.c.q == global_x, t1.c.p == global_y)
|
|
)
|
|
|
|
self.assert_compile(
|
|
go(),
|
|
"SELECT t1.q, t1.p FROM t1 WHERE t1.q = :global_x_1 "
|
|
"AND t1.p = :global_y_1",
|
|
checkparams={"global_x_1": 10, "global_y_1": 17},
|
|
)
|
|
|
|
global_y = 9
|
|
|
|
self.assert_compile(
|
|
go(),
|
|
"SELECT t1.q, t1.p FROM t1 WHERE t1.q = :global_x_1 "
|
|
"AND t1.p = :global_y_1",
|
|
checkparams={"global_x_1": 10, "global_y_1": 9},
|
|
)
|
|
|
|
def test_boolean_constants(self):
|
|
t1 = table("t1", column("q"), column("p"))
|
|
|
|
def go():
|
|
xy = True
|
|
stmt = select(t1).where(lambda: t1.c.q == xy)
|
|
return stmt
|
|
|
|
self.assert_compile(
|
|
go(), "SELECT t1.q, t1.p FROM t1 WHERE t1.q = :xy_1"
|
|
)
|
|
|
|
def test_execute_boolean(self, boolean_table_fixture, connection):
|
|
boolean_data = boolean_table_fixture
|
|
|
|
connection.execute(
|
|
boolean_data.insert(),
|
|
[{"id": 1, "data": True}, {"id": 2, "data": False}],
|
|
)
|
|
|
|
xy = True
|
|
|
|
def go():
|
|
stmt = select(lambda: boolean_data.c.id).where(
|
|
lambda: boolean_data.c.data == xy
|
|
)
|
|
return connection.execute(stmt)
|
|
|
|
result = go()
|
|
eq_(result.all(), [(1,)])
|
|
|
|
xy = False
|
|
result = go()
|
|
eq_(result.all(), [(2,)])
|
|
|
|
def test_in_expressions(self, user_address_fixture, connection):
|
|
"""test #6397. we initially were going to use two different
|
|
forms for "empty in" vs. regular "in", but instead we have an
|
|
improved substitution for "empty in". regardless, as there's more
|
|
going on with these, make sure lambdas work with them including
|
|
caching.
|
|
|
|
"""
|
|
users, _ = user_address_fixture
|
|
data = [
|
|
{"id": 1, "name": "u1"},
|
|
{"id": 2, "name": "u2"},
|
|
{"id": 3, "name": "u3"},
|
|
]
|
|
connection.execute(users.insert(), data)
|
|
|
|
def go(val):
|
|
stmt = lambdas.lambda_stmt(lambda: select(users.c.id))
|
|
stmt += lambda s: s.where(users.c.name.in_(val))
|
|
stmt += lambda s: s.order_by(users.c.id)
|
|
return connection.execute(stmt)
|
|
|
|
for case in [
|
|
[],
|
|
["u1", "u2"],
|
|
["u3"],
|
|
[],
|
|
["u1", "u2"],
|
|
]:
|
|
with testing.assertsql.assert_engine(testing.db) as asserter_:
|
|
result = go(case)
|
|
asserter_.assert_(
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name "
|
|
"IN (__[POSTCOMPILE_val_1]) ORDER BY users.id",
|
|
params={"val_1": case},
|
|
)
|
|
)
|
|
eq_(result.all(), [(e["id"],) for e in data if e["name"] in case])
|
|
|
|
def test_in_expr_compile(self, user_address_fixture):
|
|
users, _ = user_address_fixture
|
|
|
|
def go(val):
|
|
stmt = lambdas.lambda_stmt(lambda: select(users.c.id))
|
|
stmt += lambda s: s.where(users.c.name.in_(val))
|
|
stmt += lambda s: s.order_by(users.c.id)
|
|
return stmt
|
|
|
|
# note this also requires the type of the bind is copied
|
|
self.assert_compile(
|
|
go([]),
|
|
"SELECT users.id FROM users "
|
|
"WHERE users.name IN (NULL) AND (1 != 1) ORDER BY users.id",
|
|
literal_binds=True,
|
|
)
|
|
self.assert_compile(
|
|
go(["u1", "u2"]),
|
|
"SELECT users.id FROM users "
|
|
"WHERE users.name IN ('u1', 'u2') ORDER BY users.id",
|
|
literal_binds=True,
|
|
)
|
|
|
|
def test_bind_type(self, user_address_fixture):
|
|
users, _ = user_address_fixture
|
|
|
|
def go(val):
|
|
stmt = lambdas.lambda_stmt(lambda: select(users.c.id))
|
|
stmt += lambda s: s.where(users.c.name == val)
|
|
return stmt
|
|
|
|
self.assert_compile(
|
|
go("u1"),
|
|
"SELECT users.id FROM users WHERE users.name = 'u1'",
|
|
literal_binds=True,
|
|
)
|
|
|
|
def test_stale_checker_embedded(self):
|
|
def go(x):
|
|
stmt = select(lambda: x)
|
|
return stmt
|
|
|
|
c1 = column("x")
|
|
s1 = go(c1)
|
|
s2 = go(c1)
|
|
|
|
self.assert_compile(s1, "SELECT x")
|
|
self.assert_compile(s2, "SELECT x")
|
|
|
|
c1 = column("q")
|
|
|
|
s3 = go(c1)
|
|
self.assert_compile(s3, "SELECT q")
|
|
|
|
def test_stale_checker_statement(self):
|
|
def go(x):
|
|
stmt = lambdas.lambda_stmt(lambda: select(x))
|
|
return stmt
|
|
|
|
c1 = column("x")
|
|
s1 = go(c1)
|
|
s2 = go(c1)
|
|
|
|
self.assert_compile(s1, "SELECT x")
|
|
self.assert_compile(s2, "SELECT x")
|
|
|
|
c1 = column("q")
|
|
|
|
s3 = go(c1)
|
|
self.assert_compile(s3, "SELECT q")
|
|
|
|
def test_stale_checker_linked(self):
|
|
def go(x, y):
|
|
stmt = lambdas.lambda_stmt(lambda: select(x)) + (
|
|
lambda s: s.where(y > 5)
|
|
)
|
|
return stmt
|
|
|
|
c1 = oldc1 = column("x")
|
|
c2 = oldc2 = column("y")
|
|
s1 = go(c1, c2)
|
|
s2 = go(c1, c2)
|
|
|
|
self.assert_compile(s1, "SELECT x WHERE y > :y_1")
|
|
self.assert_compile(s2, "SELECT x WHERE y > :y_1")
|
|
|
|
c1 = column("q")
|
|
c2 = column("p")
|
|
|
|
s3 = go(c1, c2)
|
|
self.assert_compile(s3, "SELECT q WHERE p > :p_1")
|
|
|
|
s4 = go(c1, c2)
|
|
self.assert_compile(s4, "SELECT q WHERE p > :p_1")
|
|
|
|
s5 = go(oldc1, oldc2)
|
|
self.assert_compile(s5, "SELECT x WHERE y > :y_1")
|
|
|
|
def test_maintain_required_bindparam(self):
|
|
"""test that the "required" flag doesn't go away for bound
|
|
parameters"""
|
|
|
|
def go():
|
|
col_expr = column("x")
|
|
stmt = lambdas.lambda_stmt(lambda: select(col_expr))
|
|
stmt += lambda stmt: stmt.where(col_expr == bindparam(None))
|
|
|
|
return stmt
|
|
|
|
s1 = go()
|
|
|
|
with expect_raises_message(
|
|
exc.InvalidRequestError, "A value is required for bind parameter"
|
|
):
|
|
s1.compile().construct_params({})
|
|
s2 = go()
|
|
with expect_raises_message(
|
|
exc.InvalidRequestError, "A value is required for bind parameter"
|
|
):
|
|
s2.compile().construct_params({})
|
|
|
|
def test_stmt_lambda_w_additional_hascachekey_variants(self):
|
|
def go(col_expr, q):
|
|
stmt = lambdas.lambda_stmt(lambda: select(col_expr))
|
|
stmt += lambda stmt: stmt.where(col_expr == q)
|
|
|
|
return stmt
|
|
|
|
c1 = column("x")
|
|
c2 = column("y")
|
|
|
|
s1 = go(c1, 5)
|
|
s2 = go(c2, 10)
|
|
s3 = go(c1, 8)
|
|
s4 = go(c2, 12)
|
|
|
|
self.assert_compile(
|
|
s1, "SELECT x WHERE x = :q_1", checkparams={"q_1": 5}
|
|
)
|
|
self.assert_compile(
|
|
s2, "SELECT y WHERE y = :q_1", checkparams={"q_1": 10}
|
|
)
|
|
self.assert_compile(
|
|
s3, "SELECT x WHERE x = :q_1", checkparams={"q_1": 8}
|
|
)
|
|
self.assert_compile(
|
|
s4, "SELECT y WHERE y = :q_1", checkparams={"q_1": 12}
|
|
)
|
|
|
|
s1key = s1._generate_cache_key()
|
|
s2key = s2._generate_cache_key()
|
|
s3key = s3._generate_cache_key()
|
|
s4key = s4._generate_cache_key()
|
|
|
|
eq_(s1key[0], s3key[0])
|
|
eq_(s2key[0], s4key[0])
|
|
ne_(s1key[0], s2key[0])
|
|
|
|
def test_stmt_lambda_w_atonce_whereclause_values_notrack(self):
|
|
def go(col_expr, whereclause):
|
|
stmt = lambdas.lambda_stmt(lambda: select(col_expr))
|
|
stmt = stmt.add_criteria(
|
|
lambda stmt: stmt.where(whereclause), enable_tracking=False
|
|
)
|
|
|
|
return stmt
|
|
|
|
c1 = column("x")
|
|
|
|
s1 = go(c1, c1 == 5)
|
|
s2 = go(c1, c1 == 10)
|
|
|
|
self.assert_compile(
|
|
s1, "SELECT x WHERE x = :x_1", checkparams={"x_1": 5}
|
|
)
|
|
|
|
# and as we see, this is wrong. Because whereclause
|
|
# is fixed for the lambda and we do not re-evaluate the closure
|
|
# for this value changing. this can't be passed unless
|
|
# enable_tracking=False.
|
|
self.assert_compile(
|
|
s2, "SELECT x WHERE x = :x_1", checkparams={"x_1": 5}
|
|
)
|
|
|
|
def test_stmt_lambda_w_atonce_whereclause_values(self):
|
|
c2 = column("y")
|
|
|
|
def go(col_expr, whereclause, x):
|
|
stmt = lambdas.lambda_stmt(lambda: select(col_expr))
|
|
stmt = stmt.add_criteria(
|
|
lambda stmt: stmt.where(whereclause).order_by(c2 > x),
|
|
)
|
|
|
|
return stmt
|
|
|
|
c1 = column("x")
|
|
|
|
s1 = go(c1, c1 == 5, 9)
|
|
s2 = go(c1, c1 == 10, 15)
|
|
|
|
s1key = s1._generate_cache_key()
|
|
s2key = s2._generate_cache_key()
|
|
|
|
eq_([b.value for b in s1key.bindparams], [5, 9])
|
|
eq_([b.value for b in s2key.bindparams], [10, 15])
|
|
|
|
self.assert_compile(
|
|
s1,
|
|
"SELECT x WHERE x = :x_1 ORDER BY y > :x_2",
|
|
checkparams={"x_1": 5, "x_2": 9},
|
|
)
|
|
|
|
self.assert_compile(
|
|
s2,
|
|
"SELECT x WHERE x = :x_1 ORDER BY y > :x_2",
|
|
checkparams={"x_1": 10, "x_2": 15},
|
|
)
|
|
|
|
def test_conditional_must_be_tracked(self):
|
|
tab = table("foo", column("id"), column("col"))
|
|
|
|
def run_my_statement(parameter, add_criteria=False):
|
|
stmt = lambda_stmt(lambda: select(tab))
|
|
|
|
stmt = stmt.add_criteria(
|
|
lambda s: (
|
|
s.where(tab.c.col > parameter)
|
|
if add_criteria
|
|
else s.where(tab.c.col == parameter)
|
|
),
|
|
)
|
|
|
|
stmt += lambda s: s.order_by(tab.c.id)
|
|
|
|
return stmt
|
|
|
|
assert_raises_message(
|
|
exc.InvalidRequestError,
|
|
"Closure variable named 'add_criteria' inside of lambda callable",
|
|
run_my_statement,
|
|
5,
|
|
False,
|
|
)
|
|
|
|
def test_boolean_conditionals(self):
|
|
tab = table("foo", column("id"), column("col"))
|
|
|
|
def run_my_statement(parameter, add_criteria=False):
|
|
stmt = lambda_stmt(lambda: select(tab))
|
|
|
|
stmt = stmt.add_criteria(
|
|
lambda s: (
|
|
s.where(tab.c.col > parameter)
|
|
if add_criteria
|
|
else s.where(tab.c.col == parameter)
|
|
),
|
|
track_on=[add_criteria],
|
|
)
|
|
|
|
stmt += lambda s: s.order_by(tab.c.id)
|
|
|
|
return stmt
|
|
|
|
c1 = run_my_statement(5, False)
|
|
c2 = run_my_statement(10, True)
|
|
c3 = run_my_statement(18, False)
|
|
|
|
ck1 = c1._generate_cache_key()
|
|
ck2 = c2._generate_cache_key()
|
|
ck3 = c3._generate_cache_key()
|
|
|
|
eq_(ck1[0], ck3[0])
|
|
ne_(ck1[0], ck2[0])
|
|
|
|
self.assert_compile(
|
|
c1,
|
|
"SELECT foo.id, foo.col FROM foo WHERE "
|
|
"foo.col = :parameter_1 ORDER BY foo.id",
|
|
)
|
|
self.assert_compile(
|
|
c2,
|
|
"SELECT foo.id, foo.col FROM foo "
|
|
"WHERE foo.col > :parameter_1 ORDER BY foo.id",
|
|
)
|
|
self.assert_compile(
|
|
c3,
|
|
"SELECT foo.id, foo.col FROM foo WHERE "
|
|
"foo.col = :parameter_1 ORDER BY foo.id",
|
|
)
|
|
|
|
def test_stmt_lambda_plain_customtrack(self):
|
|
c2 = column("y")
|
|
|
|
def go(col_expr, whereclause, p):
|
|
stmt = lambdas.lambda_stmt(lambda: select(col_expr))
|
|
stmt = stmt.add_criteria(lambda stmt: stmt.where(whereclause))
|
|
stmt = stmt.add_criteria(
|
|
lambda stmt: stmt.order_by(col_expr), track_on=(col_expr,)
|
|
)
|
|
stmt = stmt.add_criteria(lambda stmt: stmt.where(col_expr == p))
|
|
return stmt
|
|
|
|
c1 = column("x")
|
|
c2 = column("y")
|
|
|
|
s1 = go(c1, c1 == 5, 9)
|
|
s2 = go(c1, c1 == 10, 15)
|
|
s3 = go(c2, c2 == 18, 12)
|
|
|
|
s1key = s1._generate_cache_key()
|
|
s2key = s2._generate_cache_key()
|
|
s3key = s3._generate_cache_key()
|
|
|
|
eq_([b.value for b in s1key.bindparams], [5, 9])
|
|
eq_([b.value for b in s2key.bindparams], [10, 15])
|
|
eq_([b.value for b in s3key.bindparams], [18, 12])
|
|
|
|
self.assert_compile(
|
|
s1,
|
|
"SELECT x WHERE x = :x_1 AND x = :p_1 ORDER BY x",
|
|
checkparams={"x_1": 5, "p_1": 9},
|
|
)
|
|
|
|
self.assert_compile(
|
|
s2,
|
|
"SELECT x WHERE x = :x_1 AND x = :p_1 ORDER BY x",
|
|
checkparams={"x_1": 10, "p_1": 15},
|
|
)
|
|
|
|
self.assert_compile(
|
|
s3,
|
|
"SELECT y WHERE y = :y_1 AND y = :p_1 ORDER BY y",
|
|
checkparams={"y_1": 18, "p_1": 12},
|
|
)
|
|
|
|
@testing.combinations(
|
|
(True,),
|
|
(False,),
|
|
)
|
|
def test_stmt_lambda_w_atonce_whereclause_customtrack_binds(
|
|
self, use_tuple
|
|
):
|
|
c2 = column("y")
|
|
|
|
# this pattern is *completely unnecessary*, and I would prefer
|
|
# if we can detect this and just raise, because when it is not done
|
|
# correctly, it is *extremely* difficult to catch it failing.
|
|
# however I also can't come up with a reliable way to catch it.
|
|
# so we will keep the use of "track_on" to be internal.
|
|
|
|
if use_tuple:
|
|
|
|
def go(col_expr, whereclause, p):
|
|
stmt = lambdas.lambda_stmt(lambda: select(col_expr))
|
|
stmt = stmt.add_criteria(
|
|
lambda stmt: stmt.where(whereclause).order_by(
|
|
col_expr > p
|
|
),
|
|
track_on=((whereclause,), whereclause.right.value),
|
|
)
|
|
|
|
return stmt
|
|
|
|
else:
|
|
|
|
def go(col_expr, whereclause, p):
|
|
stmt = lambdas.lambda_stmt(lambda: select(col_expr))
|
|
stmt = stmt.add_criteria(
|
|
lambda stmt: stmt.where(whereclause).order_by(
|
|
col_expr > p
|
|
),
|
|
track_on=(whereclause, whereclause.right.value),
|
|
)
|
|
|
|
return stmt
|
|
|
|
c1 = column("x")
|
|
c2 = column("y")
|
|
|
|
s1 = go(c1, c1 == 5, 9)
|
|
s2 = go(c1, c1 == 10, 15)
|
|
s3 = go(c2, c2 == 18, 12)
|
|
|
|
s1key = s1._generate_cache_key()
|
|
s2key = s2._generate_cache_key()
|
|
s3key = s3._generate_cache_key()
|
|
|
|
eq_([b.value for b in s1key.bindparams], [5, 9])
|
|
eq_([b.value for b in s2key.bindparams], [10, 15])
|
|
eq_([b.value for b in s3key.bindparams], [18, 12])
|
|
|
|
self.assert_compile(
|
|
s1,
|
|
"SELECT x WHERE x = :x_1 ORDER BY x > :p_1",
|
|
checkparams={"x_1": 5, "p_1": 9},
|
|
)
|
|
|
|
self.assert_compile(
|
|
s2,
|
|
"SELECT x WHERE x = :x_1 ORDER BY x > :p_1",
|
|
checkparams={"x_1": 10, "p_1": 15},
|
|
)
|
|
|
|
self.assert_compile(
|
|
s3,
|
|
"SELECT y WHERE y = :y_1 ORDER BY y > :p_1",
|
|
checkparams={"y_1": 18, "p_1": 12},
|
|
)
|
|
|
|
def test_stmt_lambda_track_closure_binds_one(self):
|
|
def go(col_expr, whereclause):
|
|
stmt = lambdas.lambda_stmt(lambda: select(col_expr))
|
|
stmt += lambda stmt: stmt.where(whereclause)
|
|
|
|
return stmt
|
|
|
|
c1 = column("x")
|
|
|
|
s1 = go(c1, c1 == 5)
|
|
s2 = go(c1, c1 == 10)
|
|
|
|
self.assert_compile(
|
|
s1, "SELECT x WHERE x = :x_1", checkparams={"x_1": 5}
|
|
)
|
|
self.assert_compile(
|
|
s2, "SELECT x WHERE x = :x_1", checkparams={"x_1": 10}
|
|
)
|
|
|
|
s1key = s1._generate_cache_key()
|
|
s2key = s2._generate_cache_key()
|
|
|
|
eq_(s1key.key, s2key.key)
|
|
|
|
eq_([b.value for b in s1key.bindparams], [5])
|
|
eq_([b.value for b in s2key.bindparams], [10])
|
|
|
|
def test_stmt_lambda_track_closure_binds_two(self):
|
|
def go(col_expr, whereclause, x, y):
|
|
stmt = lambdas.lambda_stmt(lambda: select(col_expr))
|
|
stmt += lambda stmt: stmt.where(whereclause).where(
|
|
and_(c1 == x, c1 < y)
|
|
)
|
|
|
|
return stmt
|
|
|
|
c1 = column("x")
|
|
|
|
s1 = go(c1, c1 == 5, 8, 9)
|
|
s2 = go(c1, c1 == 10, 12, 14)
|
|
|
|
s1key = s1._generate_cache_key()
|
|
s2key = s2._generate_cache_key()
|
|
|
|
self.assert_compile(
|
|
s1,
|
|
"SELECT x WHERE x = :x_1 AND x = :x_2 AND x < :y_1",
|
|
checkparams={"x_1": 5, "x_2": 8, "y_1": 9},
|
|
)
|
|
self.assert_compile(
|
|
s2,
|
|
"SELECT x WHERE x = :x_1 AND x = :x_2 AND x < :y_1",
|
|
checkparams={"x_1": 10, "x_2": 12, "y_1": 14},
|
|
)
|
|
|
|
eq_([b.value for b in s1key.bindparams], [5, 8, 9])
|
|
eq_([b.value for b in s2key.bindparams], [10, 12, 14])
|
|
|
|
s1_compiled_cached = s1.compile(cache_key=s1key)
|
|
|
|
params = s1_compiled_cached.construct_params(
|
|
extracted_parameters=s2key[1]
|
|
)
|
|
|
|
eq_(params, {"x_1": 10, "x_2": 12, "y_1": 14})
|
|
|
|
def test_stmt_lambda_track_closure_binds_three(self):
|
|
def go(col_expr, whereclause, x, y):
|
|
stmt = lambdas.lambda_stmt(lambda: select(col_expr))
|
|
stmt += lambda stmt: stmt.where(whereclause)
|
|
stmt += lambda stmt: stmt.where(and_(c1 == x, c1 < y))
|
|
|
|
return stmt
|
|
|
|
c1 = column("x")
|
|
|
|
s1 = go(c1, c1 == 5, 8, 9)
|
|
s2 = go(c1, c1 == 10, 12, 14)
|
|
|
|
s1key = s1._generate_cache_key()
|
|
s2key = s2._generate_cache_key()
|
|
|
|
self.assert_compile(
|
|
s1,
|
|
"SELECT x WHERE x = :x_1 AND x = :x_2 AND x < :y_1",
|
|
checkparams={"x_1": 5, "x_2": 8, "y_1": 9},
|
|
)
|
|
self.assert_compile(
|
|
s2,
|
|
"SELECT x WHERE x = :x_1 AND x = :x_2 AND x < :y_1",
|
|
checkparams={"x_1": 10, "x_2": 12, "y_1": 14},
|
|
)
|
|
|
|
eq_([b.value for b in s1key.bindparams], [5, 8, 9])
|
|
eq_([b.value for b in s2key.bindparams], [10, 12, 14])
|
|
|
|
s1_compiled_cached = s1.compile(cache_key=s1key)
|
|
|
|
params = s1_compiled_cached.construct_params(
|
|
extracted_parameters=s2key[1]
|
|
)
|
|
|
|
eq_(params, {"x_1": 10, "x_2": 12, "y_1": 14})
|
|
|
|
def test_stmt_lambda_w_atonce_whereclause_novalue(self):
|
|
def go(col_expr, whereclause):
|
|
stmt = lambdas.lambda_stmt(lambda: select(col_expr))
|
|
stmt += lambda stmt: stmt.where(whereclause)
|
|
|
|
return stmt
|
|
|
|
c1 = column("x")
|
|
|
|
s1 = go(c1, bindparam("x"))
|
|
|
|
self.assert_compile(s1, "SELECT x WHERE :x")
|
|
|
|
def test_reject_plain_object(self):
|
|
# with #5765 we move to no longer allow closure variables that
|
|
# refer to unknown types of objects inside the lambda. these have
|
|
# to be resolved outside of the lambda because we otherwise can't
|
|
# be sure they can be safely used as cache keys.
|
|
class Thing:
|
|
def __init__(self, col_expr):
|
|
self.col_expr = col_expr
|
|
|
|
def go(thing, q):
|
|
stmt = lambdas.lambda_stmt(lambda: select(thing.col_expr))
|
|
stmt += lambda stmt: stmt.where(thing.col_expr == q)
|
|
|
|
return stmt
|
|
|
|
c1 = Thing(column("x"))
|
|
assert_raises_message(
|
|
exc.InvalidRequestError,
|
|
"Closure variable named 'thing' inside of lambda callable",
|
|
go,
|
|
c1,
|
|
5,
|
|
)
|
|
|
|
def test_plain_object_ok_w_tracking_disabled(self):
|
|
# with #5765 we move to no longer allow closure variables that
|
|
# refer to unknown types of objects inside the lambda. these have
|
|
# to be resolved outside of the lambda because we otherwise can't
|
|
# be sure they can be safely used as cache keys.
|
|
class Thing:
|
|
def __init__(self, col_expr):
|
|
self.col_expr = col_expr
|
|
|
|
def go(thing, q):
|
|
stmt = lambdas.lambda_stmt(
|
|
lambda: select(thing.col_expr), track_closure_variables=False
|
|
)
|
|
stmt = stmt.add_criteria(
|
|
lambda stmt: stmt.where(thing.col_expr == q),
|
|
track_closure_variables=False,
|
|
)
|
|
|
|
return stmt
|
|
|
|
c1 = Thing(column("x"))
|
|
c2 = Thing(column("y"))
|
|
|
|
s1 = go(c1, 5)
|
|
s2 = go(c2, 10)
|
|
s3 = go(c1, 8)
|
|
s4 = go(c2, 12)
|
|
|
|
self.assert_compile(
|
|
s1, "SELECT x WHERE x = :q_1", checkparams={"q_1": 5}
|
|
)
|
|
# note this is wrong, because no tracking
|
|
self.assert_compile(
|
|
s2, "SELECT x WHERE x = :q_1", checkparams={"q_1": 10}
|
|
)
|
|
self.assert_compile(
|
|
s3, "SELECT x WHERE x = :q_1", checkparams={"q_1": 8}
|
|
)
|
|
# also wrong
|
|
self.assert_compile(
|
|
s4, "SELECT x WHERE x = :q_1", checkparams={"q_1": 12}
|
|
)
|
|
|
|
s1key = s1._generate_cache_key()
|
|
s2key = s2._generate_cache_key()
|
|
s3key = s3._generate_cache_key()
|
|
s4key = s4._generate_cache_key()
|
|
|
|
# all one cache key
|
|
eq_(s1key[0], s3key[0])
|
|
eq_(s2key[0], s4key[0])
|
|
eq_(s1key[0], s2key[0])
|
|
|
|
def test_plain_object_used_outside_lambda(self):
|
|
# test the above 'test_reject_plain_object' with the expected
|
|
# workaround
|
|
|
|
class Thing:
|
|
def __init__(self, col_expr):
|
|
self.col_expr = col_expr
|
|
|
|
def go(thing, q):
|
|
col_expr = thing.col_expr
|
|
stmt = lambdas.lambda_stmt(lambda: select(col_expr))
|
|
stmt += lambda stmt: stmt.where(col_expr == q)
|
|
|
|
return stmt
|
|
|
|
c1 = Thing(column("x"))
|
|
c2 = Thing(column("y"))
|
|
|
|
s1 = go(c1, 5)
|
|
s2 = go(c2, 10)
|
|
s3 = go(c1, 8)
|
|
s4 = go(c2, 12)
|
|
|
|
self.assert_compile(
|
|
s1, "SELECT x WHERE x = :q_1", checkparams={"q_1": 5}
|
|
)
|
|
self.assert_compile(
|
|
s2, "SELECT y WHERE y = :q_1", checkparams={"q_1": 10}
|
|
)
|
|
self.assert_compile(
|
|
s3, "SELECT x WHERE x = :q_1", checkparams={"q_1": 8}
|
|
)
|
|
self.assert_compile(
|
|
s4, "SELECT y WHERE y = :q_1", checkparams={"q_1": 12}
|
|
)
|
|
|
|
s1key = s1._generate_cache_key()
|
|
s2key = s2._generate_cache_key()
|
|
s3key = s3._generate_cache_key()
|
|
s4key = s4._generate_cache_key()
|
|
|
|
eq_(s1key[0], s3key[0])
|
|
eq_(s2key[0], s4key[0])
|
|
ne_(s1key[0], s2key[0])
|
|
|
|
def test_stmt_lambda_w_set_of_opts(self):
|
|
stmt = lambdas.lambda_stmt(lambda: select(column("x")))
|
|
|
|
class MyUncacheable(ExecutableOption):
|
|
pass
|
|
|
|
opts = {MyUncacheable()}
|
|
|
|
assert_raises_message(
|
|
exc.InvalidRequestError,
|
|
"Closure variable named 'opts' inside of lambda callable ",
|
|
stmt.__add__,
|
|
lambda stmt: stmt.options(*opts),
|
|
)
|
|
|
|
def test_detect_embedded_callables_one(self):
|
|
t1 = table("t1", column("q"))
|
|
|
|
x = 1
|
|
|
|
def go():
|
|
def foo():
|
|
return x
|
|
|
|
stmt = select(t1).where(lambda: t1.c.q == foo())
|
|
return stmt
|
|
|
|
assert_raises_message(
|
|
exc.InvalidRequestError,
|
|
r"Can't invoke Python callable foo\(\) inside of lambda "
|
|
"expression ",
|
|
go,
|
|
)
|
|
|
|
def test_detect_embedded_callables_two(self):
|
|
t1 = table("t1", column("q"), column("y"))
|
|
|
|
def go():
|
|
def foo():
|
|
return t1.c.y
|
|
|
|
stmt = select(t1).where(lambda: t1.c.q == foo())
|
|
return stmt
|
|
|
|
self.assert_compile(
|
|
go(), "SELECT t1.q, t1.y FROM t1 WHERE t1.q = t1.y"
|
|
)
|
|
|
|
def test_detect_embedded_callables_three(self):
|
|
t1 = table("t1", column("q"), column("y"))
|
|
|
|
def go():
|
|
def foo():
|
|
t1.c.y
|
|
|
|
stmt = select(t1).where(lambda: t1.c.q == getattr(t1.c, "y"))
|
|
return stmt
|
|
|
|
self.assert_compile(
|
|
go(), "SELECT t1.q, t1.y FROM t1 WHERE t1.q = t1.y"
|
|
)
|
|
|
|
def test_detect_embedded_callables_four(self):
|
|
t1 = table("t1", column("q"))
|
|
|
|
x = 1
|
|
|
|
def go():
|
|
def foo():
|
|
return x
|
|
|
|
stmt = select(t1).where(
|
|
lambdas.LambdaElement(
|
|
lambda: t1.c.q == foo(),
|
|
roles.WhereHavingRole,
|
|
lambdas.LambdaOptions(track_bound_values=False),
|
|
)
|
|
)
|
|
return stmt
|
|
|
|
self.assert_compile(
|
|
go(),
|
|
"SELECT t1.q FROM t1 WHERE t1.q = :q_1",
|
|
checkparams={"q_1": 1},
|
|
)
|
|
|
|
# we're not tracking it
|
|
x = 2
|
|
|
|
self.assert_compile(
|
|
go(),
|
|
"SELECT t1.q FROM t1 WHERE t1.q = :q_1",
|
|
checkparams={"q_1": 1},
|
|
)
|
|
|
|
def test_offline_cache_key_no_paramtrack(self):
|
|
def go():
|
|
stmt = lambdas.lambda_stmt(
|
|
lambda: select(column("x")).where(
|
|
column("y") == bindparam("q")
|
|
),
|
|
global_track_bound_values=False,
|
|
)
|
|
|
|
return stmt
|
|
|
|
s1 = go()
|
|
|
|
eq_(
|
|
s1._generate_cache_key().to_offline_string({}, s1, {"q": 5}),
|
|
"('SELECT x \\nWHERE y = :q', (5,))",
|
|
)
|
|
|
|
def test_offline_cache_key_paramtrack(self):
|
|
def go(param):
|
|
stmt = lambdas.lambda_stmt(
|
|
lambda: select(column("x")).where(column("y") == param),
|
|
)
|
|
|
|
return stmt
|
|
|
|
s1 = go(5)
|
|
|
|
param_key = s1._resolved._where_criteria[0].right.key
|
|
eq_(
|
|
s1._generate_cache_key().to_offline_string(
|
|
{}, s1, {param_key: 10}
|
|
),
|
|
"('SELECT x \\nWHERE y = :param_1', (10,))",
|
|
)
|
|
|
|
def test_stmt_lambda_w_list_of_opts(self):
|
|
def go(opts):
|
|
stmt = lambdas.lambda_stmt(lambda: select(column("x")))
|
|
stmt += lambda stmt: stmt.options(*opts)
|
|
|
|
return stmt
|
|
|
|
class SomeOpt(HasCacheKey, ExecutableOption):
|
|
def __init__(self, x):
|
|
self.x = x
|
|
|
|
def _gen_cache_key(self, anon_map, bindparams):
|
|
return (SomeOpt, self.x)
|
|
|
|
s1 = go([SomeOpt("a"), SomeOpt("b")])
|
|
|
|
s2 = go([SomeOpt("a"), SomeOpt("b")])
|
|
|
|
s3 = go([SomeOpt("q"), SomeOpt("b")])
|
|
|
|
s1key = s1._generate_cache_key()
|
|
s2key = s2._generate_cache_key()
|
|
s3key = s3._generate_cache_key()
|
|
|
|
eq_(s1key.key, s2key.key)
|
|
ne_(s1key.key, s3key.key)
|
|
|
|
def test_stmt_lambda_opt_w_key(self):
|
|
"""test issue related to #6887"""
|
|
|
|
def go(opts):
|
|
stmt = lambdas.lambda_stmt(lambda: select(column("x")))
|
|
stmt += lambda stmt: stmt.options(*opts)
|
|
|
|
return stmt
|
|
|
|
class SomeOpt(HasCacheKey, ExecutableOption):
|
|
def _gen_cache_key(self, anon_map, bindparams):
|
|
return ("fixed_key",)
|
|
|
|
# generates no key, will not be cached
|
|
eq_(SomeOpt()._generate_cache_key().key, ("fixed_key",))
|
|
|
|
s1o, s2o = SomeOpt(), SomeOpt()
|
|
s1 = go([s1o])
|
|
s2 = go([s2o])
|
|
|
|
s1key = s1._generate_cache_key()
|
|
s2key = s2._generate_cache_key()
|
|
|
|
eq_(s1key.key[-1], (("fixed_key",),))
|
|
eq_(s1key.key, s2key.key)
|
|
|
|
eq_(s1._resolved._with_options, (s1o,))
|
|
eq_(s2._resolved._with_options, (s1o,))
|
|
ne_(s2._resolved._with_options, (s2o,))
|
|
|
|
def test_stmt_lambda_opt_w_no_key(self):
|
|
"""test issue related to #6887"""
|
|
|
|
def go(opts):
|
|
stmt = lambdas.lambda_stmt(lambda: select(column("x")))
|
|
stmt += lambda stmt: stmt.options(*opts)
|
|
|
|
return stmt
|
|
|
|
class SomeOpt(HasCacheKey, ExecutableOption):
|
|
inherit_cache = False
|
|
|
|
# generates no key, will not be cached
|
|
eq_(SomeOpt()._generate_cache_key(), None)
|
|
|
|
s1o, s2o = SomeOpt(), SomeOpt()
|
|
s1 = go([s1o])
|
|
s2 = go([s2o])
|
|
|
|
s1key = s1._generate_cache_key()
|
|
|
|
eq_(s1key, None)
|
|
|
|
eq_(s1._resolved._with_options, (s1o,))
|
|
eq_(s2._resolved._with_options, (s2o,))
|
|
ne_(s2._resolved._with_options, (s1o,))
|
|
|
|
def test_stmt_lambda_hey_theres_multiple_paths(self):
|
|
def go(x, y):
|
|
stmt = lambdas.lambda_stmt(lambda: select(column("x")))
|
|
|
|
if x > 5:
|
|
stmt += lambda stmt: stmt.where(column("x") == x)
|
|
else:
|
|
stmt += lambda stmt: stmt.where(column("y") == y)
|
|
|
|
stmt += lambda stmt: stmt.order_by(column("q"))
|
|
|
|
# TODO: need more path variety here to exercise
|
|
# using a full path key
|
|
|
|
return stmt
|
|
|
|
s1 = go(2, 5)
|
|
s2 = go(8, 7)
|
|
s3 = go(4, 9)
|
|
s4 = go(10, 1)
|
|
|
|
self.assert_compile(s1, "SELECT x WHERE y = :y_1 ORDER BY q")
|
|
self.assert_compile(s2, "SELECT x WHERE x = :x_1 ORDER BY q")
|
|
self.assert_compile(s3, "SELECT x WHERE y = :y_1 ORDER BY q")
|
|
self.assert_compile(s4, "SELECT x WHERE x = :x_1 ORDER BY q")
|
|
|
|
def test_coercion_cols_clause(self):
|
|
assert_raises_message(
|
|
exc.ArgumentError,
|
|
"Textual column expression 'f' should be explicitly declared",
|
|
select,
|
|
lambda: "foo",
|
|
)
|
|
|
|
def test_coercion_where_clause(self):
|
|
assert_raises_message(
|
|
exc.ArgumentError,
|
|
"SQL expression for WHERE/HAVING role expected, got 5",
|
|
select(column("q")).where,
|
|
5,
|
|
)
|
|
|
|
def test_propagate_attrs_full_stmt(self):
|
|
col = column("q")
|
|
col._propagate_attrs = col._propagate_attrs.union(
|
|
{"compile_state_plugin": "x", "plugin_subject": "y"}
|
|
)
|
|
|
|
stmt = lambdas.lambda_stmt(lambda: select(col))
|
|
|
|
eq_(
|
|
stmt._propagate_attrs,
|
|
{"compile_state_plugin": "x", "plugin_subject": "y"},
|
|
)
|
|
|
|
def test_propagate_attrs_cols_clause(self):
|
|
col = column("q")
|
|
col._propagate_attrs = col._propagate_attrs.union(
|
|
{"compile_state_plugin": "x", "plugin_subject": "y"}
|
|
)
|
|
|
|
stmt = select(lambda: col)
|
|
|
|
eq_(
|
|
stmt._propagate_attrs,
|
|
{"compile_state_plugin": "x", "plugin_subject": "y"},
|
|
)
|
|
|
|
def test_propagate_attrs_from_clause(self):
|
|
col = column("q")
|
|
|
|
t = table("t", column("y"))
|
|
|
|
t._propagate_attrs = t._propagate_attrs.union(
|
|
{"compile_state_plugin": "x", "plugin_subject": "y"}
|
|
)
|
|
|
|
stmt = future_select(lambda: col).join(t)
|
|
|
|
eq_(
|
|
stmt._propagate_attrs,
|
|
{"compile_state_plugin": "x", "plugin_subject": "y"},
|
|
)
|
|
|
|
def test_select_legacy_expanding_columns(self):
|
|
q, p, r = column("q"), column("p"), column("r")
|
|
|
|
stmt = select(lambda: (q, p, r))
|
|
|
|
self.assert_compile(stmt, "SELECT q, p, r")
|
|
|
|
def test_select_future_expanding_columns(self):
|
|
q, p, r = column("q"), column("p"), column("r")
|
|
|
|
stmt = future_select(lambda: (q, p, r))
|
|
|
|
self.assert_compile(stmt, "SELECT q, p, r")
|
|
|
|
def test_select_fromclause(self):
|
|
t1 = table("t1", column("q"), column("p"))
|
|
t2 = table("t2", column("y"))
|
|
|
|
def go():
|
|
return select(t1).select_from(
|
|
lambda: join(t1, t2, lambda: t1.c.q == t2.c.y)
|
|
)
|
|
|
|
self.assert_compile(
|
|
go(), "SELECT t1.q, t1.p FROM t1 JOIN t2 ON t1.q = t2.y"
|
|
)
|
|
|
|
self.assert_compile(
|
|
go(), "SELECT t1.q, t1.p FROM t1 JOIN t2 ON t1.q = t2.y"
|
|
)
|
|
|
|
def test_in_parameters_one(self):
|
|
expr1 = select(1).where(column("q").in_(["a", "b", "c"]))
|
|
self.assert_compile(expr1, "SELECT 1 WHERE q IN (__[POSTCOMPILE_q_1])")
|
|
|
|
self.assert_compile(
|
|
expr1,
|
|
"SELECT 1 WHERE q IN (:q_1_1, :q_1_2, :q_1_3)",
|
|
render_postcompile=True,
|
|
checkparams={"q_1_1": "a", "q_1_2": "b", "q_1_3": "c"},
|
|
)
|
|
|
|
def test_in_parameters_two(self):
|
|
expr2 = select(1).where(lambda: column("q").in_(["a", "b", "c"]))
|
|
self.assert_compile(expr2, "SELECT 1 WHERE q IN (__[POSTCOMPILE_q_1])")
|
|
self.assert_compile(
|
|
expr2,
|
|
"SELECT 1 WHERE q IN (:q_1_1, :q_1_2, :q_1_3)",
|
|
render_postcompile=True,
|
|
checkparams={"q_1_1": "a", "q_1_2": "b", "q_1_3": "c"},
|
|
)
|
|
|
|
def test_in_parameters_three(self):
|
|
expr3 = lambdas.lambda_stmt(
|
|
lambda: select(1).where(column("q").in_(["a", "b", "c"]))
|
|
)
|
|
self.assert_compile(expr3, "SELECT 1 WHERE q IN (__[POSTCOMPILE_q_1])")
|
|
self.assert_compile(
|
|
expr3,
|
|
"SELECT 1 WHERE q IN (:q_1_1, :q_1_2, :q_1_3)",
|
|
render_postcompile=True,
|
|
checkparams={"q_1_1": "a", "q_1_2": "b", "q_1_3": "c"},
|
|
)
|
|
|
|
def test_in_parameters_four(self):
|
|
def go(names):
|
|
return lambdas.lambda_stmt(
|
|
lambda: select(1).where(column("q").in_(names))
|
|
)
|
|
|
|
expr4 = go(["a", "b", "c"])
|
|
self.assert_compile(
|
|
expr4, "SELECT 1 WHERE q IN (__[POSTCOMPILE_names_1])"
|
|
)
|
|
self.assert_compile(
|
|
expr4,
|
|
"SELECT 1 WHERE q IN (:names_1_1, :names_1_2, :names_1_3)",
|
|
render_postcompile=True,
|
|
checkparams={"names_1_1": "a", "names_1_2": "b", "names_1_3": "c"},
|
|
)
|
|
|
|
def test_in_parameters_five(self):
|
|
def go(n1, n2):
|
|
stmt = lambdas.lambda_stmt(
|
|
lambda: select(1).where(column("q", ARRAY(String)).in_(n1))
|
|
)
|
|
stmt += lambda s: s.where(column("y", ARRAY(String)).in_(n2))
|
|
return stmt
|
|
|
|
expr = go(["a", "b", "c"], ["d", "e", "f"])
|
|
self.assert_compile(
|
|
expr,
|
|
"SELECT 1 WHERE q IN (:n1_1_1, :n1_1_2, :n1_1_3) "
|
|
"AND y IN (:n2_1_1, :n2_1_2, :n2_1_3)",
|
|
render_postcompile=True,
|
|
checkparams={
|
|
"n1_1_1": "a",
|
|
"n1_1_2": "b",
|
|
"n1_1_3": "c",
|
|
"n2_1_1": "d",
|
|
"n2_1_2": "e",
|
|
"n2_1_3": "f",
|
|
},
|
|
)
|
|
|
|
def test_in_columnelement(self):
|
|
# test issue #5768
|
|
|
|
def go():
|
|
v = [literal("a"), literal("b")]
|
|
expr1 = select(1).where(lambda: column("q").in_(v))
|
|
return expr1
|
|
|
|
self.assert_compile(go(), "SELECT 1 WHERE q IN (:param_1, :param_2)")
|
|
|
|
self.assert_compile(
|
|
go(),
|
|
"SELECT 1 WHERE q IN (:param_1, :param_2)",
|
|
render_postcompile=True,
|
|
checkparams={"param_1": "a", "param_2": "b"},
|
|
)
|
|
|
|
def test_select_columns_clause(self):
|
|
t1 = table("t1", column("q"), column("p"))
|
|
|
|
g = 5
|
|
|
|
def go():
|
|
return select(lambda: t1.c.q, lambda: t1.c.p + g)
|
|
|
|
stmt = go()
|
|
self.assert_compile(
|
|
stmt,
|
|
"SELECT t1.q, t1.p + :g_1 AS anon_1 FROM t1",
|
|
checkparams={"g_1": 5},
|
|
)
|
|
eq_(stmt._generate_cache_key()._generate_param_dict(), {"g_1": 5})
|
|
|
|
g = 10
|
|
stmt = go()
|
|
self.assert_compile(
|
|
stmt,
|
|
"SELECT t1.q, t1.p + :g_1 AS anon_1 FROM t1",
|
|
checkparams={"g_1": 10},
|
|
)
|
|
eq_(stmt._generate_cache_key()._generate_param_dict(), {"g_1": 10})
|
|
|
|
@testing.metadata_fixture()
|
|
def user_address_fixture(self, metadata):
|
|
users = Table(
|
|
"users",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True),
|
|
Column("name", String(50)),
|
|
)
|
|
addresses = Table(
|
|
"addresses",
|
|
metadata,
|
|
Column("id", Integer),
|
|
Column("user_id", ForeignKey("users.id")),
|
|
Column("email", String(50)),
|
|
)
|
|
return users, addresses
|
|
|
|
@testing.metadata_fixture()
|
|
def boolean_table_fixture(self, metadata):
|
|
return Table(
|
|
"boolean_data",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True),
|
|
Column("data", Boolean),
|
|
)
|
|
|
|
def test_adapt_select(self, user_address_fixture):
|
|
users, addresses = user_address_fixture
|
|
|
|
stmt = (
|
|
select(users)
|
|
.select_from(
|
|
users.join(
|
|
addresses, lambda: users.c.id == addresses.c.user_id
|
|
)
|
|
)
|
|
.where(lambda: users.c.name == "ed")
|
|
)
|
|
|
|
self.assert_compile(
|
|
stmt,
|
|
"SELECT users.id, users.name FROM users "
|
|
"JOIN addresses ON users.id = addresses.user_id "
|
|
"WHERE users.name = :name_1",
|
|
)
|
|
|
|
u1 = users.alias()
|
|
adapter = sql_util.ClauseAdapter(u1)
|
|
|
|
s2 = adapter.traverse(stmt)
|
|
|
|
self.assert_compile(
|
|
s2,
|
|
"SELECT users_1.id, users_1.name FROM users AS users_1 "
|
|
"JOIN addresses ON users_1.id = addresses.user_id "
|
|
"WHERE users_1.name = :name_1",
|
|
)
|
|
|
|
def test_no_var_dict_keys(self, user_address_fixture):
|
|
users, addresses = user_address_fixture
|
|
|
|
names = {"x": "some name"}
|
|
foo = "x"
|
|
expr = lambda: users.c.name == names[foo] # noqa
|
|
|
|
assert_raises_message(
|
|
exc.InvalidRequestError,
|
|
"Dictionary keys / list indexes inside of a cached "
|
|
"lambda must be Python literals only",
|
|
coercions.expect,
|
|
roles.WhereHavingRole,
|
|
expr,
|
|
)
|
|
|
|
def test_reject_dict_literal_keys(self, user_address_fixture):
|
|
users, addresses = user_address_fixture
|
|
|
|
names = {"x": "some name"}
|
|
lmb = lambda: users.c.name == names["x"] # noqa
|
|
|
|
assert_raises_message(
|
|
exc.InvalidRequestError,
|
|
"Closure variable named 'names' inside of lambda callable",
|
|
coercions.expect,
|
|
roles.WhereHavingRole,
|
|
lmb,
|
|
)
|
|
|
|
def test_dict_literal_keys_proper_use(self, user_address_fixture):
|
|
users, addresses = user_address_fixture
|
|
|
|
names = {"x": "some name"}
|
|
x = names["x"]
|
|
lmb = lambda: users.c.name == x # noqa
|
|
|
|
expr = coercions.expect(roles.WhereHavingRole, lmb)
|
|
self.assert_compile(
|
|
expr,
|
|
"users.name = :x_1",
|
|
params=expr._param_dict(),
|
|
checkparams={"x_1": "some name"},
|
|
)
|
|
|
|
def test_assignment_one(self, user_address_fixture):
|
|
users, addresses = user_address_fixture
|
|
|
|
x = 5
|
|
|
|
def my_lambda():
|
|
y = 10
|
|
z = y + 18
|
|
|
|
expr1 = users.c.name > x
|
|
expr2 = users.c.name < z
|
|
return and_(expr1, expr2)
|
|
|
|
expr = coercions.expect(roles.WhereHavingRole, my_lambda)
|
|
self.assert_compile(
|
|
expr,
|
|
"users.name > :x_1 AND users.name < :name_1",
|
|
params=expr._param_dict(),
|
|
checkparams={"name_1": 28, "x_1": 5},
|
|
)
|
|
|
|
expr = coercions.expect(roles.WhereHavingRole, my_lambda)
|
|
self.assert_compile(
|
|
expr,
|
|
"users.name > :x_1 AND users.name < :name_1",
|
|
params=expr._param_dict(),
|
|
checkparams={"name_1": 28, "x_1": 5},
|
|
)
|
|
|
|
def test_assignment_two(self, user_address_fixture):
|
|
users, addresses = user_address_fixture
|
|
|
|
x = 5
|
|
z = 10
|
|
|
|
def my_lambda():
|
|
y = x + z
|
|
|
|
expr1 = users.c.name > x
|
|
expr2 = users.c.name < y
|
|
return and_(expr1, expr2)
|
|
|
|
expr = coercions.expect(roles.WhereHavingRole, my_lambda)
|
|
self.assert_compile(
|
|
expr,
|
|
"users.name > :x_1 AND users.name < :x_1 + :z_1",
|
|
params=expr._param_dict(),
|
|
checkparams={"x_1": 5, "z_1": 10},
|
|
)
|
|
|
|
x = 15
|
|
z = 18
|
|
|
|
expr = coercions.expect(roles.WhereHavingRole, my_lambda)
|
|
self.assert_compile(
|
|
expr,
|
|
"users.name > :x_1 AND users.name < :x_1 + :z_1",
|
|
params=expr._param_dict(),
|
|
checkparams={"x_1": 15, "z_1": 18},
|
|
)
|
|
|
|
def test_assignment_three(self, user_address_fixture):
|
|
users, addresses = user_address_fixture
|
|
|
|
x = 5
|
|
z = 10
|
|
|
|
def my_lambda():
|
|
y = 10 + z
|
|
|
|
expr1 = users.c.name > x
|
|
expr2 = users.c.name < y
|
|
return and_(expr1, expr2)
|
|
|
|
expr = coercions.expect(roles.WhereHavingRole, my_lambda)
|
|
self.assert_compile(
|
|
expr,
|
|
"users.name > :x_1 AND users.name < :param_1 + :z_1",
|
|
params=expr._param_dict(),
|
|
checkparams={"x_1": 5, "z_1": 10, "param_1": 10},
|
|
)
|
|
|
|
x = 15
|
|
z = 18
|
|
|
|
expr = coercions.expect(roles.WhereHavingRole, my_lambda)
|
|
self.assert_compile(
|
|
expr,
|
|
"users.name > :x_1 AND users.name < :param_1 + :z_1",
|
|
params=expr._param_dict(),
|
|
checkparams={"x_1": 15, "z_1": 18, "param_1": 10},
|
|
)
|
|
|
|
def test_op_reverse(self, user_address_fixture):
|
|
user, addresses = user_address_fixture
|
|
|
|
x = "foo"
|
|
|
|
def mylambda():
|
|
return x + user.c.name
|
|
|
|
expr = coercions.expect(roles.WhereHavingRole, mylambda)
|
|
self.assert_compile(
|
|
expr, ":x_1 || users.name", checkparams={"x_1": "foo"}
|
|
)
|
|
|
|
x = "bar"
|
|
expr = coercions.expect(roles.WhereHavingRole, mylambda)
|
|
self.assert_compile(
|
|
expr, ":x_1 || users.name", checkparams={"x_1": "bar"}
|
|
)
|
|
|
|
def test_op_forwards(self, user_address_fixture):
|
|
user, addresses = user_address_fixture
|
|
|
|
x = "foo"
|
|
|
|
def mylambda():
|
|
return user.c.name + x
|
|
|
|
expr = coercions.expect(roles.WhereHavingRole, mylambda)
|
|
self.assert_compile(
|
|
expr, "users.name || :x_1", checkparams={"x_1": "foo"}
|
|
)
|
|
|
|
x = "bar"
|
|
expr = coercions.expect(roles.WhereHavingRole, mylambda)
|
|
self.assert_compile(
|
|
expr, "users.name || :x_1", checkparams={"x_1": "bar"}
|
|
)
|
|
|
|
def test_rhs_type_detection_from_left(self):
|
|
"""test #9029"""
|
|
tt = table("tt", column("q", JSON))
|
|
|
|
x = {"foo": "bar"}
|
|
|
|
def mylambda():
|
|
return tt.c.q._null_operate(x)
|
|
|
|
expr = coercions.expect(roles.WhereHavingRole, mylambda)
|
|
is_(expr._resolved.right.type._type_affinity, JSON)
|
|
|
|
def test_rhs_type_detection_standalone(self):
|
|
"""test related to #9029, as type coercion rule was changed"""
|
|
|
|
x = 5
|
|
|
|
def mylambda():
|
|
return x
|
|
|
|
expr = coercions.expect(roles.OrderByRole, mylambda)
|
|
is_(expr._resolved.type._type_affinity, Integer)
|
|
|
|
x = "now im a string"
|
|
|
|
# stays as int b.c. _resolved is cached
|
|
is_(expr._resolved.type._type_affinity, Integer)
|
|
|
|
# make a new one! now it will be string
|
|
expr = coercions.expect(roles.OrderByRole, mylambda)
|
|
is_(expr._resolved.type._type_affinity, String)
|
|
|
|
@testing.only_on("sqlite")
|
|
@testing.variation("stmt_type", ["lambda_stmt", "lambda_crit"])
|
|
@testing.variation("callable_type", ["none", "closure", "parameter"])
|
|
def test_9029_integration(
|
|
self, metadata, connection, stmt_type, callable_type
|
|
):
|
|
t = Table(
|
|
"t",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True),
|
|
Column("data", JSON),
|
|
)
|
|
|
|
t.create(connection)
|
|
|
|
connection.execute(
|
|
t.insert(),
|
|
{
|
|
"id": 12,
|
|
"data": {"key": "value", "key2": {"subkey": [1, 2, 3]}},
|
|
},
|
|
)
|
|
|
|
d = {"key": "value", "key2": {"subkey": [1, 2, 3]}}
|
|
|
|
if callable_type.none:
|
|
if stmt_type.lambda_stmt:
|
|
stmt = lambda_stmt(lambda: select(t).filter(t.c.data == d))
|
|
elif stmt_type.lambda_crit:
|
|
stmt = select(t).filter(lambda: t.c.data == d)
|
|
else:
|
|
stmt_type.fail()
|
|
|
|
to_run = stmt
|
|
|
|
elif callable_type.closure:
|
|
|
|
def go():
|
|
if stmt_type.lambda_stmt:
|
|
stmt = lambda_stmt(lambda: select(t).filter(t.c.data == d))
|
|
elif stmt_type.lambda_crit:
|
|
stmt = select(t).filter(lambda: t.c.data == d)
|
|
else:
|
|
stmt_type.fail()
|
|
return stmt
|
|
|
|
to_run = go()
|
|
|
|
elif callable_type.parameter:
|
|
|
|
def go(data):
|
|
if stmt_type.lambda_stmt:
|
|
stmt = lambda_stmt(
|
|
lambda: select(t).filter(t.c.data == data)
|
|
)
|
|
elif stmt_type.lambda_crit:
|
|
stmt = select(t).filter(lambda: t.c.data == data)
|
|
else:
|
|
stmt_type.fail()
|
|
|
|
return stmt
|
|
|
|
to_run = go(d)
|
|
|
|
eq_(
|
|
connection.execute(to_run).first(),
|
|
(12, {"key": "value", "key2": {"subkey": [1, 2, 3]}}),
|
|
)
|
|
|
|
def test_execute_constructed_uncached(self, user_address_fixture):
|
|
users, addresses = user_address_fixture
|
|
|
|
def go(name):
|
|
stmt = select(lambda: users.c.id).where(
|
|
lambda: users.c.name == name
|
|
)
|
|
with testing.db.connect().execution_options(
|
|
compiled_cache=None
|
|
) as conn:
|
|
conn.execute(stmt)
|
|
|
|
with self.sql_execution_asserter(testing.db) as asserter:
|
|
go("name1")
|
|
go("name2")
|
|
go("name1")
|
|
go("name3")
|
|
|
|
asserter.assert_(
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name1"}],
|
|
),
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name2"}],
|
|
),
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name1"}],
|
|
),
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name3"}],
|
|
),
|
|
)
|
|
|
|
def test_execute_full_uncached(self, user_address_fixture):
|
|
users, addresses = user_address_fixture
|
|
|
|
def go(name):
|
|
stmt = lambda_stmt(
|
|
lambda: select(users.c.id).where(users.c.name == name) # noqa
|
|
)
|
|
|
|
with testing.db.connect().execution_options(
|
|
compiled_cache=None
|
|
) as conn:
|
|
conn.execute(stmt)
|
|
|
|
with self.sql_execution_asserter(testing.db) as asserter:
|
|
go("name1")
|
|
go("name2")
|
|
go("name1")
|
|
go("name3")
|
|
|
|
asserter.assert_(
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name1"}],
|
|
),
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name2"}],
|
|
),
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name1"}],
|
|
),
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name3"}],
|
|
),
|
|
)
|
|
|
|
def test_execute_constructed_cached(self, user_address_fixture):
|
|
users, addresses = user_address_fixture
|
|
|
|
cache = {}
|
|
|
|
def go(name):
|
|
stmt = select(lambda: users.c.id).where(
|
|
lambda: users.c.name == name
|
|
)
|
|
|
|
with testing.db.connect().execution_options(
|
|
compiled_cache=cache
|
|
) as conn:
|
|
conn.execute(stmt)
|
|
|
|
with self.sql_execution_asserter(testing.db) as asserter:
|
|
go("name1")
|
|
go("name2")
|
|
go("name1")
|
|
go("name3")
|
|
|
|
asserter.assert_(
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name1"}],
|
|
),
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name2"}],
|
|
),
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name1"}],
|
|
),
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name3"}],
|
|
),
|
|
)
|
|
|
|
def test_execute_full_cached(self, user_address_fixture):
|
|
users, addresses = user_address_fixture
|
|
|
|
cache = {}
|
|
|
|
def go(name):
|
|
stmt = lambda_stmt(
|
|
lambda: select(users.c.id).where(users.c.name == name) # noqa
|
|
)
|
|
|
|
with testing.db.connect().execution_options(
|
|
compiled_cache=cache
|
|
) as conn:
|
|
conn.execute(stmt)
|
|
|
|
with self.sql_execution_asserter(testing.db) as asserter:
|
|
go("name1")
|
|
go("name2")
|
|
go("name1")
|
|
go("name3")
|
|
|
|
asserter.assert_(
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name1"}],
|
|
),
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name2"}],
|
|
),
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name1"}],
|
|
),
|
|
CompiledSQL(
|
|
"SELECT users.id FROM users WHERE users.name = :name_1",
|
|
lambda ctx: [{"name_1": "name3"}],
|
|
),
|
|
)
|
|
|
|
def test_cache_key_bindparam_matches(self):
|
|
t1 = table("t1", column("q"), column("p"))
|
|
|
|
def go(x):
|
|
return coercions.expect(roles.WhereHavingRole, lambda: t1.c.q == x)
|
|
|
|
expr1 = go(5)
|
|
expr2 = go(10)
|
|
|
|
is_(expr1._generate_cache_key().bindparams[0], expr1._resolved.right)
|
|
is_(expr2._generate_cache_key().bindparams[0], expr2._resolved.right)
|
|
|
|
def test_cache_key_bindparam_matches_annotations(self):
|
|
t1 = table("t1", column("q"), column("p"))
|
|
|
|
def go():
|
|
expr = sql_util._deep_annotate((t1.c.q == 5), {"foo": "bar"})
|
|
stmt = coercions.expect(roles.WhereHavingRole, lambda: expr)
|
|
return stmt
|
|
|
|
self.assert_compile(go(), "t1.q = :q_1", checkparams={"q_1": 5})
|
|
self.assert_compile(go(), "t1.q = :q_1", checkparams={"q_1": 5})
|
|
|
|
def test_cache_key_instance_variable_issue_incorrect(self):
|
|
t1 = table("t1", column("q"), column("p"))
|
|
|
|
class Foo:
|
|
def __init__(self, value):
|
|
self.value = value
|
|
|
|
def go(foo):
|
|
return coercions.expect(
|
|
roles.WhereHavingRole, lambda: t1.c.q == foo.value
|
|
)
|
|
|
|
assert_raises_message(
|
|
exc.InvalidRequestError,
|
|
"Closure variable named 'foo' inside of lambda callable",
|
|
go,
|
|
Foo(5),
|
|
)
|
|
|
|
def test_cache_key_instance_variable_issue_correct_one(self):
|
|
t1 = table("t1", column("q"), column("p"))
|
|
|
|
class Foo:
|
|
def __init__(self, value):
|
|
self.value = value
|
|
|
|
def go(foo):
|
|
value = foo.value
|
|
return coercions.expect(
|
|
roles.WhereHavingRole, lambda: t1.c.q == value
|
|
)
|
|
|
|
expr1 = go(Foo(5))
|
|
expr2 = go(Foo(10))
|
|
|
|
c1 = expr1._generate_cache_key()
|
|
c2 = expr2._generate_cache_key()
|
|
eq_(c1, c2)
|
|
|
|
def test_cache_key_instance_variable_issue_correct_two(self):
|
|
t1 = table("t1", column("q"), column("p"))
|
|
|
|
class Foo:
|
|
def __init__(self, value):
|
|
self.value = value
|
|
|
|
def go(foo):
|
|
return coercions.expect(
|
|
roles.WhereHavingRole,
|
|
lambda: t1.c.q == foo.value,
|
|
track_on=[self],
|
|
)
|
|
|
|
expr1 = go(Foo(5))
|
|
expr2 = go(Foo(10))
|
|
|
|
c1 = expr1._generate_cache_key()
|
|
c2 = expr2._generate_cache_key()
|
|
eq_(c1, c2)
|
|
|
|
def test_insert_statement(self, user_address_fixture):
|
|
users, addresses = user_address_fixture
|
|
|
|
def ins(id_, name):
|
|
stmt = lambda_stmt(lambda: users.insert())
|
|
stmt += lambda s: s.values(id=id_, name=name)
|
|
return stmt
|
|
|
|
with testing.db.begin() as conn:
|
|
conn.execute(ins(12, "foo"))
|
|
|
|
eq_(
|
|
conn.execute(select(users).where(users.c.id == 12)).first(),
|
|
(12, "foo"),
|
|
)
|
|
|
|
def test_update_statement(self, user_address_fixture):
|
|
users, addresses = user_address_fixture
|
|
|
|
def upd(id_, newname):
|
|
stmt = lambda_stmt(lambda: users.update())
|
|
stmt += lambda s: s.values(name=newname)
|
|
stmt += lambda s: s.where(users.c.id == id_)
|
|
return stmt
|
|
|
|
with testing.db.begin() as conn:
|
|
conn.execute(users.insert().values(id=7, name="bar"))
|
|
conn.execute(upd(7, "foo"))
|
|
|
|
eq_(
|
|
conn.execute(select(users).where(users.c.id == 7)).first(),
|
|
(7, "foo"),
|
|
)
|
|
|
|
def test_bindparam_not_cached(self, user_address_fixture, testing_engine):
|
|
"""test #12084"""
|
|
|
|
users, addresses = user_address_fixture
|
|
|
|
engine = testing_engine(
|
|
share_pool=True, options={"query_cache_size": 0}
|
|
)
|
|
with engine.begin() as conn:
|
|
conn.execute(
|
|
users.insert(),
|
|
[{"id": 7, "name": "bar"}, {"id": 8, "name": "foo"}],
|
|
)
|
|
|
|
def make_query(stmt, *criteria):
|
|
for crit in criteria:
|
|
stmt += lambda s: s.where(crit)
|
|
|
|
return stmt
|
|
|
|
for i in range(2):
|
|
with engine.connect() as conn:
|
|
stmt = lambda_stmt(lambda: select(users))
|
|
# create a filter criterion that will never match anything
|
|
stmt1 = make_query(
|
|
stmt,
|
|
users.c.name == "bar",
|
|
users.c.name == "foo",
|
|
)
|
|
|
|
assert len(conn.scalars(stmt1).all()) == 0
|
|
|
|
stmt2 = make_query(
|
|
stmt,
|
|
users.c.name == "bar",
|
|
users.c.name == "bar",
|
|
users.c.name == "foo",
|
|
)
|
|
|
|
assert len(conn.scalars(stmt2).all()) == 0
|
|
|
|
|
|
class DeferredLambdaElementTest(
|
|
fixtures.TestBase, testing.AssertsExecutionResults, AssertsCompiledSQL
|
|
):
|
|
__dialect__ = "default"
|
|
|
|
@testing.fails("wontfix issue #5767")
|
|
def test_detect_change_in_binds_no_tracking(self):
|
|
t1 = table("t1", column("q"), column("p"))
|
|
t2 = table("t2", column("q"), column("p"))
|
|
|
|
vv = [1, 2, 3]
|
|
# lambda produces either "t1 IN vv" or "NULL" based on the
|
|
# argument. will not produce a consistent cache key
|
|
elem = lambdas.DeferredLambdaElement(
|
|
lambda tab: tab.c.q.in_(vv) if tab.name == "t2" else null(),
|
|
roles.WhereHavingRole,
|
|
lambda_args=(t1,),
|
|
opts=lambdas.LambdaOptions(track_closure_variables=False),
|
|
)
|
|
|
|
self.assert_compile(elem.expr, "NULL")
|
|
|
|
assert_raises_message(
|
|
exc.InvalidRequestError,
|
|
r"Lambda callable at %s produced "
|
|
"a different set of bound parameters "
|
|
"than its original run: vv" % (elem.fn.__code__),
|
|
elem._resolve_with_args,
|
|
t2,
|
|
)
|
|
|
|
def test_detect_change_in_binds_tracking_positive(self):
|
|
t1 = table("t1", column("q"), column("p"))
|
|
|
|
vv = [1, 2, 3]
|
|
|
|
# lambda produces either "t1 IN vv" or "NULL" based on the
|
|
# argument. will not produce a consistent cache key
|
|
assert_raises_message(
|
|
exc.InvalidRequestError,
|
|
"Closure variable named 'vv' inside of lambda callable",
|
|
lambdas.DeferredLambdaElement,
|
|
lambda tab: tab.c.q.in_(vv) if tab.name == "t2" else None,
|
|
roles.WhereHavingRole,
|
|
opts=lambdas.LambdaOptions,
|
|
lambda_args=(t1,),
|
|
)
|
|
|
|
@testing.fails("wontfix issue #5767")
|
|
def test_detect_change_in_binds_tracking_negative(self):
|
|
t1 = table("t1", column("q"), column("p"))
|
|
t2 = table("t2", column("q"), column("p"))
|
|
|
|
vv = [1, 2, 3]
|
|
qq = [3, 4, 5]
|
|
|
|
# lambda produces either "t1 IN vv" or "t2 IN qq" based on the
|
|
# argument. will not produce a consistent cache key
|
|
elem = lambdas.DeferredLambdaElement(
|
|
lambda tab: (
|
|
tab.c.q.in_(vv) if tab.name == "t1" else tab.c.q.in_(qq)
|
|
),
|
|
roles.WhereHavingRole,
|
|
lambda_args=(t1,),
|
|
opts=lambdas.LambdaOptions(track_closure_variables=False),
|
|
)
|
|
|
|
self.assert_compile(elem.expr, "t1.q IN (__[POSTCOMPILE_vv_1])")
|
|
|
|
assert_raises_message(
|
|
exc.InvalidRequestError,
|
|
r"Lambda callable at %s produced "
|
|
"a different set of bound parameters "
|
|
"than its original run: qq" % (elem.fn.__code__),
|
|
elem._resolve_with_args,
|
|
t2,
|
|
)
|
|
|
|
def _fixture_one(self, t1):
|
|
vv = [1, 2, 3]
|
|
|
|
def go():
|
|
elem = lambdas.DeferredLambdaElement(
|
|
lambda tab: tab.c.q.in_(vv),
|
|
roles.WhereHavingRole,
|
|
lambda_args=(t1,),
|
|
opts=lambdas.LambdaOptions,
|
|
)
|
|
return elem
|
|
|
|
return go
|
|
|
|
def _fixture_two(self, t1):
|
|
def go():
|
|
elem = lambdas.DeferredLambdaElement(
|
|
lambda tab: tab.c.q == "x",
|
|
roles.WhereHavingRole,
|
|
lambda_args=(t1,),
|
|
opts=lambdas.LambdaOptions,
|
|
)
|
|
return elem
|
|
|
|
return go
|
|
|
|
def _fixture_three(self, t1):
|
|
def go():
|
|
elem = lambdas.DeferredLambdaElement(
|
|
lambda tab: tab.c.q != "x",
|
|
roles.WhereHavingRole,
|
|
lambda_args=(t1,),
|
|
opts=lambdas.LambdaOptions,
|
|
)
|
|
return elem
|
|
|
|
return go
|
|
|
|
def _fixture_four(self, t1):
|
|
def go():
|
|
elem = lambdas.DeferredLambdaElement(
|
|
lambda tab: tab.c.q.in_([1, 2, 3]),
|
|
roles.WhereHavingRole,
|
|
lambda_args=(t1,),
|
|
opts=lambdas.LambdaOptions,
|
|
)
|
|
return elem
|
|
|
|
return go
|
|
|
|
def _fixture_five(self, t1):
|
|
def go():
|
|
x = "x"
|
|
elem = lambdas.DeferredLambdaElement(
|
|
lambda tab: tab.c.q == x,
|
|
roles.WhereHavingRole,
|
|
lambda_args=(t1,),
|
|
opts=lambdas.LambdaOptions,
|
|
)
|
|
return elem
|
|
|
|
return go
|
|
|
|
def _fixture_six(self, t1):
|
|
def go():
|
|
x = "x"
|
|
elem = lambdas.DeferredLambdaElement(
|
|
lambda tab: tab.c.q != x,
|
|
roles.WhereHavingRole,
|
|
lambda_args=(t1,),
|
|
opts=lambdas.LambdaOptions,
|
|
)
|
|
return elem
|
|
|
|
return go
|
|
|
|
@testing.combinations(
|
|
("_fixture_one",),
|
|
("_fixture_two",),
|
|
("_fixture_three",),
|
|
("_fixture_four",),
|
|
("_fixture_five",),
|
|
("_fixture_six",),
|
|
)
|
|
def test_cache_key_many_different_args(self, fixture_name):
|
|
t1 = table("t1", column("q"), column("p"))
|
|
t2 = table("t2", column("q"), column("p"))
|
|
t3 = table("t3", column("q"), column("p"))
|
|
|
|
go = getattr(self, fixture_name)(t1)
|
|
|
|
g1 = go()
|
|
g2 = go()
|
|
|
|
g1key = g1._generate_cache_key()
|
|
g2key = g2._generate_cache_key()
|
|
eq_(g1key[0], g2key[0])
|
|
|
|
e1 = go()._resolve_with_args(t1)
|
|
e2 = go()._resolve_with_args(t2)
|
|
e3 = go()._resolve_with_args(t3)
|
|
|
|
e1key = e1._generate_cache_key()
|
|
e2key = e2._generate_cache_key()
|
|
e3key = e3._generate_cache_key()
|
|
|
|
e12 = go()._resolve_with_args(t1)
|
|
e32 = go()._resolve_with_args(t3)
|
|
|
|
e12key = e12._generate_cache_key()
|
|
e32key = e32._generate_cache_key()
|
|
|
|
ne_(e1key[0], e2key[0])
|
|
ne_(e2key[0], e3key[0])
|
|
|
|
eq_(e12key[0], e1key[0])
|
|
eq_(e32key[0], e3key[0])
|
|
|
|
|
|
class ConcurrencyTest(fixtures.TestBase):
|
|
"""test for #8098 and #9461"""
|
|
|
|
__requires__ = ("independent_readonly_connections",)
|
|
|
|
__only_on__ = ("+psycopg2", "+mysqldb", "+pysqlite", "+pymysql")
|
|
|
|
THREADS = 10
|
|
|
|
@testing.fixture
|
|
def mapping_fixture(self, decl_base):
|
|
class A(decl_base):
|
|
__tablename__ = "a"
|
|
id = Column(Integer, primary_key=True)
|
|
col1 = Column(String(100))
|
|
col2 = Column(String(100))
|
|
col3 = Column(String(100))
|
|
col4 = Column(String(100))
|
|
|
|
decl_base.metadata.create_all(testing.db)
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
with testing.db.connect() as conn:
|
|
with Session(conn) as session:
|
|
session.add_all(
|
|
[
|
|
A(col1=str(i), col2=str(i), col3=str(i), col4=str(i))
|
|
for i in range(self.THREADS + 1)
|
|
]
|
|
)
|
|
session.commit()
|
|
|
|
return A
|
|
|
|
@testing.requires.timing_intensive
|
|
def test_lambda_concurrency(self, testing_engine, mapping_fixture):
|
|
A = mapping_fixture
|
|
engine = testing_engine(options={"pool_size": self.THREADS + 5})
|
|
NUM_OF_LAMBDAS = 150
|
|
|
|
code = """
|
|
from sqlalchemy import lambda_stmt, select
|
|
|
|
|
|
def generate_lambda_stmt(wanted):
|
|
stmt = lambda_stmt(lambda: select(A.col1, A.col2, A.col3, A.col4))
|
|
"""
|
|
|
|
for _ in range(NUM_OF_LAMBDAS):
|
|
code += (
|
|
" stmt += lambda s: s.where((A.col1 == wanted) & "
|
|
"(A.col2 == wanted) & (A.col3 == wanted) & "
|
|
"(A.col4 == wanted))\n"
|
|
)
|
|
|
|
code += """
|
|
return stmt
|
|
"""
|
|
|
|
d = {"A": A, "__name__": "lambda_fake"}
|
|
exec(code, d)
|
|
generate_lambda_stmt = d["generate_lambda_stmt"]
|
|
|
|
runs: List[Optional[int]] = [None for _ in range(self.THREADS)]
|
|
conns = [engine.connect() for _ in range(self.THREADS)]
|
|
|
|
def run(num):
|
|
wanted = str(num)
|
|
connection = conns[num]
|
|
time.sleep(0.1)
|
|
stmt = generate_lambda_stmt(wanted)
|
|
time.sleep(0.1)
|
|
row = connection.execute(stmt).first()
|
|
if not row:
|
|
runs[num] = False
|
|
else:
|
|
runs[num] = True
|
|
|
|
threads = [
|
|
threading.Thread(target=run, args=(num,))
|
|
for num in range(self.THREADS)
|
|
]
|
|
|
|
for thread in threads:
|
|
thread.start()
|
|
for thread in threads:
|
|
thread.join(timeout=10)
|
|
for conn in conns:
|
|
conn.close()
|
|
|
|
fails = len([r for r in runs if r is False])
|
|
assert not fails, f"{fails} runs failed"
|