Render LIMIT/OFFSET conditions after compile on select dialects

Added new "post compile parameters" feature.  This feature allows a
:func:`.bindparam` construct to have its value rendered into the SQL string
before being passed to the DBAPI driver, but after the compilation step,
using the "literal render" feature of the compiler.  The immediate
rationale for this feature is to support LIMIT/OFFSET schemes that don't
work or perform well as bound parameters handled by the database driver,
while still allowing for SQLAlchemy SQL constructs to be cacheable in their
compiled form.     The immediate targets for the new feature are the "TOP
N" clause used by SQL Server (and Sybase) which does not support a bound
parameter, as well as the "ROWNUM" and optional "FIRST_ROWS()" schemes used
by the Oracle dialect, the former of which has been known to perform better
without bound parameters and the latter of which does not support a bound
parameter.   The feature builds upon the mechanisms first developed to
support "expanding" parameters for IN expressions.   As part of this
feature, the Oracle ``use_binds_for_limits`` feature is turned on
unconditionally and this flag is now deprecated.

- adds limited support for "unique" bound parameters within
a text() construct.

- adds an additional int() check within the literal render
function of the Integer datatype and tests that non-int values
raise ValueError.

Fixes: #4808
Change-Id: Iace97d544d1a7351ee07db970c6bc06a19c712c6
This commit is contained in:
Mike Bayer
2019-08-18 10:02:24 -04:00
parent f499671ccc
commit 36e8fe48b2
27 changed files with 1434 additions and 727 deletions
+85
View File
@@ -616,6 +616,91 @@ expression on the outside to apply an "AS <name>" label directly::
:ticket:`4449`
.. _change_4808:
New "post compile" bound parameters used for LIMIT/OFFSET in Oracle, SQL Server
-------------------------------------------------------------------------------
A major goal of the 1.4 series is to establish that all Core SQL constructs
are completely cacheable, meaning that a particular :class:`.Compiled`
structure will produce an identical SQL string regardless of any SQL parameters
used with it, which notably includes those used to specify the LIMIT and
OFFSET values, typically used for pagination and "top N" style results.
While SQLAlchemy has used bound parameters for LIMIT/OFFSET schemes for many
years, a few outliers remained where such parameters were not allowed, including
a SQL Server "TOP N" statement, such as::
SELECT TOP 5 mytable.id, mytable.data FROM mytable
as well as with Oracle, where the FIRST_ROWS() hint (which SQLAlchemy will
use if the ``optimize_limits=True`` parameter is passed to
:func:`.create_engine` with an Oracle URL) does not allow them,
but also that using bound parameters with ROWNUM comparisons has been reported
as producing slower query plans::
SELECT anon_1.id, anon_1.data FROM (
SELECT /*+ FIRST_ROWS(5) */
anon_2.id AS id,
anon_2.data AS data,
ROWNUM AS ora_rn FROM (
SELECT mytable.id, mytable.data FROM mytable
) anon_2
WHERE ROWNUM <= :param_1
) anon_1 WHERE ora_rn > :param_2
In order to allow for all statements to be unconditionally cacheable at the
compilation level, a new form of bound parameter called a "post compile"
parameter has been added, which makes use of the same mechanism as that
of "expanding IN parameters". This is a :func:`.bindparam` that behaves
identically to any other bound parameter except that parameter value will
be rendered literally into the SQL string before sending it to the DBAPI
``cursor.execute()`` method. The new parameter is used internally by the
SQL Server and Oracle dialects, so that the drivers receive the literal
rendered value but the rest of SQLAlchemy can still consider this as a
bound parameter. The above two statements when stringified using
``str(statement.compile(dialect=<dialect>))`` now look like::
SELECT TOP [POSTCOMPILE_param_1] mytable.id, mytable.data FROM mytable
and::
SELECT anon_1.id, anon_1.data FROM (
SELECT /*+ FIRST_ROWS([POSTCOMPILE__ora_frow_1]) */
anon_2.id AS id,
anon_2.data AS data,
ROWNUM AS ora_rn FROM (
SELECT mytable.id, mytable.data FROM mytable
) anon_2
WHERE ROWNUM <= [POSTCOMPILE_param_1]
) anon_1 WHERE ora_rn > [POSTCOMPILE_param_2]
The ``[POSTCOMPILE_<param>]`` format is also what is seen when an
"expanding IN" is used.
When viewing the SQL logging output, the final form of the statement will
be seen::
SELECT anon_1.id, anon_1.data FROM (
SELECT /*+ FIRST_ROWS(5) */
anon_2.id AS id,
anon_2.data AS data,
ROWNUM AS ora_rn FROM (
SELECT mytable.id AS id, mytable.data AS data FROM mytable
) anon_2
WHERE ROWNUM <= 8
) anon_1 WHERE ora_rn > 3
The "post compile parameter" feature is exposed as public API through the
:paramref:`.bindparam.literal_execute` parameter, however is currently not
intended for general use. The literal values are rendered using the
:meth:`.TypeEngine.literal_processor` of the underlying datatype, which in
SQLAlchemy has **extremely limited** scope, supporting only integers and simple
string values.
:ticket:`4808`
.. _change_4712:
Connection-level transactions can now be inactive based on subtransaction
+24
View File
@@ -0,0 +1,24 @@
.. change::
:tags: feature, sql, mssql, oracle
:tickets: 4808
Added new "post compile parameters" feature. This feature allows a
:func:`.bindparam` construct to have its value rendered into the SQL string
before being passed to the DBAPI driver, but after the compilation step,
using the "literal render" feature of the compiler. The immediate
rationale for this feature is to support LIMIT/OFFSET schemes that don't
work or perform well as bound parameters handled by the database driver,
while still allowing for SQLAlchemy SQL constructs to be cacheable in their
compiled form. The immediate targets for the new feature are the "TOP
N" clause used by SQL Server (and Sybase) which does not support a bound
parameter, as well as the "ROWNUM" and optional "FIRST_ROWS()" schemes used
by the Oracle dialect, the former of which has been known to perform better
without bound parameters and the latter of which does not support a bound
parameter. The feature builds upon the mechanisms first developed to
support "expanding" parameters for IN expressions. As part of this
feature, the Oracle ``use_binds_for_limits`` feature is turned on
unconditionally and this flag is now deprecated.
.. seealso::
:ref:`change_4808`
+21 -9
View File
@@ -246,17 +246,29 @@ LIMIT/OFFSET Support
--------------------
MSSQL has no support for the LIMIT or OFFSET keywords. LIMIT is
supported directly through the ``TOP`` Transact SQL keyword::
supported directly through the ``TOP`` Transact SQL keyword. A statement
such as::
select.limit
select([some_table]).limit(5)
will yield::
will render similarly to::
SELECT TOP n
SELECT TOP 5 col1, col2.. FROM table
If using SQL Server 2005 or above, LIMIT with OFFSET
support is available through the ``ROW_NUMBER OVER`` construct.
For versions below 2005, LIMIT with OFFSET usage will fail.
LIMIT with OFFSET support is implemented using the using the ``ROW_NUMBER()``
window function. A statement such as::
select([some_table]).order_by(some_table.c.col3).limit(5).offset(10)
will render similarly to::
SELECT anon_1.col1, anon_1.col2 FROM (SELECT col1, col2,
ROW_NUMBER() OVER (ORDER BY col3) AS
mssql_rn FROM table WHERE t.x = :x_1) AS
anon_1 WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1
Note that when using LIMIT and OFFSET together, the statement must have
an ORDER BY as well.
.. _mssql_isolation_level:
@@ -1603,8 +1615,8 @@ class MSSQLCompiler(compiler.SQLCompiler):
# ODBC drivers and possibly others
# don't support bind params in the SELECT clause on SQL Server.
# so have to use literal here.
s += "TOP %d " % select._limit
kw["literal_execute"] = True
s += "TOP %s " % self.process(select._limit_clause, **kw)
if s:
return s
else:
+66 -41
View File
@@ -25,7 +25,7 @@ which affect the behavior of the dialect regardless of driver in use.
* ``optimize_limits`` - defaults to ``False``. see the section on
LIMIT/OFFSET.
* ``use_binds_for_limits`` - defaults to ``True``. see the section on
* ``use_binds_for_limits`` - deprecated. see the section on
LIMIT/OFFSET.
Auto Increment Behavior
@@ -71,30 +71,33 @@ lowercase names should be used on the SQLAlchemy side.
LIMIT/OFFSET Support
--------------------
Oracle has no support for the LIMIT or OFFSET keywords. SQLAlchemy uses
a wrapped subquery approach in conjunction with ROWNUM. The exact methodology
is taken from
http://www.oracle.com/technetwork/issue-archive/2006/06-sep/o56asktom-086197.html .
Oracle has no direct support for LIMIT and OFFSET until version 12c.
To achieve this behavior across all widely used versions of Oracle starting
with the 8 series, SQLAlchemy currently makes use of ROWNUM to achieve
LIMIT/OFFSET; the exact methodology is taken from
https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results .
There are two options which affect its behavior:
There is currently a single option to affect its behavior:
* the "FIRST ROWS()" optimization keyword is not used by default. To enable
* the "FIRST_ROWS()" optimization keyword is not used by default. To enable
the usage of this optimization directive, specify ``optimize_limits=True``
to :func:`.create_engine`.
* the values passed for the limit/offset are sent as bound parameters. Some
users have observed that Oracle produces a poor query plan when the values
are sent as binds and not rendered literally. To render the limit/offset
values literally within the SQL statement, specify
``use_binds_for_limits=False`` to :func:`.create_engine`.
Some users have reported better performance when the entirely different
approach of a window query is used, i.e. ROW_NUMBER() OVER (ORDER BY), to
provide LIMIT/OFFSET (note that the majority of users don't observe this).
To suit this case the method used for LIMIT/OFFSET can be replaced entirely.
See the recipe at
http://www.sqlalchemy.org/trac/wiki/UsageRecipes/WindowFunctionsByDefault
which installs a select compiler that overrides the generation of limit/offset
with a window function.
.. versionchanged:: 1.4
The Oracle dialect renders limit/offset integer values using a "post
compile" scheme which renders the integer directly before passing the
statement to the cursor for execution. The ``use_binds_for_limits`` flag
no longer has an effect.
.. seealso::
:ref:`change_4808`.
Support for changing the row number strategy, which would include one that
makes use of the ``row_number()`` window function as well as one that makes
use of the Oracle 12c "FETCH FIRST N ROW / OFFSET N ROWS" keywords may be
added in a future release.
.. _oracle_returning:
@@ -364,6 +367,7 @@ from ...types import CHAR
from ...types import CLOB
from ...types import FLOAT
from ...types import INTEGER
from ...types import Integer
from ...types import NCHAR
from ...types import NVARCHAR
from ...types import TIMESTAMP
@@ -855,17 +859,9 @@ class OracleCompiler(compiler.SQLCompiler):
limit_clause = select._limit_clause
offset_clause = select._offset_clause
if limit_clause is not None or offset_clause is not None:
# See http://www.oracle.com/technology/oramag/oracle/06-sep/\
# o56asktom.html
#
# Generalized form of an Oracle pagination query:
# select ... from (
# select /*+ FIRST_ROWS(N) */ ...., rownum as ora_rn from
# ( select distinct ... where ... order by ...
# ) where ROWNUM <= :limit+:offset
# ) where ora_rn > :offset
# Outer select and "ROWNUM as ora_rn" can be dropped if
# limit=0
# currently using form at:
# https://blogs.oracle.com/oraclemagazine/\
# on-rownum-and-limiting-results
kwargs["select_wraps_for"] = orig_select = select
select = select._generate()
@@ -896,8 +892,17 @@ class OracleCompiler(compiler.SQLCompiler):
and self.dialect.optimize_limits
and select._simple_int_limit
):
param = sql.bindparam(
"_ora_frow",
select._limit,
type_=Integer,
literal_execute=True,
unique=True,
)
limitselect = limitselect.prefix_with(
"/*+ FIRST_ROWS(%d) */" % select._limit
expression.text(
"/*+ FIRST_ROWS(:_ora_frow) */"
).bindparams(param)
)
limitselect._oracle_visit = True
@@ -913,14 +918,20 @@ class OracleCompiler(compiler.SQLCompiler):
# If needed, add the limiting clause
if limit_clause is not None:
if not self.dialect.use_binds_for_limits:
# use simple int limits, will raise an exception
# if the limit isn't specified this way
if select._simple_int_limit and (
offset_clause is None or select._simple_int_offset
):
max_row = select._limit
if offset_clause is not None:
max_row += select._offset
max_row = sql.literal_column("%d" % max_row)
max_row = sql.bindparam(
None,
max_row,
type_=Integer,
literal_execute=True,
unique=True,
)
else:
max_row = limit_clause
if offset_clause is not None:
@@ -969,10 +980,15 @@ class OracleCompiler(compiler.SQLCompiler):
adapter.traverse(elem) for elem in for_update.of
]
if not self.dialect.use_binds_for_limits:
offset_clause = sql.literal_column(
"%d" % select._offset
if select._simple_int_offset:
offset_clause = sql.bindparam(
None,
select._offset,
Integer,
literal_execute=True,
unique=True,
)
offsetselect = offsetselect.where(
sql.literal_column("ora_rn") > offset_clause
)
@@ -1150,11 +1166,21 @@ class OracleDialect(default.DefaultDialect):
(sa_schema.Index, {"bitmap": False, "compress": False}),
]
@util.deprecated_params(
use_binds_for_limits=(
"1.4",
"The ``use_binds_for_limits`` Oracle dialect parameter is "
"deprecated. The dialect now renders LIMIT /OFFSET integers "
"inline in all cases using a post-compilation hook, so that the "
"value is still represented by a 'bound parameter' on the Core "
"Expression side.",
)
)
def __init__(
self,
use_ansi=True,
optimize_limits=False,
use_binds_for_limits=True,
use_binds_for_limits=None,
use_nchar_for_unicode=False,
exclude_tablespaces=("SYSTEM", "SYSAUX"),
**kwargs
@@ -1163,7 +1189,6 @@ class OracleDialect(default.DefaultDialect):
self._use_nchar_for_unicode = use_nchar_for_unicode
self.use_ansi = use_ansi
self.optimize_limits = optimize_limits
self.use_binds_for_limits = use_binds_for_limits
self.exclude_tablespaces = exclude_tablespaces
def initialize(self, connection):
+6 -6
View File
@@ -518,13 +518,13 @@ class OracleCompiler_cx_oracle(OracleCompiler):
quote is True
or quote is not False
and self.preparer._bindparam_requires_quotes(name)
and not kw.get("post_compile", False)
):
if kw.get("expanding", False):
raise exc.CompileError(
"Can't use expanding feature with parameter name "
"%r on Oracle; it requires quoting which is not supported "
"in this context." % name
)
# interesting to note about expanding parameters - since the
# new parameters take the form <paramname>_<int>, at least if
# they are originally formed from reserved words, they no longer
# need quoting :). names that include illegal characters
# won't work however.
quoted_name = '"%s"' % name
self._quoted_bind_names[name] = quoted_name
return OracleCompiler.bindparam_string(self, quoted_name, **kw)
+6 -11
View File
@@ -513,17 +513,12 @@ class SybaseSQLCompiler(compiler.SQLCompiler):
def get_select_precolumns(self, select, **kw):
s = select._distinct and "DISTINCT " or ""
# TODO: don't think Sybase supports
# bind params for FIRST / TOP
limit = select._limit
if limit:
# if select._limit == 1:
# s += "FIRST "
# else:
# s += "TOP %s " % (select._limit,)
s += "TOP %s " % (limit,)
offset = select._offset
if offset:
if select._simple_int_limit and not select._offset:
kw["literal_execute"] = True
s += "TOP %s " % self.process(select._limit_clause, **kw)
if select._offset:
raise NotImplementedError("Sybase ASE does not support OFFSET")
return s
+58 -65
View File
@@ -695,8 +695,10 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
processors = compiled._bind_processors
if compiled.contains_expanding_parameters:
positiontup = self._expand_in_parameters(compiled, processors)
if compiled.literal_execute_params:
positiontup = self._literal_execute_parameters(
compiled, processors
)
elif compiled.positional:
positiontup = self.compiled.positiontup
@@ -744,21 +746,34 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
return self
def _expand_in_parameters(self, compiled, processors):
"""handle special 'expanding' parameters, IN tuples that are rendered
on a per-parameter basis for an otherwise fixed SQL statement string.
def _literal_execute_parameters(self, compiled, processors):
"""handle special post compile parameters.
These include:
* "expanding" parameters -typically IN tuples that are rendered
on a per-parameter basis for an otherwise fixed SQL statement string.
* literal_binds compiled with the literal_execute flag. Used for
things like SQL Server "TOP N" where the driver does not accommodate
N as a bound parameter.
"""
if self.executemany:
raise exc.InvalidRequestError(
"'expanding' parameters can't be used with " "executemany()"
"'literal_execute' or 'expanding' parameters can't be "
"used with executemany()"
)
if self.compiled.positional and self.compiled._numeric_binds:
# I'm not familiar with any DBAPI that uses 'numeric'
if compiled.positional and compiled._numeric_binds:
# I'm not familiar with any DBAPI that uses 'numeric'.
# strategy would likely be to make use of numbers greater than
# the highest number present; then for expanding parameters,
# append them to the end of the parameter list. that way
# we avoid having to renumber all the existing parameters.
raise NotImplementedError(
"'expanding' bind parameters not supported with "
"'numeric' paramstyle at this time."
"'post-compile' bind parameters are not supported with "
"the 'numeric' paramstyle at this time."
)
self._expanded_parameters = {}
@@ -773,12 +788,21 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
to_update_sets = {}
for name in (
self.compiled.positiontup
compiled.positiontup
if compiled.positional
else self.compiled.binds
else compiled.bind_names.values()
):
parameter = self.compiled.binds[name]
if parameter.expanding:
parameter = compiled.binds[name]
if parameter in compiled.literal_execute_params:
if not parameter.expanding:
value = compiled_params.pop(name)
replacement_expressions[
name
] = compiled.render_literal_bindparam(
parameter, render_literal_value=value
)
continue
if name in replacement_expressions:
to_update = to_update_sets[name]
@@ -791,58 +815,25 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
# param.
values = compiled_params.pop(name)
if not values:
to_update = to_update_sets[name] = []
replacement_expressions[
name
] = self.compiled.visit_empty_set_expr(
parameter._expanding_in_types
if parameter._expanding_in_types
else [parameter.type]
)
leep = compiled._literal_execute_expanding_parameter
to_update, replacement_expr = leep(name, parameter, values)
elif isinstance(values[0], (tuple, list)):
to_update = to_update_sets[name] = [
("%s_%s_%s" % (name, i, j), value)
for i, tuple_element in enumerate(values, 1)
for j, value in enumerate(tuple_element, 1)
]
replacement_expressions[name] = (
"VALUES " if self.dialect.tuple_in_values else ""
) + ", ".join(
"(%s)"
% ", ".join(
self.compiled.bindtemplate
% {
"name": to_update[
i * len(tuple_element) + j
][0]
}
for j, value in enumerate(tuple_element)
)
for i, tuple_element in enumerate(values)
)
else:
to_update = to_update_sets[name] = [
("%s_%s" % (name, i), value)
for i, value in enumerate(values, 1)
]
replacement_expressions[name] = ", ".join(
self.compiled.bindtemplate % {"name": key}
for key, value in to_update
)
to_update_sets[name] = to_update
replacement_expressions[name] = replacement_expr
compiled_params.update(to_update)
processors.update(
(key, processors[name])
for key, value in to_update
if name in processors
)
if compiled.positional:
positiontup.extend(name for name, value in to_update)
self._expanded_parameters[name] = [
expand_key for expand_key, value in to_update
]
if not parameter.literal_execute:
compiled_params.update(to_update)
processors.update(
(key, processors[name])
for key, value in to_update
if name in processors
)
if compiled.positional:
positiontup.extend(name for name, value in to_update)
self._expanded_parameters[name] = [
expand_key for expand_key, value in to_update
]
elif compiled.positional:
positiontup.append(name)
@@ -850,7 +841,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
return replacement_expressions[m.group(1)]
self.statement = re.sub(
r"\[EXPANDING_(\S+)\]", process_expanding, self.statement
r"\[POSTCOMPILE_(\S+)\]", process_expanding, self.statement
)
return positiontup
@@ -1214,6 +1205,8 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
inputsizes = {}
for bindparam in self.compiled.bind_names:
if bindparam in self.compiled.literal_execute_params:
continue
dialect_impl = bindparam.type._unwrapped_dialect_impl(self.dialect)
dialect_impl_cls = type(dialect_impl)
+4 -3
View File
@@ -294,11 +294,11 @@ class BinaryElementImpl(
def _post_coercion(self, resolved, expr, **kw):
if (
isinstance(resolved, elements.BindParameter)
isinstance(resolved, (elements.Grouping, elements.BindParameter))
and resolved.type._isnull
and not expr.type._isnull
):
resolved = resolved._clone()
resolved.type = expr.type
resolved = resolved._with_binary_element_type(expr.type)
return resolved
@@ -360,6 +360,7 @@ class InElementImpl(RoleImpl, roles.InElementRole):
element = element._with_expanding_in_types(
[elem.type for elem in expr]
)
return element
else:
return element
+135 -25
View File
@@ -36,6 +36,7 @@ from . import roles
from . import schema
from . import selectable
from . import sqltypes
from .base import NO_ARG
from .. import exc
from .. import util
@@ -463,14 +464,6 @@ class SQLCompiler(Compiled):
columns with the table name (i.e. MySQL only)
"""
contains_expanding_parameters = False
"""True if we've encountered bindparam(..., expanding=True).
These need to be converted before execution time against the
string statement.
"""
ansi_bind_rules = False
"""SQL 92 doesn't allow bind parameters to be used
in the columns clause of a SELECT, nor does it allow
@@ -507,6 +500,8 @@ class SQLCompiler(Compiled):
"""
literal_execute_params = frozenset()
insert_prefetch = update_prefetch = ()
def __init__(
@@ -1267,6 +1262,81 @@ class SQLCompiler(Compiled):
% self.dialect.name
)
def _literal_execute_expanding_parameter_literal_binds(
self, parameter, values
):
if not values:
replacement_expression = self.visit_empty_set_expr(
parameter._expanding_in_types
if parameter._expanding_in_types
else [parameter.type]
)
elif isinstance(values[0], (tuple, list)):
replacement_expression = (
"VALUES " if self.dialect.tuple_in_values else ""
) + ", ".join(
"(%s)"
% (
", ".join(
self.render_literal_value(value, parameter.type)
for value in tuple_element
)
)
for i, tuple_element in enumerate(values)
)
else:
replacement_expression = ", ".join(
self.render_literal_value(value, parameter.type)
for value in values
)
return (), replacement_expression
def _literal_execute_expanding_parameter(self, name, parameter, values):
if parameter.literal_execute:
return self._literal_execute_expanding_parameter_literal_binds(
parameter, values
)
if not values:
to_update = []
replacement_expression = self.visit_empty_set_expr(
parameter._expanding_in_types
if parameter._expanding_in_types
else [parameter.type]
)
elif isinstance(values[0], (tuple, list)):
to_update = [
("%s_%s_%s" % (name, i, j), value)
for i, tuple_element in enumerate(values, 1)
for j, value in enumerate(tuple_element, 1)
]
replacement_expression = (
"VALUES " if self.dialect.tuple_in_values else ""
) + ", ".join(
"(%s)"
% (
", ".join(
self.bindtemplate
% {"name": to_update[i * len(tuple_element) + j][0]}
for j, value in enumerate(tuple_element)
)
)
for i, tuple_element in enumerate(values)
)
else:
to_update = [
("%s_%s" % (name, i), value)
for i, value in enumerate(values, 1)
]
replacement_expression = ", ".join(
self.bindtemplate % {"name": key} for key, value in to_update
)
return to_update, replacement_expression
def visit_binary(
self, binary, override_operator=None, eager_grouping=False, **kw
):
@@ -1457,6 +1527,7 @@ class SQLCompiler(Compiled):
within_columns_clause=False,
literal_binds=False,
skip_bind_expression=False,
literal_execute=False,
**kwargs
):
@@ -1469,18 +1540,28 @@ class SQLCompiler(Compiled):
skip_bind_expression=True,
within_columns_clause=within_columns_clause,
literal_binds=literal_binds,
literal_execute=literal_execute,
**kwargs
)
if literal_binds or (within_columns_clause and self.ansi_bind_rules):
if bindparam.value is None and bindparam.callable is None:
raise exc.CompileError(
"Bind parameter '%s' without a "
"renderable value not allowed here." % bindparam.key
)
return self.render_literal_bindparam(
if not literal_binds:
post_compile = (
literal_execute
or bindparam.literal_execute
or bindparam.expanding
)
else:
post_compile = False
if not literal_execute and (
literal_binds or (within_columns_clause and self.ansi_bind_rules)
):
ret = self.render_literal_bindparam(
bindparam, within_columns_clause=True, **kwargs
)
if bindparam.expanding:
ret = "(%s)" % ret
return ret
name = self._truncate_bindparam(bindparam)
@@ -1508,13 +1589,38 @@ class SQLCompiler(Compiled):
self.binds[bindparam.key] = self.binds[name] = bindparam
return self.bindparam_string(
name, expanding=bindparam.expanding, **kwargs
)
if post_compile:
self.literal_execute_params |= {bindparam}
def render_literal_bindparam(self, bindparam, **kw):
value = bindparam.effective_value
return self.render_literal_value(value, bindparam.type)
ret = self.bindparam_string(
name,
post_compile=post_compile,
expanding=bindparam.expanding,
**kwargs
)
if bindparam.expanding:
ret = "(%s)" % ret
return ret
def render_literal_bindparam(
self, bindparam, render_literal_value=NO_ARG, **kw
):
if render_literal_value is not NO_ARG:
value = render_literal_value
else:
if bindparam.value is None and bindparam.callable is None:
raise exc.CompileError(
"Bind parameter '%s' without a "
"renderable value not allowed here." % bindparam.key
)
value = bindparam.effective_value
if bindparam.expanding:
leep = self._literal_execute_expanding_parameter_literal_binds
to_update, replacement_expr = leep(bindparam, value)
return replacement_expr
else:
return self.render_literal_value(value, bindparam.type)
def render_literal_value(self, value, type_):
"""Render the value of a bind parameter as a quoted literal.
@@ -1577,16 +1683,20 @@ class SQLCompiler(Compiled):
return derived + "_" + str(anonymous_counter)
def bindparam_string(
self, name, positional_names=None, expanding=False, **kw
self,
name,
positional_names=None,
post_compile=False,
expanding=False,
**kw
):
if self.positional:
if positional_names is not None:
positional_names.append(name)
else:
self.positiontup.append(name)
if expanding:
self.contains_expanding_parameters = True
return "([EXPANDING_%s])" % name
if post_compile:
return "[POSTCOMPILE_%s]" % name
else:
return self.bindtemplate % {"name": name}
+49 -3
View File
@@ -211,6 +211,15 @@ class ClauseElement(roles.SQLRole, Visitable):
return c
def _with_binary_element_type(self, type_):
"""in the context of binary expression, convert the type of this
object to the one given.
applies only to :class:`.ColumnElement` classes.
"""
return self
def _cache_key(self, **kw):
"""return an optional cache key.
@@ -732,6 +741,14 @@ class ColumnElement(
def type(self):
return type_api.NULLTYPE
def _with_binary_element_type(self, type_):
cloned = self._clone()
cloned._copy_internals(
clone=lambda element: element._with_binary_element_type(type_)
)
cloned.type = type_
return cloned
@util.memoized_property
def comparator(self):
try:
@@ -986,6 +1003,7 @@ class BindParameter(roles.InElementRole, ColumnElement):
callable_=None,
expanding=False,
isoutparam=False,
literal_execute=False,
_compared_to_operator=None,
_compared_to_type=None,
):
@@ -1198,6 +1216,30 @@ class BindParameter(roles.InElementRole, ColumnElement):
:func:`.outparam`
:param literal_execute:
if True, the bound parameter will be rendered in the compile phase
with a special "POSTCOMPILE" token, and the SQLAlchemy compiler will
render the final value of the parameter into the SQL statement at
statement execution time, omitting the value from the parameter
dictionary / list passed to DBAPI ``cursor.execute()``. This
produces a similar effect as that of using the ``literal_binds``,
compilation flag, however takes place as the statement is sent to
the DBAPI ``cursor.execute()`` method, rather than when the statement
is compiled. The primary use of this
capability is for rendering LIMIT / OFFSET clauses for database
drivers that can't accommodate for bound parameters in these
contexts, while allowing SQL constructs to be cacheable at the
compilation level.
.. versionadded:: 1.4 Added "post compile" bound parameters
.. seealso::
:ref:`change_4808`.
"""
if isinstance(key, ColumnClause):
type_ = key.type
@@ -1235,6 +1277,7 @@ class BindParameter(roles.InElementRole, ColumnElement):
self.isoutparam = isoutparam
self.required = required
self.expanding = expanding
self.literal_execute = literal_execute
if type_ is None:
if _compared_to_type is not None:
@@ -1643,14 +1686,17 @@ class TextClause(
for bind in binds:
try:
existing = new_params[bind.key]
# the regex used for text() currently will not match
# a unique/anonymous key in any case, so use the _orig_key
# so that a text() construct can support unique parameters
existing = new_params[bind._orig_key]
except KeyError:
raise exc.ArgumentError(
"This text() construct doesn't define a "
"bound parameter named %r" % bind.key
"bound parameter named %r" % bind._orig_key
)
else:
new_params[existing.key] = bind
new_params[existing._orig_key] = bind
for key, value in names_to_values.items():
try:
+1 -1
View File
@@ -471,7 +471,7 @@ class Integer(_LookupExpressionAdapter, TypeEngine):
def literal_processor(self, dialect):
def process(value):
return str(value)
return str(int(value))
return process
+2 -3
View File
@@ -38,11 +38,10 @@ class SQLMatchRule(AssertRule):
class CursorSQL(SQLMatchRule):
consume_statement = False
def __init__(self, statement, params=None):
def __init__(self, statement, params=None, consume_statement=True):
self.statement = statement
self.params = params
self.consume_statement = consume_statement
def process_statement(self, execute_observed):
stmt = execute_observed.statements[0]
+20
View File
@@ -44,6 +44,26 @@ class SuiteRequirements(Requirements):
return exclusions.open()
@property
def standard_cursor_sql(self):
"""Target database passes SQL-92 style statements to cursor.execute()
when a statement like select() or insert() is run.
A very small portion of dialect-level tests will ensure that certain
conditions are present in SQL strings, and these tests use very basic
SQL that will work on any SQL-like platform in order to assert results.
It's normally a given for any pep-249 DBAPI that a statement like
"SELECT id, name FROM table WHERE some_table.id=5" will work.
However, there are dialects that don't actually produce SQL Strings
and instead may work with symbolic objects instead, or dialects that
aren't working with SQL, so for those this requirement can be marked
as excluded.
"""
return exclusions.open()
@property
def on_update_cascade(self):
""""target database must support ON UPDATE..CASCADE behavior in
+190
View File
@@ -1,7 +1,10 @@
from .. import AssertsCompiledSQL
from .. import AssertsExecutionResults
from .. import config
from .. import fixtures
from ..assertions import eq_
from ..assertions import in_
from ..assertsql import CursorSQL
from ..schema import Column
from ..schema import Table
from ... import bindparam
@@ -14,6 +17,7 @@ from ... import null
from ... import select
from ... import String
from ... import testing
from ... import text
from ... import true
from ... import tuple_
from ... import union
@@ -233,6 +237,59 @@ class LimitOffsetTest(fixtures.TablesTest):
params={"l": 2, "o": 1},
)
@testing.requires.sql_expression_limit_offset
def test_expr_offset(self):
table = self.tables.some_table
self._assert_result(
select([table])
.order_by(table.c.id)
.offset(literal_column("1") + literal_column("2")),
[(4, 4, 5)],
)
@testing.requires.sql_expression_limit_offset
def test_expr_limit(self):
table = self.tables.some_table
self._assert_result(
select([table])
.order_by(table.c.id)
.limit(literal_column("1") + literal_column("2")),
[(1, 1, 2), (2, 2, 3), (3, 3, 4)],
)
@testing.requires.sql_expression_limit_offset
def test_expr_limit_offset(self):
table = self.tables.some_table
self._assert_result(
select([table])
.order_by(table.c.id)
.limit(literal_column("1") + literal_column("1"))
.offset(literal_column("1") + literal_column("1")),
[(3, 3, 4), (4, 4, 5)],
)
@testing.requires.sql_expression_limit_offset
def test_simple_limit_expr_offset(self):
table = self.tables.some_table
self._assert_result(
select([table])
.order_by(table.c.id)
.limit(2)
.offset(literal_column("1") + literal_column("1")),
[(3, 3, 4), (4, 4, 5)],
)
@testing.requires.sql_expression_limit_offset
def test_expr_limit_simple_offset(self):
table = self.tables.some_table
self._assert_result(
select([table])
.order_by(table.c.id)
.limit(literal_column("1") + literal_column("1"))
.offset(2),
[(3, 3, 4), (4, 4, 5)],
)
class CompoundSelectTest(fixtures.TablesTest):
__backend__ = True
@@ -372,6 +429,127 @@ class CompoundSelectTest(fixtures.TablesTest):
)
class PostCompileParamsTest(
AssertsExecutionResults, AssertsCompiledSQL, fixtures.TablesTest
):
__backend__ = True
__requires__ = ("standard_cursor_sql",)
@classmethod
def define_tables(cls, metadata):
Table(
"some_table",
metadata,
Column("id", Integer, primary_key=True),
Column("x", Integer),
Column("y", Integer),
Column("z", String(50)),
)
@classmethod
def insert_data(cls):
config.db.execute(
cls.tables.some_table.insert(),
[
{"id": 1, "x": 1, "y": 2, "z": "z1"},
{"id": 2, "x": 2, "y": 3, "z": "z2"},
{"id": 3, "x": 3, "y": 4, "z": "z3"},
{"id": 4, "x": 4, "y": 5, "z": "z4"},
],
)
def test_compile(self):
table = self.tables.some_table
stmt = select([table.c.id]).where(
table.c.x == bindparam("q", literal_execute=True)
)
self.assert_compile(
stmt,
"SELECT some_table.id FROM some_table "
"WHERE some_table.x = [POSTCOMPILE_q]",
{},
)
def test_compile_literal_binds(self):
table = self.tables.some_table
stmt = select([table.c.id]).where(
table.c.x == bindparam("q", 10, literal_execute=True)
)
self.assert_compile(
stmt,
"SELECT some_table.id FROM some_table WHERE some_table.x = 10",
{},
literal_binds=True,
)
def test_execute(self):
table = self.tables.some_table
stmt = select([table.c.id]).where(
table.c.x == bindparam("q", literal_execute=True)
)
with self.sql_execution_asserter() as asserter:
with config.db.connect() as conn:
conn.execute(stmt, q=10)
asserter.assert_(
CursorSQL(
"SELECT some_table.id \nFROM some_table "
"\nWHERE some_table.x = 10",
() if config.db.dialect.positional else {},
)
)
def test_execute_expanding_plus_literal_execute(self):
table = self.tables.some_table
stmt = select([table.c.id]).where(
table.c.x.in_(bindparam("q", expanding=True, literal_execute=True))
)
with self.sql_execution_asserter() as asserter:
with config.db.connect() as conn:
conn.execute(stmt, q=[5, 6, 7])
asserter.assert_(
CursorSQL(
"SELECT some_table.id \nFROM some_table "
"\nWHERE some_table.x IN (5, 6, 7)",
() if config.db.dialect.positional else {},
)
)
@testing.requires.tuple_in
def test_execute_tuple_expanding_plus_literal_execute(self):
table = self.tables.some_table
stmt = select([table.c.id]).where(
tuple_(table.c.x, table.c.y).in_(
bindparam("q", expanding=True, literal_execute=True)
)
)
with self.sql_execution_asserter() as asserter:
with config.db.connect() as conn:
conn.execute(stmt, q=[(5, 10), (12, 18)])
asserter.assert_(
CursorSQL(
"SELECT some_table.id \nFROM some_table "
"\nWHERE (some_table.x, some_table.y) "
"IN (%s(5, 10), (12, 18))"
% ("VALUES " if config.db.dialect.tuple_in_values else ""),
() if config.db.dialect.positional else {},
)
)
class ExpandingBoundInTest(fixtures.TablesTest):
__backend__ = True
@@ -496,6 +674,18 @@ class ExpandingBoundInTest(fixtures.TablesTest):
params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]},
)
@testing.requires.tuple_in
def test_bound_in_heterogeneous_two_tuple_text(self):
stmt = text(
"select id FROM some_table WHERE (x, z) IN :q ORDER BY id"
).bindparams(bindparam("q", expanding=True))
self._assert_result(
stmt,
[(2,), (3,), (4,)],
params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]},
)
def test_empty_set_against_integer(self):
table = self.tables.some_table
+9 -6
View File
@@ -758,8 +758,9 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
s,
"SELECT TOP 10 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y",
checkparams={"x_1": 5},
"SELECT TOP [POSTCOMPILE_param_1] t.x, t.y FROM t "
"WHERE t.x = :x_1 ORDER BY t.y",
checkparams={"x_1": 5, "param_1": 10},
)
def test_limit_zero_using_top(self):
@@ -769,8 +770,9 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
s,
"SELECT TOP 0 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y",
checkparams={"x_1": 5},
"SELECT TOP [POSTCOMPILE_param_1] t.x, t.y FROM t "
"WHERE t.x = :x_1 ORDER BY t.y",
checkparams={"x_1": 5, "param_1": 0},
)
c = s.compile(dialect=mssql.dialect())
eq_(len(c._result_columns), 2)
@@ -906,8 +908,9 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
# of zero, so produces TOP 0
self.assert_compile(
s,
"SELECT TOP 0 t.x, t.y FROM t " "WHERE t.x = :x_1 ORDER BY t.y",
checkparams={"x_1": 5},
"SELECT TOP [POSTCOMPILE_param_1] t.x, t.y FROM t "
"WHERE t.x = :x_1 ORDER BY t.y",
checkparams={"x_1": 5, "param_1": 0},
)
def test_primary_key_no_identity(self):
+10 -3
View File
@@ -363,7 +363,9 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
DialectSQL(
"INSERT INTO t1 (data) VALUES (:data)", {"data": "somedata"}
),
CursorSQL("SELECT @@identity AS lastrowid"),
CursorSQL(
"SELECT @@identity AS lastrowid", consume_statement=False
),
)
@testing.provide_metadata
@@ -384,8 +386,12 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
# even with pyodbc, we don't embed the scope identity on a
# DEFAULT VALUES insert
asserter.assert_(
CursorSQL("INSERT INTO t1 DEFAULT VALUES"),
CursorSQL("SELECT scope_identity() AS lastrowid"),
CursorSQL(
"INSERT INTO t1 DEFAULT VALUES", consume_statement=False
),
CursorSQL(
"SELECT scope_identity() AS lastrowid", consume_statement=False
),
)
@testing.only_on("mssql+pyodbc")
@@ -410,6 +416,7 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
CursorSQL(
"INSERT INTO t1 (data) VALUES (?); select scope_identity()",
("somedata",),
consume_statement=False,
)
)
+140 -52
View File
@@ -1,14 +1,12 @@
# coding: utf-8
from sqlalchemy import and_
from sqlalchemy import bindparam
from sqlalchemy import exc
from sqlalchemy import except_
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy import literal
from sqlalchemy import literal_column
from sqlalchemy import MetaData
from sqlalchemy import or_
@@ -18,6 +16,7 @@ from sqlalchemy import select
from sqlalchemy import Sequence
from sqlalchemy import sql
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import type_coerce
from sqlalchemy import TypeDecorator
@@ -28,7 +27,6 @@ from sqlalchemy.engine import default
from sqlalchemy.sql import column
from sqlalchemy.sql import quoted_name
from sqlalchemy.sql import table
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
@@ -93,13 +91,10 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
t.update().values(plain=5), 'UPDATE s SET "plain"=:"plain"'
)
def test_bindparam_quote_raise_on_expanding(self):
assert_raises_message(
exc.CompileError,
"Can't use expanding feature with parameter name 'uid' on "
"Oracle; it requires quoting which is not supported in this "
"context",
bindparam("uid", expanding=True).compile,
def test_bindparam_quote_works_on_expanding(self):
self.assert_compile(
bindparam("uid", expanding=True),
"([POSTCOMPILE_uid])",
dialect=cx_oracle.dialect(),
)
@@ -166,14 +161,33 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"anon_2.col2 AS col2, ROWNUM AS ora_rn FROM (SELECT "
"sometable.col1 AS col1, sometable.col2 AS "
"col2 FROM sometable) anon_2 WHERE ROWNUM <= "
":param_1 + :param_2) anon_1 WHERE ora_rn > :param_2",
checkparams={"param_1": 10, "param_2": 20},
"[POSTCOMPILE_param_1]) anon_1 WHERE ora_rn > "
"[POSTCOMPILE_param_2]",
checkparams={"param_1": 30, "param_2": 20},
)
c = s.compile(dialect=oracle.OracleDialect())
eq_(len(c._result_columns), 2)
assert t.c.col1 in set(c._create_result_map()["col1"][1])
def test_limit_one_firstrows(self):
t = table("sometable", column("col1"), column("col2"))
s = select([t])
s = select([t]).limit(10).offset(20)
self.assert_compile(
s,
"SELECT anon_1.col1, anon_1.col2 FROM "
"(SELECT /*+ FIRST_ROWS([POSTCOMPILE__ora_frow_1]) */ "
"anon_2.col1 AS col1, "
"anon_2.col2 AS col2, ROWNUM AS ora_rn FROM (SELECT "
"sometable.col1 AS col1, sometable.col2 AS "
"col2 FROM sometable) anon_2 WHERE ROWNUM <= "
"[POSTCOMPILE_param_1]) anon_1 WHERE ora_rn > "
"[POSTCOMPILE_param_2]",
checkparams={"_ora_frow_1": 10, "param_1": 30, "param_2": 20},
dialect=oracle.OracleDialect(optimize_limits=True),
)
def test_limit_two(self):
t = table("sometable", column("col1"), column("col2"))
s = select([t]).limit(10).offset(20).subquery()
@@ -188,9 +202,9 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"ROWNUM AS ora_rn "
"FROM (SELECT sometable.col1 AS col1, "
"sometable.col2 AS col2 FROM sometable) anon_3 "
"WHERE ROWNUM <= :param_1 + :param_2) anon_2 "
"WHERE ora_rn > :param_2) anon_1",
checkparams={"param_1": 10, "param_2": 20},
"WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_2 "
"WHERE ora_rn > [POSTCOMPILE_param_2]) anon_1",
checkparams={"param_1": 30, "param_2": 20},
)
self.assert_compile(
@@ -202,8 +216,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"ROWNUM AS ora_rn "
"FROM (SELECT sometable.col1 AS col1, "
"sometable.col2 AS col2 FROM sometable) anon_3 "
"WHERE ROWNUM <= :param_1 + :param_2) anon_2 "
"WHERE ora_rn > :param_2) anon_1",
"WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_2 "
"WHERE ora_rn > [POSTCOMPILE_param_2]) anon_1",
)
c = s2.compile(dialect=oracle.OracleDialect())
eq_(len(c._result_columns), 2)
@@ -221,8 +235,9 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"sometable.col1 AS col1, sometable.col2 AS "
"col2 FROM sometable ORDER BY "
"sometable.col2) anon_2 WHERE ROWNUM <= "
":param_1 + :param_2) anon_1 WHERE ora_rn > :param_2",
checkparams={"param_1": 10, "param_2": 20},
"[POSTCOMPILE_param_1]) anon_1 "
"WHERE ora_rn > [POSTCOMPILE_param_2]",
checkparams={"param_1": 30, "param_2": 20},
)
c = s.compile(dialect=oracle.OracleDialect())
eq_(len(c._result_columns), 2)
@@ -237,8 +252,25 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT anon_1.col1, anon_1.col2 FROM (SELECT "
"sometable.col1 AS col1, sometable.col2 AS "
"col2 FROM sometable ORDER BY "
"sometable.col2) anon_1 WHERE ROWNUM <= :param_1 "
"sometable.col2) anon_1 WHERE ROWNUM <= [POSTCOMPILE_param_1] "
"FOR UPDATE",
checkparams={"param_1": 10},
)
def test_limit_four_firstrows(self):
t = table("sometable", column("col1"), column("col2"))
s = select([t]).with_for_update().limit(10).order_by(t.c.col2)
self.assert_compile(
s,
"SELECT /*+ FIRST_ROWS([POSTCOMPILE__ora_frow_1]) */ "
"anon_1.col1, anon_1.col2 FROM (SELECT "
"sometable.col1 AS col1, sometable.col2 AS "
"col2 FROM sometable ORDER BY "
"sometable.col2) anon_1 WHERE ROWNUM <= [POSTCOMPILE_param_1] "
"FOR UPDATE",
checkparams={"param_1": 10, "_ora_frow_1": 10},
dialect=oracle.OracleDialect(optimize_limits=True),
)
def test_limit_five(self):
@@ -259,8 +291,30 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"sometable.col1 AS col1, sometable.col2 AS "
"col2 FROM sometable ORDER BY "
"sometable.col2) anon_2 WHERE ROWNUM <= "
":param_1 + :param_2) anon_1 WHERE ora_rn > :param_2 FOR "
"[POSTCOMPILE_param_1]) anon_1 "
"WHERE ora_rn > [POSTCOMPILE_param_2] FOR "
"UPDATE",
checkparams={"param_1": 30, "param_2": 20},
)
def test_limit_six(self):
t = table("sometable", column("col1"), column("col2"))
s = (
select([t])
.limit(10)
.offset(literal(10) + literal(20))
.order_by(t.c.col2)
)
self.assert_compile(
s,
"SELECT anon_1.col1, anon_1.col2 FROM (SELECT anon_2.col1 AS "
"col1, anon_2.col2 AS col2, ROWNUM AS ora_rn FROM "
"(SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
"FROM sometable ORDER BY sometable.col2) anon_2 WHERE "
"ROWNUM <= :param_1 + :param_2 + :param_3) anon_1 "
"WHERE ora_rn > :param_2 + :param_3",
checkparams={"param_1": 10, "param_2": 10, "param_3": 20},
)
def test_limit_special_quoting(self):
@@ -280,7 +334,7 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
'SELECT anon_1."SUM(ABC)" FROM '
'(SELECT SUM(ABC) AS "SUM(ABC)" '
"FROM my_table ORDER BY SUM(ABC)) anon_1 "
"WHERE ROWNUM <= :param_1",
"WHERE ROWNUM <= [POSTCOMPILE_param_1]",
)
col = literal_column("SUM(ABC)").label(quoted_name("SUM(ABC)", True))
@@ -292,7 +346,7 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
'SELECT anon_1."SUM(ABC)" FROM '
'(SELECT SUM(ABC) AS "SUM(ABC)" '
"FROM my_table ORDER BY SUM(ABC)) anon_1 "
"WHERE ROWNUM <= :param_1",
"WHERE ROWNUM <= [POSTCOMPILE_param_1]",
)
col = literal_column("SUM(ABC)").label("SUM(ABC)_")
@@ -304,7 +358,7 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
'SELECT anon_1."SUM(ABC)_" FROM '
'(SELECT SUM(ABC) AS "SUM(ABC)_" '
"FROM my_table ORDER BY SUM(ABC)) anon_1 "
"WHERE ROWNUM <= :param_1",
"WHERE ROWNUM <= [POSTCOMPILE_param_1]",
)
col = literal_column("SUM(ABC)").label(quoted_name("SUM(ABC)_", True))
@@ -316,7 +370,7 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
'SELECT anon_1."SUM(ABC)_" FROM '
'(SELECT SUM(ABC) AS "SUM(ABC)_" '
"FROM my_table ORDER BY SUM(ABC)) anon_1 "
"WHERE ROWNUM <= :param_1",
"WHERE ROWNUM <= [POSTCOMPILE_param_1]",
)
def test_for_update(self):
@@ -410,7 +464,9 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT anon_1.myid, anon_1.name FROM "
"(SELECT mytable.myid AS myid, mytable.name AS name "
"FROM mytable WHERE mytable.myid = :myid_1) anon_1 "
"WHERE ROWNUM <= :param_1 FOR UPDATE OF anon_1.name NOWAIT",
"WHERE ROWNUM <= [POSTCOMPILE_param_1] "
"FOR UPDATE OF anon_1.name NOWAIT",
checkparams={"param_1": 10, "myid_1": 7},
)
def test_for_update_of_w_limit_adaption_col_unpresent(self):
@@ -424,7 +480,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT anon_1.myid FROM "
"(SELECT mytable.myid AS myid, mytable.name AS name "
"FROM mytable WHERE mytable.myid = :myid_1) anon_1 "
"WHERE ROWNUM <= :param_1 FOR UPDATE OF anon_1.name NOWAIT",
"WHERE ROWNUM <= [POSTCOMPILE_param_1] "
"FOR UPDATE OF anon_1.name NOWAIT",
)
def test_for_update_of_w_limit_offset_adaption_col_present(self):
@@ -441,9 +498,10 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"ROWNUM AS ora_rn "
"FROM (SELECT mytable.myid AS myid, mytable.name AS name "
"FROM mytable WHERE mytable.myid = :myid_1) anon_2 "
"WHERE ROWNUM <= :param_1 + :param_2) anon_1 "
"WHERE ora_rn > :param_2 "
"WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_1 "
"WHERE ora_rn > [POSTCOMPILE_param_2] "
"FOR UPDATE OF anon_1.name NOWAIT",
checkparams={"param_1": 60, "param_2": 50, "myid_1": 7},
)
def test_for_update_of_w_limit_offset_adaption_col_unpresent(self):
@@ -459,9 +517,10 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"ROWNUM AS ora_rn, anon_2.name AS name "
"FROM (SELECT mytable.myid AS myid, mytable.name AS name "
"FROM mytable WHERE mytable.myid = :myid_1) anon_2 "
"WHERE ROWNUM <= :param_1 + :param_2) anon_1 "
"WHERE ora_rn > :param_2 "
"WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_1 "
"WHERE ora_rn > [POSTCOMPILE_param_2] "
"FOR UPDATE OF anon_1.name NOWAIT",
checkparams={"param_1": 60, "param_2": 50, "myid_1": 7},
)
def test_for_update_of_w_limit_offset_adaption_partial_col_unpresent(self):
@@ -479,9 +538,10 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"mytable.bar AS bar, "
"mytable.foo AS foo FROM mytable "
"WHERE mytable.myid = :myid_1) anon_2 "
"WHERE ROWNUM <= :param_1 + :param_2) anon_1 "
"WHERE ora_rn > :param_2 "
"WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_1 "
"WHERE ora_rn > [POSTCOMPILE_param_2] "
"FOR UPDATE OF anon_1.foo, anon_1.bar NOWAIT",
checkparams={"param_1": 60, "param_2": 50, "myid_1": 7},
)
def test_limit_preserves_typing_information(self):
@@ -495,59 +555,82 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
def test_use_binds_for_limits_disabled_one(self):
t = table("sometable", column("col1"), column("col2"))
dialect = oracle.OracleDialect(use_binds_for_limits=False)
with testing.expect_deprecated(
"The ``use_binds_for_limits`` Oracle dialect parameter is "
"deprecated."
):
dialect = oracle.OracleDialect(use_binds_for_limits=False)
self.assert_compile(
select([t]).limit(10),
"SELECT anon_1.col1, anon_1.col2 FROM "
"(SELECT sometable.col1 AS col1, "
"sometable.col2 AS col2 FROM sometable) anon_1 WHERE ROWNUM <= 10",
"sometable.col2 AS col2 FROM sometable) anon_1 "
"WHERE ROWNUM <= [POSTCOMPILE_param_1]",
dialect=dialect,
)
def test_use_binds_for_limits_disabled_two(self):
t = table("sometable", column("col1"), column("col2"))
dialect = oracle.OracleDialect(use_binds_for_limits=False)
with testing.expect_deprecated(
"The ``use_binds_for_limits`` Oracle dialect parameter is "
"deprecated."
):
dialect = oracle.OracleDialect(use_binds_for_limits=False)
self.assert_compile(
select([t]).offset(10),
"SELECT anon_1.col1, anon_1.col2 FROM (SELECT "
"anon_2.col1 AS col1, anon_2.col2 AS col2, ROWNUM AS ora_rn "
"FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
"FROM sometable) anon_2) anon_1 WHERE ora_rn > 10",
"FROM sometable) anon_2) anon_1 "
"WHERE ora_rn > [POSTCOMPILE_param_1]",
dialect=dialect,
)
def test_use_binds_for_limits_disabled_three(self):
t = table("sometable", column("col1"), column("col2"))
dialect = oracle.OracleDialect(use_binds_for_limits=False)
with testing.expect_deprecated(
"The ``use_binds_for_limits`` Oracle dialect parameter is "
"deprecated."
):
dialect = oracle.OracleDialect(use_binds_for_limits=False)
self.assert_compile(
select([t]).limit(10).offset(10),
"SELECT anon_1.col1, anon_1.col2 FROM (SELECT "
"anon_2.col1 AS col1, anon_2.col2 AS col2, ROWNUM AS ora_rn "
"FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
"FROM sometable) anon_2 WHERE ROWNUM <= 20) anon_1 "
"WHERE ora_rn > 10",
"FROM sometable) anon_2 "
"WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_1 "
"WHERE ora_rn > [POSTCOMPILE_param_2]",
dialect=dialect,
)
def test_use_binds_for_limits_enabled_one(self):
t = table("sometable", column("col1"), column("col2"))
dialect = oracle.OracleDialect(use_binds_for_limits=True)
with testing.expect_deprecated(
"The ``use_binds_for_limits`` Oracle dialect parameter is "
"deprecated."
):
dialect = oracle.OracleDialect(use_binds_for_limits=True)
self.assert_compile(
select([t]).limit(10),
"SELECT anon_1.col1, anon_1.col2 FROM "
"(SELECT sometable.col1 AS col1, "
"sometable.col2 AS col2 FROM sometable) anon_1 WHERE ROWNUM "
"<= :param_1",
"<= [POSTCOMPILE_param_1]",
dialect=dialect,
)
def test_use_binds_for_limits_enabled_two(self):
t = table("sometable", column("col1"), column("col2"))
dialect = oracle.OracleDialect(use_binds_for_limits=True)
with testing.expect_deprecated(
"The ``use_binds_for_limits`` Oracle dialect parameter is "
"deprecated."
):
dialect = oracle.OracleDialect(use_binds_for_limits=True)
self.assert_compile(
select([t]).offset(10),
@@ -555,13 +638,18 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"(SELECT anon_2.col1 AS col1, anon_2.col2 AS col2, "
"ROWNUM AS ora_rn "
"FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
"FROM sometable) anon_2) anon_1 WHERE ora_rn > :param_1",
"FROM sometable) anon_2) anon_1 "
"WHERE ora_rn > [POSTCOMPILE_param_1]",
dialect=dialect,
)
def test_use_binds_for_limits_enabled_three(self):
t = table("sometable", column("col1"), column("col2"))
dialect = oracle.OracleDialect(use_binds_for_limits=True)
with testing.expect_deprecated(
"The ``use_binds_for_limits`` Oracle dialect parameter is "
"deprecated."
):
dialect = oracle.OracleDialect(use_binds_for_limits=True)
self.assert_compile(
select([t]).limit(10).offset(10),
@@ -570,10 +658,10 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"ROWNUM AS ora_rn "
"FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
"FROM sometable) anon_2 "
"WHERE ROWNUM <= :param_1 + :param_2) anon_1 "
"WHERE ora_rn > :param_2",
"WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_1 "
"WHERE ora_rn > [POSTCOMPILE_param_2]",
dialect=dialect,
checkparams={"param_1": 10, "param_2": 10},
checkparams={"param_1": 20, "param_2": 10},
)
def test_long_labels(self):
@@ -771,9 +859,9 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"thirdtable.userid(+) = "
"myothertable.otherid AND mytable.myid = "
"myothertable.otherid ORDER BY mytable.name) anon_2 "
"WHERE ROWNUM <= :param_1 + :param_2) anon_1 "
"WHERE ora_rn > :param_2",
checkparams={"param_1": 10, "param_2": 5},
"WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_1 "
"WHERE ora_rn > [POSTCOMPILE_param_2]",
checkparams={"param_1": 15, "param_2": 5},
dialect=oracle.dialect(use_ansi=False),
)
+13
View File
@@ -150,6 +150,19 @@ class QuotedBindRoundTripTest(fixtures.TestBase):
testing.db.execute(t.insert(), {"100K": 10})
eq_(testing.db.scalar(t.select()), 10)
@testing.provide_metadata
def test_expanding_quote_roundtrip(self):
t = Table("asfd", self.metadata, Column("foo", Integer))
t.create()
with testing.db.connect() as conn:
conn.execute(
select([t]).where(
t.c.foo.in_(bindparam("uid", expanding=True))
),
uid=[1, 2, 3],
)
class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
def _dialect(self, server_version, **kw):
+1 -1
View File
@@ -491,7 +491,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
{"id": 30, "data": "d1"},
),
CursorSQL("select nextval('my_seq')"),
CursorSQL("select nextval('my_seq')", consume_statement=False),
DialectSQL(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
{"id": 1, "data": "d2"},
+16 -16
View File
@@ -108,7 +108,7 @@ class BaseAndSubFixture(object):
"a.type AS a_type, "
"asub.asubdata AS asub_asubdata FROM a JOIN asub "
"ON a.id = asub.id "
"WHERE a.id IN ([EXPANDING_primary_keys]) "
"WHERE a.id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY a.id",
{"primary_keys": [2]},
),
@@ -121,13 +121,13 @@ class BaseAndSubFixture(object):
"SELECT c.a_sub_id AS c_a_sub_id, "
"c.id AS c_id "
"FROM c WHERE c.a_sub_id "
"IN ([EXPANDING_primary_keys]) ORDER BY c.a_sub_id",
"IN ([POSTCOMPILE_primary_keys]) ORDER BY c.a_sub_id",
{"primary_keys": [2]},
),
),
CompiledSQL(
"SELECT b.a_id AS b_a_id, b.id AS b_id FROM b "
"WHERE b.a_id IN ([EXPANDING_primary_keys]) "
"WHERE b.a_id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY b.a_id",
{"primary_keys": [1, 2]},
),
@@ -208,7 +208,7 @@ class FixtureLoadTest(_Polymorphic, testing.AssertsExecutionResults):
"engineers.primary_language AS engineers_primary_language "
"FROM people JOIN engineers "
"ON people.person_id = engineers.person_id "
"WHERE people.person_id IN ([EXPANDING_primary_keys]) "
"WHERE people.person_id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY people.person_id",
{"primary_keys": [1, 2, 5]},
),
@@ -220,7 +220,7 @@ class FixtureLoadTest(_Polymorphic, testing.AssertsExecutionResults):
"managers.manager_name AS managers_manager_name "
"FROM people JOIN managers "
"ON people.person_id = managers.person_id "
"WHERE people.person_id IN ([EXPANDING_primary_keys]) "
"WHERE people.person_id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY people.person_id",
{"primary_keys": [3, 4]},
),
@@ -254,7 +254,7 @@ class FixtureLoadTest(_Polymorphic, testing.AssertsExecutionResults):
"people.person_id AS people_person_id, "
"people.name AS people_name, people.type AS people_type "
"FROM people WHERE people.company_id "
"IN ([EXPANDING_primary_keys]) "
"IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY people.company_id, people.person_id",
{"primary_keys": [1, 2]},
),
@@ -268,7 +268,7 @@ class FixtureLoadTest(_Polymorphic, testing.AssertsExecutionResults):
"managers.manager_name AS managers_manager_name "
"FROM people JOIN managers "
"ON people.person_id = managers.person_id "
"WHERE people.person_id IN ([EXPANDING_primary_keys]) "
"WHERE people.person_id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY people.person_id",
{"primary_keys": [3, 4]},
),
@@ -282,7 +282,7 @@ class FixtureLoadTest(_Polymorphic, testing.AssertsExecutionResults):
"engineers.primary_language AS engineers_primary_language "
"FROM people JOIN engineers "
"ON people.person_id = engineers.person_id "
"WHERE people.person_id IN ([EXPANDING_primary_keys]) "
"WHERE people.person_id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY people.person_id",
{"primary_keys": [1, 2, 5]},
),
@@ -337,7 +337,7 @@ class TestGeometries(GeometryFixtureBase):
"c.c_data AS c_c_data, c.e_data AS c_e_data, "
"c.d_data AS c_d_data "
"FROM a JOIN c ON a.id = c.id "
"WHERE a.id IN ([EXPANDING_primary_keys]) ORDER BY a.id",
"WHERE a.id IN ([POSTCOMPILE_primary_keys]) ORDER BY a.id",
[{"primary_keys": [1, 2]}],
),
CompiledSQL(
@@ -345,7 +345,7 @@ class TestGeometries(GeometryFixtureBase):
"c.c_data AS c_c_data, "
"c.d_data AS c_d_data, c.e_data AS c_e_data "
"FROM a JOIN c ON a.id = c.id "
"WHERE a.id IN ([EXPANDING_primary_keys]) ORDER BY a.id",
"WHERE a.id IN ([POSTCOMPILE_primary_keys]) ORDER BY a.id",
[{"primary_keys": [1, 2]}],
),
),
@@ -392,7 +392,7 @@ class TestGeometries(GeometryFixtureBase):
"c.c_data AS c_c_data, c.e_data AS c_e_data, "
"c.d_data AS c_d_data "
"FROM a JOIN c ON a.id = c.id "
"WHERE a.id IN ([EXPANDING_primary_keys]) ORDER BY a.id",
"WHERE a.id IN ([POSTCOMPILE_primary_keys]) ORDER BY a.id",
[{"primary_keys": [1, 2]}],
),
CompiledSQL(
@@ -400,7 +400,7 @@ class TestGeometries(GeometryFixtureBase):
"c.c_data AS c_c_data, c.d_data AS c_d_data, "
"c.e_data AS c_e_data "
"FROM a JOIN c ON a.id = c.id "
"WHERE a.id IN ([EXPANDING_primary_keys]) ORDER BY a.id",
"WHERE a.id IN ([POSTCOMPILE_primary_keys]) ORDER BY a.id",
[{"primary_keys": [1, 2]}],
),
),
@@ -468,7 +468,7 @@ class TestGeometries(GeometryFixtureBase):
"e.id AS e_id, e.e_data AS e_e_data FROM a JOIN c "
"ON a.id = c.id LEFT OUTER JOIN d ON c.id = d.id "
"LEFT OUTER JOIN e ON c.id = e.id) AS poly "
"WHERE poly.a_id IN ([EXPANDING_primary_keys]) "
"WHERE poly.a_id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY poly.a_id",
[{"primary_keys": [1, 2]}],
),
@@ -484,7 +484,7 @@ class TestGeometries(GeometryFixtureBase):
"e.e_data AS e_e_data FROM a JOIN c ON a.id = c.id "
"LEFT OUTER JOIN d ON c.id = d.id "
"LEFT OUTER JOIN e ON c.id = e.id) AS poly "
"WHERE poly.a_id IN ([EXPANDING_primary_keys]) "
"WHERE poly.a_id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY poly.a_id",
[{"primary_keys": [1, 2]}],
),
@@ -621,7 +621,7 @@ class LoaderOptionsTest(
"child.type AS child_type "
"FROM child JOIN child_subclass1 "
"ON child.id = child_subclass1.id "
"WHERE child.id IN ([EXPANDING_primary_keys]) "
"WHERE child.id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY child.id",
[{"primary_keys": [1]}],
),
@@ -671,7 +671,7 @@ class LoaderOptionsTest(
"ON child.id = child_subclass1.id "
"LEFT OUTER JOIN other AS other_1 "
"ON child_subclass1.id = other_1.child_subclass_id "
"WHERE child.id IN ([EXPANDING_primary_keys]) "
"WHERE child.id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY child.id",
[{"primary_keys": [1]}],
),
+1 -1
View File
@@ -274,7 +274,7 @@ class AltSelectableTest(
"SELECT a_1.id AS a_1_id, b.id AS b_id FROM a AS a_1 "
"JOIN (b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) "
"ON a_1.b_id = b.id WHERE a_1.id "
"IN ([EXPANDING_primary_keys]) ORDER BY a_1.id",
"IN ([POSTCOMPILE_primary_keys]) ORDER BY a_1.id",
[{"primary_keys": [1]}],
),
)
+1 -1
View File
@@ -342,7 +342,7 @@ class CompileTest(_fixtures.FixtureTest, AssertsCompiledSQL):
"FROM (SELECT anon_2.users_id AS users_id, "
"anon_2.users_name AS users_name FROM "
"(SELECT users.id AS users_id, users.name AS users_name "
"FROM users) anon_2 WHERE ROWNUM <= :param_1) anon_1 "
"FROM users) anon_2 WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_1 "
"LEFT OUTER JOIN addresses addresses_1 "
"ON anon_1.users_id = addresses_1.user_id FOR UPDATE",
dialect="oracle",
+20 -20
View File
@@ -1642,7 +1642,7 @@ class BaseRelationFromJoinedSubclassTest(_Polymorphic):
"paperwork.paperwork_id AS paperwork_paperwork_id, "
"paperwork.description AS paperwork_description "
"FROM paperwork WHERE paperwork.person_id "
"IN ([EXPANDING_primary_keys]) "
"IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY paperwork.person_id, paperwork.paperwork_id",
[{"primary_keys": [1]}],
),
@@ -1692,7 +1692,7 @@ class BaseRelationFromJoinedSubclassTest(_Polymorphic):
"paperwork.paperwork_id AS paperwork_paperwork_id, "
"paperwork.description AS paperwork_description "
"FROM paperwork WHERE paperwork.person_id "
"IN ([EXPANDING_primary_keys]) "
"IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY paperwork.person_id, paperwork.paperwork_id",
[{"primary_keys": [1]}],
),
@@ -1738,7 +1738,7 @@ class BaseRelationFromJoinedSubclassTest(_Polymorphic):
"paperwork.paperwork_id AS paperwork_paperwork_id, "
"paperwork.description AS paperwork_description "
"FROM paperwork WHERE paperwork.person_id "
"IN ([EXPANDING_primary_keys]) "
"IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY paperwork.person_id, paperwork.paperwork_id",
[{"primary_keys": [1]}],
),
@@ -1792,7 +1792,7 @@ class BaseRelationFromJoinedSubclassTest(_Polymorphic):
"paperwork.paperwork_id AS paperwork_paperwork_id, "
"paperwork.description AS paperwork_description "
"FROM paperwork WHERE paperwork.person_id "
"IN ([EXPANDING_primary_keys]) "
"IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY paperwork.person_id, paperwork.paperwork_id",
[{"primary_keys": [1]}],
),
@@ -1840,7 +1840,7 @@ class BaseRelationFromJoinedSubclassTest(_Polymorphic):
"paperwork.paperwork_id AS paperwork_paperwork_id, "
"paperwork.description AS paperwork_description "
"FROM paperwork WHERE paperwork.person_id "
"IN ([EXPANDING_primary_keys]) "
"IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY paperwork.person_id, paperwork.paperwork_id",
[{"primary_keys": [1]}],
),
@@ -2061,7 +2061,7 @@ class TupleTest(fixtures.DeclarativeMappedTest):
CompiledSQL(
"SELECT b.a_id1 AS b_a_id1, b.a_id2 AS b_a_id2, b.id AS b_id "
"FROM b WHERE (b.a_id1, b.a_id2) IN "
"([EXPANDING_primary_keys]) ORDER BY b.a_id1, b.a_id2, b.id",
"([POSTCOMPILE_primary_keys]) ORDER BY b.a_id1, b.a_id2, b.id",
[{"primary_keys": [(i, i + 2) for i in range(1, 20)]}],
),
)
@@ -2092,7 +2092,7 @@ class TupleTest(fixtures.DeclarativeMappedTest):
),
CompiledSQL(
"SELECT a.id1 AS a_id1, a.id2 AS a_id2 FROM a "
"WHERE (a.id1, a.id2) IN ([EXPANDING_primary_keys])",
"WHERE (a.id1, a.id2) IN ([POSTCOMPILE_primary_keys])",
[{"primary_keys": [(i, i + 2) for i in range(1, 20)]}],
),
)
@@ -2166,19 +2166,19 @@ class ChunkingTest(fixtures.DeclarativeMappedTest):
CompiledSQL(
"SELECT b.a_id AS b_a_id, b.id AS b_id "
"FROM b WHERE b.a_id IN "
"([EXPANDING_primary_keys]) ORDER BY b.a_id, b.id",
"([POSTCOMPILE_primary_keys]) ORDER BY b.a_id, b.id",
{"primary_keys": list(range(1, 48))},
),
CompiledSQL(
"SELECT b.a_id AS b_a_id, b.id AS b_id "
"FROM b WHERE b.a_id IN "
"([EXPANDING_primary_keys]) ORDER BY b.a_id, b.id",
"([POSTCOMPILE_primary_keys]) ORDER BY b.a_id, b.id",
{"primary_keys": list(range(48, 95))},
),
CompiledSQL(
"SELECT b.a_id AS b_a_id, b.id AS b_id "
"FROM b WHERE b.a_id IN "
"([EXPANDING_primary_keys]) ORDER BY b.a_id, b.id",
"([POSTCOMPILE_primary_keys]) ORDER BY b.a_id, b.id",
{"primary_keys": list(range(95, 101))},
),
)
@@ -2242,19 +2242,19 @@ class ChunkingTest(fixtures.DeclarativeMappedTest):
# chunk size is 47. so first chunk are a 1->47...
CompiledSQL(
"SELECT a.id AS a_id FROM a WHERE a.id IN "
"([EXPANDING_primary_keys])",
"([POSTCOMPILE_primary_keys])",
{"primary_keys": list(range(1, 48))},
),
# second chunk is a 48-94
CompiledSQL(
"SELECT a.id AS a_id FROM a WHERE a.id IN "
"([EXPANDING_primary_keys])",
"([POSTCOMPILE_primary_keys])",
{"primary_keys": list(range(48, 95))},
),
# third and final chunk 95-100.
CompiledSQL(
"SELECT a.id AS a_id FROM a WHERE a.id IN "
"([EXPANDING_primary_keys])",
"([POSTCOMPILE_primary_keys])",
{"primary_keys": list(range(95, 101))},
),
)
@@ -2779,14 +2779,14 @@ class SelfRefInheritanceAliasedTest(
"SELECT foo_1.id AS foo_1_id, "
"foo_1.type AS foo_1_type, foo_1.foo_id AS foo_1_foo_id "
"FROM foo AS foo_1 "
"WHERE foo_1.id IN ([EXPANDING_primary_keys])",
"WHERE foo_1.id IN ([POSTCOMPILE_primary_keys])",
{"primary_keys": [3]},
),
CompiledSQL(
"SELECT foo_1.id AS foo_1_id, "
"foo_1.type AS foo_1_type, foo_1.foo_id AS foo_1_foo_id "
"FROM foo AS foo_1 "
"WHERE foo_1.id IN ([EXPANDING_primary_keys])",
"WHERE foo_1.id IN ([POSTCOMPILE_primary_keys])",
{"primary_keys": [1]},
),
)
@@ -2956,7 +2956,7 @@ class SingleInhSubclassTest(
CompiledSQL(
"SELECT role.user_id AS role_user_id, role.id AS role_id "
"FROM role WHERE role.user_id "
"IN ([EXPANDING_primary_keys]) ORDER BY role.user_id",
"IN ([POSTCOMPILE_primary_keys]) ORDER BY role.user_id",
{"primary_keys": [1]},
),
)
@@ -3071,7 +3071,7 @@ class M2OWDegradeTest(
),
CompiledSQL(
"SELECT b.id AS b_id, b.x AS b_x, b.y AS b_y "
"FROM b WHERE b.id IN ([EXPANDING_primary_keys])",
"FROM b WHERE b.id IN ([POSTCOMPILE_primary_keys])",
[{"primary_keys": [1, 2]}],
),
)
@@ -3106,7 +3106,7 @@ class M2OWDegradeTest(
"SELECT a_1.id AS a_1_id, b.id AS b_id, b.x AS b_x, "
"b.y AS b_y "
"FROM a AS a_1 JOIN b ON b.id = a_1.b_id "
"WHERE a_1.id IN ([EXPANDING_primary_keys]) ORDER BY a_1.id",
"WHERE a_1.id IN ([POSTCOMPILE_primary_keys]) ORDER BY a_1.id",
[{"primary_keys": [1, 3]}],
),
)
@@ -3130,7 +3130,7 @@ class M2OWDegradeTest(
),
CompiledSQL(
"SELECT b.id AS b_id, b.x AS b_x, b.y AS b_y "
"FROM b WHERE b.id IN ([EXPANDING_primary_keys])",
"FROM b WHERE b.id IN ([POSTCOMPILE_primary_keys])",
[{"primary_keys": [1, 2]}],
),
)
@@ -3165,7 +3165,7 @@ class M2OWDegradeTest(
"SELECT a_1.id AS a_1_id, b.id AS b_id, b.x AS b_x, "
"b.y AS b_y "
"FROM a AS a_1 JOIN b ON b.id = a_1.b_id "
"WHERE a_1.id IN ([EXPANDING_primary_keys]) ORDER BY a_1.id",
"WHERE a_1.id IN ([POSTCOMPILE_primary_keys]) ORDER BY a_1.id",
[{"primary_keys": [1, 2, 3, 4, 5]}],
),
)
+501 -460
View File
@@ -311,20 +311,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
checkparams=params,
)
def test_limit_offset_select_literal_binds(self):
stmt = select([1]).limit(5).offset(6)
self.assert_compile(
stmt, "SELECT 1 LIMIT 5 OFFSET 6", literal_binds=True
)
def test_limit_offset_compound_select_literal_binds(self):
stmt = select([1]).union(select([2])).limit(5).offset(6)
self.assert_compile(
stmt,
"SELECT 1 UNION SELECT 2 LIMIT 5 OFFSET 6",
literal_binds=True,
)
def test_select_precol_compile_ordering(self):
s1 = (
select([column("x")])
@@ -1304,20 +1290,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT mytable.myid FROM mytable",
)
def test_multiple_col_binds(self):
self.assert_compile(
select(
[literal_column("*")],
or_(
table1.c.myid == 12,
table1.c.myid == "asdf",
table1.c.myid == "foo",
),
),
"SELECT * FROM mytable WHERE mytable.myid = :myid_1 "
"OR mytable.myid = :myid_2 OR mytable.myid = :myid_3",
)
def test_order_by_nulls(self):
self.assert_compile(
table2.select(
@@ -1631,71 +1603,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
dialect=mysql.dialect(),
)
def test_render_binds_as_literal(self):
"""test a compiler that renders binds inline into
SQL in the columns clause."""
dialect = default.DefaultDialect()
class Compiler(dialect.statement_compiler):
ansi_bind_rules = True
dialect.statement_compiler = Compiler
self.assert_compile(
select([literal("someliteral")]),
"SELECT 'someliteral' AS anon_1",
dialect=dialect,
)
self.assert_compile(
select([table1.c.myid + 3]),
"SELECT mytable.myid + 3 AS anon_1 FROM mytable",
dialect=dialect,
)
self.assert_compile(
select([table1.c.myid.in_([4, 5, 6])]),
"SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable",
dialect=dialect,
)
self.assert_compile(
select([func.mod(table1.c.myid, 5)]),
"SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable",
dialect=dialect,
)
self.assert_compile(
select([literal("foo").in_([])]),
"SELECT 1 != 1 AS anon_1",
dialect=dialect,
)
self.assert_compile(
select([literal(util.b("foo"))]),
"SELECT 'foo' AS anon_1",
dialect=dialect,
)
# test callable
self.assert_compile(
select([table1.c.myid == bindparam("foo", callable_=lambda: 5)]),
"SELECT mytable.myid = 5 AS anon_1 FROM mytable",
dialect=dialect,
)
empty_in_dialect = default.DefaultDialect(empty_in_strategy="dynamic")
empty_in_dialect.statement_compiler = Compiler
assert_raises_message(
exc.CompileError,
"Bind parameter 'foo' without a "
"renderable value not allowed here.",
bindparam("foo").in_([]).compile,
dialect=empty_in_dialect,
)
def test_collate(self):
# columns clause
self.assert_compile(
@@ -2214,373 +2121,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
" LIMIT -1 OFFSET :param_2) AS anon_2",
)
def test_binds(self):
for (
stmt,
expected_named_stmt,
expected_positional_stmt,
expected_default_params_dict,
expected_default_params_list,
test_param_dict,
expected_test_params_dict,
expected_test_params_list,
) in [
(
select(
[table1, table2],
and_(
table1.c.myid == table2.c.otherid,
table1.c.name == bindparam("mytablename"),
),
),
"SELECT mytable.myid, mytable.name, mytable.description, "
"myothertable.otherid, myothertable.othername FROM mytable, "
"myothertable WHERE mytable.myid = myothertable.otherid "
"AND mytable.name = :mytablename",
"SELECT mytable.myid, mytable.name, mytable.description, "
"myothertable.otherid, myothertable.othername FROM mytable, "
"myothertable WHERE mytable.myid = myothertable.otherid AND "
"mytable.name = ?",
{"mytablename": None},
[None],
{"mytablename": 5},
{"mytablename": 5},
[5],
),
(
select(
[table1],
or_(
table1.c.myid == bindparam("myid"),
table2.c.otherid == bindparam("myid"),
),
),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable, myothertable WHERE mytable.myid = :myid "
"OR myothertable.otherid = :myid",
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable, myothertable WHERE mytable.myid = ? "
"OR myothertable.otherid = ?",
{"myid": None},
[None, None],
{"myid": 5},
{"myid": 5},
[5, 5],
),
(
text(
"SELECT mytable.myid, mytable.name, "
"mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = :myid OR "
"myothertable.otherid = :myid"
),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = :myid OR "
"myothertable.otherid = :myid",
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = ? OR "
"myothertable.otherid = ?",
{"myid": None},
[None, None],
{"myid": 5},
{"myid": 5},
[5, 5],
),
(
select(
[table1],
or_(
table1.c.myid == bindparam("myid", unique=True),
table2.c.otherid == bindparam("myid", unique=True),
),
),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
":myid_1 OR myothertable.otherid = :myid_2",
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = ? "
"OR myothertable.otherid = ?",
{"myid_1": None, "myid_2": None},
[None, None],
{"myid_1": 5, "myid_2": 6},
{"myid_1": 5, "myid_2": 6},
[5, 6],
),
(
bindparam("test", type_=String, required=False) + text("'hi'"),
":test || 'hi'",
"? || 'hi'",
{"test": None},
[None],
{},
{"test": None},
[None],
),
(
# testing select.params() here - bindparam() objects
# must get required flag set to False
select(
[table1],
or_(
table1.c.myid == bindparam("myid"),
table2.c.otherid == bindparam("myotherid"),
),
).params({"myid": 8, "myotherid": 7}),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
":myid OR myothertable.otherid = :myotherid",
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
"? OR myothertable.otherid = ?",
{"myid": 8, "myotherid": 7},
[8, 7],
{"myid": 5},
{"myid": 5, "myotherid": 7},
[5, 7],
),
(
select(
[table1],
or_(
table1.c.myid
== bindparam("myid", value=7, unique=True),
table2.c.otherid
== bindparam("myid", value=8, unique=True),
),
),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
":myid_1 OR myothertable.otherid = :myid_2",
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
"? OR myothertable.otherid = ?",
{"myid_1": 7, "myid_2": 8},
[7, 8],
{"myid_1": 5, "myid_2": 6},
{"myid_1": 5, "myid_2": 6},
[5, 6],
),
]:
self.assert_compile(
stmt, expected_named_stmt, params=expected_default_params_dict
)
self.assert_compile(
stmt, expected_positional_stmt, dialect=sqlite.dialect()
)
nonpositional = stmt.compile()
positional = stmt.compile(dialect=sqlite.dialect())
pp = positional.params
eq_(
[pp[k] for k in positional.positiontup],
expected_default_params_list,
)
eq_(
nonpositional.construct_params(test_param_dict),
expected_test_params_dict,
)
pp = positional.construct_params(test_param_dict)
eq_(
[pp[k] for k in positional.positiontup],
expected_test_params_list,
)
# check that params() doesn't modify original statement
s = select(
[table1],
or_(
table1.c.myid == bindparam("myid"),
table2.c.otherid == bindparam("myotherid"),
),
)
s2 = s.params({"myid": 8, "myotherid": 7})
s3 = s2.params({"myid": 9})
assert s.compile().params == {"myid": None, "myotherid": None}
assert s2.compile().params == {"myid": 8, "myotherid": 7}
assert s3.compile().params == {"myid": 9, "myotherid": 7}
# test using same 'unique' param object twice in one compile
s = (
select([table1.c.myid])
.where(table1.c.myid == 12)
.scalar_subquery()
)
s2 = select([table1, s], table1.c.myid == s)
self.assert_compile(
s2,
"SELECT mytable.myid, mytable.name, mytable.description, "
"(SELECT mytable.myid FROM mytable WHERE mytable.myid = "
":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = "
"(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)",
)
positional = s2.compile(dialect=sqlite.dialect())
pp = positional.params
assert [pp[k] for k in positional.positiontup] == [12, 12]
# check that conflicts with "unique" params are caught
s = select(
[table1],
or_(table1.c.myid == 7, table1.c.myid == bindparam("myid_1")),
)
assert_raises_message(
exc.CompileError,
"conflicts with unique bind parameter " "of the same name",
str,
s,
)
s = select(
[table1],
or_(
table1.c.myid == 7,
table1.c.myid == 8,
table1.c.myid == bindparam("myid_1"),
),
)
assert_raises_message(
exc.CompileError,
"conflicts with unique bind parameter " "of the same name",
str,
s,
)
def _test_binds_no_hash_collision(self):
"""test that construct_params doesn't corrupt dict
due to hash collisions"""
total_params = 100000
in_clause = [":in%d" % i for i in range(total_params)]
params = dict(("in%d" % i, i) for i in range(total_params))
t = text("text clause %s" % ", ".join(in_clause))
eq_(len(t.bindparams), total_params)
c = t.compile()
pp = c.construct_params(params)
eq_(len(set(pp)), total_params, "%s %s" % (len(set(pp)), len(pp)))
eq_(len(set(pp.values())), total_params)
def test_bind_as_col(self):
t = table("foo", column("id"))
s = select([t, literal("lala").label("hoho")])
self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo")
assert [str(c) for c in s.subquery().c] == ["anon_1.id", "anon_1.hoho"]
def test_bind_callable(self):
expr = column("x") == bindparam("key", callable_=lambda: 12)
self.assert_compile(expr, "x = :key", {"x": 12})
def test_bind_params_missing(self):
assert_raises_message(
exc.InvalidRequestError,
r"A value is required for bind parameter 'x'",
select([table1])
.where(
and_(
table1.c.myid == bindparam("x", required=True),
table1.c.name == bindparam("y", required=True),
)
)
.compile()
.construct_params,
params=dict(y=5),
)
assert_raises_message(
exc.InvalidRequestError,
r"A value is required for bind parameter 'x'",
select([table1])
.where(table1.c.myid == bindparam("x", required=True))
.compile()
.construct_params,
)
assert_raises_message(
exc.InvalidRequestError,
r"A value is required for bind parameter 'x', "
"in parameter group 2",
select([table1])
.where(
and_(
table1.c.myid == bindparam("x", required=True),
table1.c.name == bindparam("y", required=True),
)
)
.compile()
.construct_params,
params=dict(y=5),
_group_number=2,
)
assert_raises_message(
exc.InvalidRequestError,
r"A value is required for bind parameter 'x', "
"in parameter group 2",
select([table1])
.where(table1.c.myid == bindparam("x", required=True))
.compile()
.construct_params,
_group_number=2,
)
def test_tuple(self):
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
"(mytable.myid, mytable.name) IN "
"((:param_1, :param_2), (:param_3, :param_4))",
)
dialect = default.DefaultDialect()
dialect.tuple_in_values = True
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
"(mytable.myid, mytable.name) IN "
"(VALUES (:param_1, :param_2), (:param_3, :param_4))",
dialect=dialect,
)
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_(
[tuple_(table2.c.otherid, table2.c.othername)]
),
"(mytable.myid, mytable.name) IN "
"((myothertable.otherid, myothertable.othername))",
)
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_(
select([table2.c.otherid, table2.c.othername])
),
"(mytable.myid, mytable.name) IN (SELECT "
"myothertable.otherid, myothertable.othername FROM myothertable)",
)
def test_expanding_parameter(self):
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_(
bindparam("foo", expanding=True)
),
"(mytable.myid, mytable.name) IN ([EXPANDING_foo])",
)
dialect = default.DefaultDialect()
dialect.tuple_in_values = True
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_(
bindparam("foo", expanding=True)
),
"(mytable.myid, mytable.name) IN ([EXPANDING_foo])",
dialect=dialect,
)
self.assert_compile(
table1.c.myid.in_(bindparam("foo", expanding=True)),
"mytable.myid IN ([EXPANDING_foo])",
)
def test_cast(self):
tbl = table(
"casttest",
@@ -3250,6 +2790,507 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
assert_raises(exc.ArgumentError, and_, ("a",), ("b",))
class BindParameterTest(AssertsCompiledSQL, fixtures.TestBase):
__dialect__ = "default"
def test_binds(self):
for (
stmt,
expected_named_stmt,
expected_positional_stmt,
expected_default_params_dict,
expected_default_params_list,
test_param_dict,
expected_test_params_dict,
expected_test_params_list,
) in [
(
select(
[table1, table2],
and_(
table1.c.myid == table2.c.otherid,
table1.c.name == bindparam("mytablename"),
),
),
"SELECT mytable.myid, mytable.name, mytable.description, "
"myothertable.otherid, myothertable.othername FROM mytable, "
"myothertable WHERE mytable.myid = myothertable.otherid "
"AND mytable.name = :mytablename",
"SELECT mytable.myid, mytable.name, mytable.description, "
"myothertable.otherid, myothertable.othername FROM mytable, "
"myothertable WHERE mytable.myid = myothertable.otherid AND "
"mytable.name = ?",
{"mytablename": None},
[None],
{"mytablename": 5},
{"mytablename": 5},
[5],
),
(
select(
[table1],
or_(
table1.c.myid == bindparam("myid"),
table2.c.otherid == bindparam("myid"),
),
),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable, myothertable WHERE mytable.myid = :myid "
"OR myothertable.otherid = :myid",
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable, myothertable WHERE mytable.myid = ? "
"OR myothertable.otherid = ?",
{"myid": None},
[None, None],
{"myid": 5},
{"myid": 5},
[5, 5],
),
(
text(
"SELECT mytable.myid, mytable.name, "
"mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = :myid OR "
"myothertable.otherid = :myid"
),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = :myid OR "
"myothertable.otherid = :myid",
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = ? OR "
"myothertable.otherid = ?",
{"myid": None},
[None, None],
{"myid": 5},
{"myid": 5},
[5, 5],
),
(
select(
[table1],
or_(
table1.c.myid == bindparam("myid", unique=True),
table2.c.otherid == bindparam("myid", unique=True),
),
),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
":myid_1 OR myothertable.otherid = :myid_2",
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = ? "
"OR myothertable.otherid = ?",
{"myid_1": None, "myid_2": None},
[None, None],
{"myid_1": 5, "myid_2": 6},
{"myid_1": 5, "myid_2": 6},
[5, 6],
),
(
bindparam("test", type_=String, required=False) + text("'hi'"),
":test || 'hi'",
"? || 'hi'",
{"test": None},
[None],
{},
{"test": None},
[None],
),
(
# testing select.params() here - bindparam() objects
# must get required flag set to False
select(
[table1],
or_(
table1.c.myid == bindparam("myid"),
table2.c.otherid == bindparam("myotherid"),
),
).params({"myid": 8, "myotherid": 7}),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
":myid OR myothertable.otherid = :myotherid",
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
"? OR myothertable.otherid = ?",
{"myid": 8, "myotherid": 7},
[8, 7],
{"myid": 5},
{"myid": 5, "myotherid": 7},
[5, 7],
),
(
select(
[table1],
or_(
table1.c.myid
== bindparam("myid", value=7, unique=True),
table2.c.otherid
== bindparam("myid", value=8, unique=True),
),
),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
":myid_1 OR myothertable.otherid = :myid_2",
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
"? OR myothertable.otherid = ?",
{"myid_1": 7, "myid_2": 8},
[7, 8],
{"myid_1": 5, "myid_2": 6},
{"myid_1": 5, "myid_2": 6},
[5, 6],
),
]:
self.assert_compile(
stmt, expected_named_stmt, params=expected_default_params_dict
)
self.assert_compile(
stmt, expected_positional_stmt, dialect=sqlite.dialect()
)
nonpositional = stmt.compile()
positional = stmt.compile(dialect=sqlite.dialect())
pp = positional.params
eq_(
[pp[k] for k in positional.positiontup],
expected_default_params_list,
)
eq_(
nonpositional.construct_params(test_param_dict),
expected_test_params_dict,
)
pp = positional.construct_params(test_param_dict)
eq_(
[pp[k] for k in positional.positiontup],
expected_test_params_list,
)
# check that params() doesn't modify original statement
s = select(
[table1],
or_(
table1.c.myid == bindparam("myid"),
table2.c.otherid == bindparam("myotherid"),
),
)
s2 = s.params({"myid": 8, "myotherid": 7})
s3 = s2.params({"myid": 9})
assert s.compile().params == {"myid": None, "myotherid": None}
assert s2.compile().params == {"myid": 8, "myotherid": 7}
assert s3.compile().params == {"myid": 9, "myotherid": 7}
# test using same 'unique' param object twice in one compile
s = (
select([table1.c.myid])
.where(table1.c.myid == 12)
.scalar_subquery()
)
s2 = select([table1, s], table1.c.myid == s)
self.assert_compile(
s2,
"SELECT mytable.myid, mytable.name, mytable.description, "
"(SELECT mytable.myid FROM mytable WHERE mytable.myid = "
":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = "
"(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)",
)
positional = s2.compile(dialect=sqlite.dialect())
pp = positional.params
assert [pp[k] for k in positional.positiontup] == [12, 12]
# check that conflicts with "unique" params are caught
s = select(
[table1],
or_(table1.c.myid == 7, table1.c.myid == bindparam("myid_1")),
)
assert_raises_message(
exc.CompileError,
"conflicts with unique bind parameter " "of the same name",
str,
s,
)
s = select(
[table1],
or_(
table1.c.myid == 7,
table1.c.myid == 8,
table1.c.myid == bindparam("myid_1"),
),
)
assert_raises_message(
exc.CompileError,
"conflicts with unique bind parameter " "of the same name",
str,
s,
)
def _test_binds_no_hash_collision(self):
"""test that construct_params doesn't corrupt dict
due to hash collisions"""
total_params = 100000
in_clause = [":in%d" % i for i in range(total_params)]
params = dict(("in%d" % i, i) for i in range(total_params))
t = text("text clause %s" % ", ".join(in_clause))
eq_(len(t.bindparams), total_params)
c = t.compile()
pp = c.construct_params(params)
eq_(len(set(pp)), total_params, "%s %s" % (len(set(pp)), len(pp)))
eq_(len(set(pp.values())), total_params)
def test_bind_as_col(self):
t = table("foo", column("id"))
s = select([t, literal("lala").label("hoho")])
self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo")
assert [str(c) for c in s.subquery().c] == ["anon_1.id", "anon_1.hoho"]
def test_bind_callable(self):
expr = column("x") == bindparam("key", callable_=lambda: 12)
self.assert_compile(expr, "x = :key", {"x": 12})
def test_bind_params_missing(self):
assert_raises_message(
exc.InvalidRequestError,
r"A value is required for bind parameter 'x'",
select([table1])
.where(
and_(
table1.c.myid == bindparam("x", required=True),
table1.c.name == bindparam("y", required=True),
)
)
.compile()
.construct_params,
params=dict(y=5),
)
assert_raises_message(
exc.InvalidRequestError,
r"A value is required for bind parameter 'x'",
select([table1])
.where(table1.c.myid == bindparam("x", required=True))
.compile()
.construct_params,
)
assert_raises_message(
exc.InvalidRequestError,
r"A value is required for bind parameter 'x', "
"in parameter group 2",
select([table1])
.where(
and_(
table1.c.myid == bindparam("x", required=True),
table1.c.name == bindparam("y", required=True),
)
)
.compile()
.construct_params,
params=dict(y=5),
_group_number=2,
)
assert_raises_message(
exc.InvalidRequestError,
r"A value is required for bind parameter 'x', "
"in parameter group 2",
select([table1])
.where(table1.c.myid == bindparam("x", required=True))
.compile()
.construct_params,
_group_number=2,
)
def test_tuple(self):
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
"(mytable.myid, mytable.name) IN "
"((:param_1, :param_2), (:param_3, :param_4))",
)
dialect = default.DefaultDialect()
dialect.tuple_in_values = True
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
"(mytable.myid, mytable.name) IN "
"(VALUES (:param_1, :param_2), (:param_3, :param_4))",
dialect=dialect,
)
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_(
[tuple_(table2.c.otherid, table2.c.othername)]
),
"(mytable.myid, mytable.name) IN "
"((myothertable.otherid, myothertable.othername))",
)
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_(
select([table2.c.otherid, table2.c.othername])
),
"(mytable.myid, mytable.name) IN (SELECT "
"myothertable.otherid, myothertable.othername FROM myothertable)",
)
def test_expanding_parameter(self):
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_(
bindparam("foo", expanding=True)
),
"(mytable.myid, mytable.name) IN ([POSTCOMPILE_foo])",
)
dialect = default.DefaultDialect()
dialect.tuple_in_values = True
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_(
bindparam("foo", expanding=True)
),
"(mytable.myid, mytable.name) IN ([POSTCOMPILE_foo])",
dialect=dialect,
)
self.assert_compile(
table1.c.myid.in_(bindparam("foo", expanding=True)),
"mytable.myid IN ([POSTCOMPILE_foo])",
)
def test_limit_offset_select_literal_binds(self):
stmt = select([1]).limit(5).offset(6)
self.assert_compile(
stmt, "SELECT 1 LIMIT 5 OFFSET 6", literal_binds=True
)
def test_limit_offset_compound_select_literal_binds(self):
stmt = select([1]).union(select([2])).limit(5).offset(6)
self.assert_compile(
stmt,
"SELECT 1 UNION SELECT 2 LIMIT 5 OFFSET 6",
literal_binds=True,
)
def test_multiple_col_binds(self):
self.assert_compile(
select(
[literal_column("*")],
or_(
table1.c.myid == 12,
table1.c.myid == "asdf",
table1.c.myid == "foo",
),
),
"SELECT * FROM mytable WHERE mytable.myid = :myid_1 "
"OR mytable.myid = :myid_2 OR mytable.myid = :myid_3",
)
def test_render_binds_as_literal(self):
"""test a compiler that renders binds inline into
SQL in the columns clause."""
dialect = default.DefaultDialect()
class Compiler(dialect.statement_compiler):
ansi_bind_rules = True
dialect.statement_compiler = Compiler
self.assert_compile(
select([literal("someliteral")]),
"SELECT 'someliteral' AS anon_1",
dialect=dialect,
)
self.assert_compile(
select([table1.c.myid + 3]),
"SELECT mytable.myid + 3 AS anon_1 FROM mytable",
dialect=dialect,
)
self.assert_compile(
select([table1.c.myid.in_([4, 5, 6])]),
"SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable",
dialect=dialect,
)
self.assert_compile(
select([func.mod(table1.c.myid, 5)]),
"SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable",
dialect=dialect,
)
self.assert_compile(
select([literal("foo").in_([])]),
"SELECT 1 != 1 AS anon_1",
dialect=dialect,
)
self.assert_compile(
select([literal(util.b("foo"))]),
"SELECT 'foo' AS anon_1",
dialect=dialect,
)
# test callable
self.assert_compile(
select([table1.c.myid == bindparam("foo", callable_=lambda: 5)]),
"SELECT mytable.myid = 5 AS anon_1 FROM mytable",
dialect=dialect,
)
empty_in_dialect = default.DefaultDialect(empty_in_strategy="dynamic")
empty_in_dialect.statement_compiler = Compiler
assert_raises_message(
exc.CompileError,
"Bind parameter 'foo' without a "
"renderable value not allowed here.",
bindparam("foo").in_([]).compile,
dialect=empty_in_dialect,
)
def test_render_literal_execute_parameter(self):
self.assert_compile(
select([table1.c.myid]).where(
table1.c.myid == bindparam("foo", 5, literal_execute=True)
),
"SELECT mytable.myid FROM mytable "
"WHERE mytable.myid = [POSTCOMPILE_foo]",
)
def test_render_literal_execute_parameter_literal_binds(self):
self.assert_compile(
select([table1.c.myid]).where(
table1.c.myid == bindparam("foo", 5, literal_execute=True)
),
"SELECT mytable.myid FROM mytable " "WHERE mytable.myid = 5",
literal_binds=True,
)
def test_render_expanding_parameter(self):
self.assert_compile(
select([table1.c.myid]).where(
table1.c.myid.in_(bindparam("foo", expanding=True))
),
"SELECT mytable.myid FROM mytable "
"WHERE mytable.myid IN ([POSTCOMPILE_foo])",
)
def test_render_expanding_parameter_literal_binds(self):
self.assert_compile(
select([table1.c.myid]).where(
table1.c.myid.in_(bindparam("foo", [1, 2, 3], expanding=True))
),
"SELECT mytable.myid FROM mytable "
"WHERE mytable.myid IN (1, 2, 3)",
literal_binds=True,
)
class UnsupportedTest(fixtures.TestBase):
def test_unsupported_element_str_visit_name(self):
from sqlalchemy.sql.expression import ClauseElement
+15
View File
@@ -1,3 +1,4 @@
from sqlalchemy import bindparam
from sqlalchemy import Column
from sqlalchemy import exc
from sqlalchemy import Integer
@@ -22,6 +23,7 @@ from sqlalchemy.sql.elements import _truncated_label
from sqlalchemy.sql.elements import Null
from sqlalchemy.sql.selectable import FromGrouping
from sqlalchemy.sql.selectable import SelectStatementGrouping
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
@@ -201,6 +203,19 @@ class RoleTest(fixtures.TestBase):
)
)
def test_offset_or_limit_role_only_ints_or_clauseelement(self):
assert_raises(ValueError, select([t]).limit, "some limit")
assert_raises(ValueError, select([t]).offset, "some offset")
def test_offset_or_limit_role_clauseelement(self):
bind = bindparam("x")
stmt = select([t]).limit(bind)
is_(stmt._limit_clause, bind)
stmt = select([t]).offset(bind)
is_(stmt._offset_clause, bind)
def test_from_clause_is_not_a_select(self):
assert_raises_message(
exc.ArgumentError,
+13
View File
@@ -282,6 +282,19 @@ class BindParamTest(fixtures.TestBase, AssertsCompiledSQL):
dialect="postgresql",
)
def test_unique_binds(self):
# unique binds can be used in text() however they uniquify across
# multiple text() constructs only, not within a single text
t1 = text("select :foo").bindparams(bindparam("foo", 5, unique=True))
t2 = text("select :foo").bindparams(bindparam("foo", 10, unique=True))
stmt = select([t1, t2])
self.assert_compile(
stmt,
"SELECT select :foo_1, select :foo_2",
checkparams={"foo_1": 5, "foo_2": 10},
)
def test_binds_compiled_positional(self):
self.assert_compile(
text(
+27
View File
@@ -62,6 +62,7 @@ from sqlalchemy.schema import AddConstraint
from sqlalchemy.schema import CheckConstraint
from sqlalchemy.sql import column
from sqlalchemy.sql import ddl
from sqlalchemy.sql import elements
from sqlalchemy.sql import null
from sqlalchemy.sql import operators
from sqlalchemy.sql import sqltypes
@@ -2363,6 +2364,20 @@ class ExpressionTest(
],
)
def test_grouped_bind_adapt(self):
expr = test_table.c.atimestamp == elements.Grouping(
bindparam("thedate")
)
eq_(expr.right.type._type_affinity, Date)
eq_(expr.right.element.type._type_affinity, Date)
expr = test_table.c.atimestamp == elements.Grouping(
elements.Grouping(bindparam("thedate"))
)
eq_(expr.right.type._type_affinity, Date)
eq_(expr.right.element.type._type_affinity, Date)
eq_(expr.right.element.element.type._type_affinity, Date)
def test_bind_adapt_update(self):
bp = bindparam("somevalue")
stmt = test_table.update().values(avalue=bp)
@@ -2883,6 +2898,18 @@ class IntervalTest(fixtures.TestBase, AssertsExecutionResults):
eq_(row["non_native_interval"], None)
class IntegerTest(fixtures.TestBase):
def test_integer_literal_processor(self):
typ = Integer()
eq_(typ._cached_literal_processor(testing.db.dialect)(5), "5")
assert_raises(
ValueError,
typ._cached_literal_processor(testing.db.dialect),
"notanint",
)
class BooleanTest(
fixtures.TablesTest, AssertsExecutionResults, AssertsCompiledSQL
):