Files
Loïc Simon 6b09777e3d PostgreSQL / SQLite / Insert.on_conflict_do_update: respect compile_kwargs
Fixed issue where :meth:`_postgresql.Insert.on_conflict_do_update`
as well as  :meth:`_sqlite.Insert.on_conflict_do_update`
parameters were not respecting compilation options such as
``literal_binds=True``.

Pull request courtesy Loïc Simon.

Fixes: #13110
Closes: #13111
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/13111
Pull-request-sha: 9ca251610b

Change-Id: Ice21e508210d682098104c78e77bad8d24e6c93f
2026-02-08 15:08:10 -05:00

5035 lines
168 KiB
Python

import contextlib
import random
import re
from sqlalchemy import all_
from sqlalchemy import and_
from sqlalchemy import any_
from sqlalchemy import BigInteger
from sqlalchemy import bindparam
from sqlalchemy import case
from sqlalchemy import cast
from sqlalchemy import CheckConstraint
from sqlalchemy import Column
from sqlalchemy import Computed
from sqlalchemy import Date
from sqlalchemy import delete
from sqlalchemy import Enum
from sqlalchemy import exc
from sqlalchemy import Float
from sqlalchemy import ForeignKey
from sqlalchemy import ForeignKeyConstraint
from sqlalchemy import func
from sqlalchemy import Identity
from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy import literal
from sqlalchemy import MetaData
from sqlalchemy import null
from sqlalchemy import PrimaryKeyConstraint
from sqlalchemy import schema
from sqlalchemy import select
from sqlalchemy import Sequence
from sqlalchemy import SmallInteger
from sqlalchemy import sql
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy import Text
from sqlalchemy import text
from sqlalchemy import true
from sqlalchemy import tuple_
from sqlalchemy import types as sqltypes
from sqlalchemy import UniqueConstraint
from sqlalchemy import update
from sqlalchemy import VARCHAR
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import aggregate_order_by
from sqlalchemy.dialects.postgresql import ARRAY as PG_ARRAY
from sqlalchemy.dialects.postgresql import array
from sqlalchemy.dialects.postgresql import array_agg as pg_array_agg
from sqlalchemy.dialects.postgresql import distinct_on
from sqlalchemy.dialects.postgresql import DOMAIN
from sqlalchemy.dialects.postgresql import ExcludeConstraint
from sqlalchemy.dialects.postgresql import HSTORE
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.dialects.postgresql import JSON
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.dialects.postgresql import JSONPATH
from sqlalchemy.dialects.postgresql import Range
from sqlalchemy.dialects.postgresql import REGCONFIG
from sqlalchemy.dialects.postgresql import TSQUERY
from sqlalchemy.dialects.postgresql import TSRANGE
from sqlalchemy.dialects.postgresql.base import PGDialect
from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect_psycopg2
from sqlalchemy.dialects.postgresql.ranges import MultiRange
from sqlalchemy.orm import aliased
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import Session
from sqlalchemy.sql import column
from sqlalchemy.sql import literal_column
from sqlalchemy.sql import operators
from sqlalchemy.sql import table
from sqlalchemy.sql import util as sql_util
from sqlalchemy.sql.functions import GenericFunction
from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.assertions import assert_raises
from sqlalchemy.testing.assertions import assert_raises_message
from sqlalchemy.testing.assertions import AssertsCompiledSQL
from sqlalchemy.testing.assertions import eq_
from sqlalchemy.testing.assertions import eq_ignore_whitespace
from sqlalchemy.testing.assertions import expect_deprecated
from sqlalchemy.testing.assertions import expect_warnings
from sqlalchemy.testing.assertions import is_
from sqlalchemy.testing.util import resolve_lambda
from sqlalchemy.types import TypeEngine
from sqlalchemy.util import OrderedDict
class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "postgresql"
def test_format(self):
seq = Sequence("my_seq_no_schema")
dialect = postgresql.dialect()
assert (
dialect.identifier_preparer.format_sequence(seq)
== "my_seq_no_schema"
)
seq = Sequence("my_seq", schema="some_schema")
assert (
dialect.identifier_preparer.format_sequence(seq)
== "some_schema.my_seq"
)
seq = Sequence("My_Seq", schema="Some_Schema")
assert (
dialect.identifier_preparer.format_sequence(seq)
== '"Some_Schema"."My_Seq"'
)
@testing.combinations(
(None, ""),
(Integer, "AS INTEGER "),
(SmallInteger, "AS SMALLINT "),
(BigInteger, "AS BIGINT "),
)
def test_compile_type(self, type_, text):
s = Sequence("s1", data_type=type_)
self.assert_compile(
schema.CreateSequence(s),
f"CREATE SEQUENCE s1 {text}".strip(),
dialect=postgresql.dialect(),
)
class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = postgresql.dialect()
def test_plain_stringify_returning(self):
t = Table(
"t",
MetaData(),
Column("myid", Integer, primary_key=True),
Column("name", String, server_default="some str"),
Column("description", String, default=func.lower("hi")),
)
stmt = t.insert().values().return_defaults()
eq_ignore_whitespace(
str(stmt.compile(dialect=postgresql.dialect())),
"INSERT INTO t (description) VALUES (lower(%(lower_1)s::VARCHAR)) "
"RETURNING t.myid, t.name, t.description",
)
def test_update_returning(self):
dialect = postgresql.dialect()
table1 = table(
"mytable",
column("myid", Integer),
column("name", String(128)),
column("description", String(128)),
)
u = (
update(table1)
.values(dict(name="foo"))
.returning(table1.c.myid, table1.c.name)
)
self.assert_compile(
u,
"UPDATE mytable SET name=%(name)s::VARCHAR RETURNING mytable.myid,"
" mytable.name",
dialect=dialect,
)
u = update(table1).values(dict(name="foo")).returning(table1)
self.assert_compile(
u,
"UPDATE mytable SET name=%(name)s::VARCHAR RETURNING mytable.myid,"
" mytable.name, mytable.description",
dialect=dialect,
)
u = (
update(table1)
.values(dict(name="foo"))
.returning(func.length(table1.c.name))
)
self.assert_compile(
u,
"UPDATE mytable SET name=%(name)s::VARCHAR RETURNING"
" length(mytable.name) AS length_1",
dialect=dialect,
)
def test_insert_returning(self):
dialect = postgresql.dialect()
table1 = table(
"mytable",
column("myid", Integer),
column("name", String(128)),
column("description", String(128)),
)
i = (
insert(table1)
.values(dict(name="foo"))
.returning(table1.c.myid, table1.c.name)
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) RETURNING"
" mytable.myid, mytable.name",
dialect=dialect,
)
i = insert(table1).values(dict(name="foo")).returning(table1)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) RETURNING"
" mytable.myid, mytable.name, mytable.description",
dialect=dialect,
)
i = (
insert(table1)
.values(dict(name="foo"))
.returning(func.length(table1.c.name))
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) RETURNING"
" length(mytable.name) AS length_1",
dialect=dialect,
)
@testing.fixture
def column_expression_fixture(self):
class MyString(TypeEngine):
def column_expression(self, column):
return func.lower(column)
return table(
"some_table", column("name", String), column("value", MyString)
)
@testing.combinations("columns", "table", argnames="use_columns")
def test_plain_returning_column_expression(
self, column_expression_fixture, use_columns
):
"""test #8770"""
table1 = column_expression_fixture
if use_columns == "columns":
stmt = insert(table1).returning(table1)
else:
stmt = insert(table1).returning(table1.c.name, table1.c.value)
self.assert_compile(
stmt,
"INSERT INTO some_table (name, value) VALUES (%(name)s::VARCHAR,"
" %(value)s) RETURNING some_table.name, lower(some_table.value) AS"
" value",
)
def test_create_drop_enum(self):
# test escaping and unicode within CREATE TYPE for ENUM
typ = postgresql.ENUM("val1", "val2", "val's 3", "méil", name="myname")
self.assert_compile(
postgresql.CreateEnumType(typ),
"CREATE TYPE myname AS ENUM ('val1', 'val2', 'val''s 3', 'méil')",
)
typ = postgresql.ENUM("val1", "val2", "val's 3", name="PleaseQuoteMe")
self.assert_compile(
postgresql.CreateEnumType(typ),
'CREATE TYPE "PleaseQuoteMe" AS ENUM '
"('val1', 'val2', 'val''s 3')",
)
def test_generic_enum(self):
e1 = Enum("x", "y", "z", name="somename")
e2 = Enum("x", "y", "z", name="somename", schema="someschema")
self.assert_compile(
postgresql.CreateEnumType(e1),
"CREATE TYPE somename AS ENUM ('x', 'y', 'z')",
)
self.assert_compile(
postgresql.CreateEnumType(e2),
"CREATE TYPE someschema.somename AS ENUM ('x', 'y', 'z')",
)
self.assert_compile(postgresql.DropEnumType(e1), "DROP TYPE somename")
self.assert_compile(
postgresql.DropEnumType(e2), "DROP TYPE someschema.somename"
)
t1 = Table("sometable", MetaData(), Column("somecolumn", e1))
self.assert_compile(
schema.CreateTable(t1),
"CREATE TABLE sometable (somecolumn somename)",
)
t1 = Table(
"sometable",
MetaData(),
Column(
"somecolumn",
Enum("x", "y", "z", native_enum=False, create_constraint=True),
),
)
self.assert_compile(
schema.CreateTable(t1),
"CREATE TABLE sometable (somecolumn "
"VARCHAR(1), CHECK (somecolumn IN ('x', "
"'y', 'z')))",
)
def test_cast_enum_schema(self):
"""test #6739"""
e1 = Enum("x", "y", "z", name="somename")
e2 = Enum("x", "y", "z", name="somename", schema="someschema")
stmt = select(cast(column("foo"), e1), cast(column("bar"), e2))
self.assert_compile(
stmt,
"SELECT CAST(foo AS somename) AS foo, "
"CAST(bar AS someschema.somename) AS bar",
)
def test_cast_double_pg_double(self):
"""test #5465:
test sqlalchemy Double/DOUBLE to PostgreSQL DOUBLE PRECISION
"""
d1 = sqltypes.Double
stmt = select(cast(column("foo"), d1))
self.assert_compile(
stmt, "SELECT CAST(foo AS DOUBLE PRECISION) AS foo"
)
def test_cast_enum_schema_translate(self):
"""test #6739"""
e1 = Enum("x", "y", "z", name="somename")
e2 = Enum("x", "y", "z", name="somename", schema="someschema")
schema_translate_map = {None: "bat", "someschema": "hoho"}
stmt = select(cast(column("foo"), e1), cast(column("bar"), e2))
self.assert_compile(
stmt,
"SELECT CAST(foo AS bat.somename) AS foo, "
"CAST(bar AS hoho.somename) AS bar",
schema_translate_map=schema_translate_map,
render_schema_translate=True,
)
def test_create_enum_schema_translate(self):
e1 = Enum("x", "y", "z", name="somename")
e2 = Enum("x", "y", "z", name="somename", schema="someschema")
schema_translate_map = {None: "foo", "someschema": "bar"}
self.assert_compile(
postgresql.CreateEnumType(e1),
"CREATE TYPE foo.somename AS ENUM ('x', 'y', 'z')",
schema_translate_map=schema_translate_map,
render_schema_translate=True,
)
self.assert_compile(
postgresql.CreateEnumType(e2),
"CREATE TYPE bar.somename AS ENUM ('x', 'y', 'z')",
schema_translate_map=schema_translate_map,
render_schema_translate=True,
)
def test_domain(self):
self.assert_compile(
postgresql.CreateDomainType(
DOMAIN(
"x",
Integer,
default=text("11"),
not_null=True,
check="VALUE < 0",
)
),
"CREATE DOMAIN x AS INTEGER DEFAULT 11 NOT NULL CHECK (VALUE < 0)",
)
self.assert_compile(
postgresql.CreateDomainType(
DOMAIN(
"sOmEnAmE",
Text,
collation="utf8",
constraint_name="a constraint",
not_null=True,
)
),
'CREATE DOMAIN "sOmEnAmE" AS TEXT COLLATE utf8 CONSTRAINT '
'"a constraint" NOT NULL',
)
self.assert_compile(
postgresql.CreateDomainType(
DOMAIN(
"foo",
Text,
collation="utf8",
default="foobar",
constraint_name="no_bar",
not_null=True,
check="VALUE != 'bar'",
)
),
"CREATE DOMAIN foo AS TEXT COLLATE utf8 DEFAULT 'foobar' "
"CONSTRAINT no_bar NOT NULL CHECK (VALUE != 'bar')",
)
def test_cast_domain_schema(self):
"""test #6739"""
d1 = DOMAIN("somename", Integer)
d2 = DOMAIN("somename", Integer, schema="someschema")
stmt = select(cast(column("foo"), d1), cast(column("bar"), d2))
self.assert_compile(
stmt,
"SELECT CAST(foo AS somename) AS foo, "
"CAST(bar AS someschema.somename) AS bar",
)
def test_create_domain_schema_translate(self):
d1 = DOMAIN("somename", Integer)
d2 = DOMAIN("somename", Integer, schema="someschema")
schema_translate_map = {None: "foo", "someschema": "bar"}
self.assert_compile(
postgresql.CreateDomainType(d1),
"CREATE DOMAIN foo.somename AS INTEGER ",
schema_translate_map=schema_translate_map,
render_schema_translate=True,
)
self.assert_compile(
postgresql.CreateDomainType(d2),
"CREATE DOMAIN bar.somename AS INTEGER ",
schema_translate_map=schema_translate_map,
render_schema_translate=True,
)
def test_create_table_with_schema_type_schema_translate(self):
e1 = Enum("x", "y", "z", name="somename")
e2 = Enum("x", "y", "z", name="somename", schema="someschema")
schema_translate_map = {None: "foo", "someschema": "bar"}
table = Table(
"some_table", MetaData(), Column("q", e1), Column("p", e2)
)
from sqlalchemy.schema import CreateTable
self.assert_compile(
CreateTable(table),
"CREATE TABLE foo.some_table (q foo.somename, p bar.somename)",
schema_translate_map=schema_translate_map,
render_schema_translate=True,
)
def test_create_table_array_embedded_schema_type_schema_translate(self):
"""test #6739"""
e1 = Enum("x", "y", "z", name="somename")
e2 = Enum("x", "y", "z", name="somename", schema="someschema")
schema_translate_map = {None: "foo", "someschema": "bar"}
table = Table(
"some_table",
MetaData(),
Column("q", PG_ARRAY(e1)),
Column("p", PG_ARRAY(e2)),
)
from sqlalchemy.schema import CreateTable
self.assert_compile(
CreateTable(table),
"CREATE TABLE foo.some_table (q foo.somename[], p bar.somename[])",
schema_translate_map=schema_translate_map,
render_schema_translate=True,
)
def test_create_table_with_tablespace(self):
m = MetaData()
tbl = Table(
"atable",
m,
Column("id", Integer),
postgresql_tablespace="sometablespace",
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE atable (id INTEGER) TABLESPACE sometablespace",
)
def test_create_table_with_tablespace_quoted(self):
# testing quoting of tablespace name
m = MetaData()
tbl = Table(
"anothertable",
m,
Column("id", Integer),
postgresql_tablespace="table",
)
self.assert_compile(
schema.CreateTable(tbl),
'CREATE TABLE anothertable (id INTEGER) TABLESPACE "table"',
)
def test_create_table_inherits(self):
m = MetaData()
tbl = Table(
"atable", m, Column("id", Integer), postgresql_inherits="i1"
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE atable (id INTEGER) INHERITS ( i1 )",
)
def test_create_table_inherits_tuple(self):
m = MetaData()
tbl = Table(
"atable",
m,
Column("id", Integer),
postgresql_inherits=("i1", "i2"),
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE atable (id INTEGER) INHERITS ( i1, i2 )",
)
def test_create_table_inherits_quoting(self):
m = MetaData()
tbl = Table(
"atable",
m,
Column("id", Integer),
postgresql_inherits=("Quote Me", "quote Me Too"),
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE atable (id INTEGER) INHERITS "
'( "Quote Me", "quote Me Too" )',
)
def test_create_table_partition_by_list(self):
m = MetaData()
tbl = Table(
"atable",
m,
Column("id", Integer),
Column("part_column", Integer),
postgresql_partition_by="LIST (part_column)",
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE atable (id INTEGER, part_column INTEGER) "
"PARTITION BY LIST (part_column)",
)
def test_create_table_partition_by_range(self):
m = MetaData()
tbl = Table(
"atable",
m,
Column("id", Integer),
Column("part_column", Integer),
postgresql_partition_by="RANGE (part_column)",
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE atable (id INTEGER, part_column INTEGER) "
"PARTITION BY RANGE (part_column)",
)
def test_create_table_with_oids(self):
m = MetaData()
tbl = Table(
"atable", m, Column("id", Integer), postgresql_with_oids=True
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE atable (id INTEGER) WITH OIDS",
)
tbl2 = Table(
"anothertable",
m,
Column("id", Integer),
postgresql_with_oids=False,
)
self.assert_compile(
schema.CreateTable(tbl2),
"CREATE TABLE anothertable (id INTEGER) WITHOUT OIDS",
)
def test_create_table_with_storage_parameters(self):
m = MetaData()
tbl = Table("atable1", m, postgresql_with={"fillfactor": 100})
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE atable1 () WITH (fillfactor = 100)",
)
tbl2 = Table(
"atable2",
m,
postgresql_with={"toast.autovacuum_insert_scale_factor": 1.25},
)
self.assert_compile(
schema.CreateTable(tbl2),
"CREATE TABLE atable2 () "
"WITH (toast.autovacuum_insert_scale_factor = 1.25)",
)
tbl3 = Table(
"atable3",
m,
postgresql_with={
"user_catalog_table": False,
"parallel_workers": 15,
},
)
self.assert_compile(
schema.CreateTable(tbl3),
"CREATE TABLE atable3 () "
"WITH (user_catalog_table = False, parallel_workers = 15)",
)
def test_create_table_with_oncommit_option(self):
m = MetaData()
tbl = Table(
"atable", m, Column("id", Integer), postgresql_on_commit="drop"
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE atable (id INTEGER) ON COMMIT DROP",
)
def test_create_table_with_using_option(self):
m = MetaData()
tbl = Table(
"atable",
m,
Column("id", Integer),
postgresql_using="heap",
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE atable (id INTEGER) USING heap",
)
def test_create_table_with_multiple_options(self):
m = MetaData()
tbl = Table(
"atable",
m,
Column("id", Integer),
postgresql_tablespace="sometablespace",
postgresql_with_oids=False,
postgresql_on_commit="preserve_rows",
postgresql_using="heap",
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE atable (id INTEGER) USING heap WITHOUT OIDS "
"ON COMMIT PRESERVE ROWS TABLESPACE sometablespace",
)
def test_create_partial_index(self):
m = MetaData()
tbl = Table("testtbl", m, Column("data", Integer))
idx = Index(
"test_idx1",
tbl.c.data,
postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10),
)
idx = Index(
"test_idx1",
tbl.c.data,
postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10),
)
# test quoting and all that
idx2 = Index(
"test_idx2",
tbl.c.data,
postgresql_where=and_(tbl.c.data > "a", tbl.c.data < "b's"),
)
self.assert_compile(
schema.CreateIndex(idx),
"CREATE INDEX test_idx1 ON testtbl (data) "
"WHERE data > 5 AND data < 10",
dialect=postgresql.dialect(),
)
self.assert_compile(
schema.CreateIndex(idx2),
"CREATE INDEX test_idx2 ON testtbl (data) "
"WHERE data > 'a' AND data < 'b''s'",
dialect=postgresql.dialect(),
)
idx3 = Index(
"test_idx2",
tbl.c.data,
postgresql_where=text("data > 'a' AND data < 'b''s'"),
)
self.assert_compile(
schema.CreateIndex(idx3),
"CREATE INDEX test_idx2 ON testtbl (data) "
"WHERE data > 'a' AND data < 'b''s'",
dialect=postgresql.dialect(),
)
def test_create_index_with_ops(self):
m = MetaData()
tbl = Table(
"testtbl",
m,
Column("data", String),
Column("data2", Integer, key="d2"),
)
idx = Index(
"test_idx1",
tbl.c.data,
postgresql_ops={"data": "text_pattern_ops"},
)
idx2 = Index(
"test_idx2",
tbl.c.data,
tbl.c.d2,
postgresql_ops={"data": "text_pattern_ops", "d2": "int4_ops"},
)
self.assert_compile(
schema.CreateIndex(idx),
"CREATE INDEX test_idx1 ON testtbl (data text_pattern_ops)",
dialect=postgresql.dialect(),
)
self.assert_compile(
schema.CreateIndex(idx2),
"CREATE INDEX test_idx2 ON testtbl "
"(data text_pattern_ops, data2 int4_ops)",
dialect=postgresql.dialect(),
)
@testing.combinations(
(
lambda tbl: schema.CreateIndex(
Index(
"test_idx1",
tbl.c.data,
unique=True,
postgresql_nulls_not_distinct=True,
)
),
(
"CREATE UNIQUE INDEX test_idx1 ON test_tbl "
"(data) NULLS NOT DISTINCT"
),
),
(
lambda tbl: schema.CreateIndex(
Index(
"test_idx2",
tbl.c.data2,
unique=True,
postgresql_nulls_not_distinct=False,
)
),
"CREATE UNIQUE INDEX test_idx2 ON test_tbl (data2) NULLS DISTINCT",
),
(
lambda tbl: schema.CreateIndex(
Index(
"test_idx3",
tbl.c.data3,
unique=True,
)
),
"CREATE UNIQUE INDEX test_idx3 ON test_tbl (data3)",
),
(
lambda tbl: schema.CreateIndex(
Index(
"test_idx3_complex",
tbl.c.data3,
postgresql_nulls_not_distinct=True,
postgresql_include=["data2"],
postgresql_where=and_(tbl.c.data3 > 5),
postgresql_with={"fillfactor": 50},
)
),
(
"CREATE INDEX test_idx3_complex ON test_tbl "
"(data3) INCLUDE (data2) NULLS NOT DISTINCT WITH "
"(fillfactor = 50) WHERE data3 > 5"
),
),
(
lambda tbl: schema.AddConstraint(
schema.UniqueConstraint(
tbl.c.data,
name="uq_data1",
postgresql_nulls_not_distinct=True,
),
),
(
"ALTER TABLE test_tbl ADD CONSTRAINT uq_data1 UNIQUE "
"NULLS NOT DISTINCT (data)"
),
),
(
lambda tbl: schema.AddConstraint(
schema.UniqueConstraint(
tbl.c.data2,
name="uq_data2",
postgresql_nulls_not_distinct=False,
),
),
(
"ALTER TABLE test_tbl ADD CONSTRAINT uq_data2 UNIQUE "
"NULLS DISTINCT (data2)"
),
),
(
lambda tbl: schema.AddConstraint(
schema.UniqueConstraint(tbl.c.data3, name="uq_data3"),
),
"ALTER TABLE test_tbl ADD CONSTRAINT uq_data3 UNIQUE (data3)",
),
)
def test_nulls_not_distinct(self, expr_fn, expected):
dd = PGDialect()
m = MetaData()
tbl = Table(
"test_tbl",
m,
Column("data", String),
Column("data2", Integer),
Column("data3", Integer),
)
expr = testing.resolve_lambda(expr_fn, tbl=tbl)
self.assert_compile(expr, expected, dialect=dd)
@testing.combinations(
(
lambda tbl: schema.AddConstraint(
UniqueConstraint(tbl.c.id, postgresql_include=[tbl.c.value]),
),
"ALTER TABLE foo ADD UNIQUE (id) INCLUDE (value)",
),
(
lambda tbl: schema.AddConstraint(
PrimaryKeyConstraint(
tbl.c.id, postgresql_include=[tbl.c.value, "misc"]
),
),
"ALTER TABLE foo ADD PRIMARY KEY (id) INCLUDE (value, misc)",
),
(
lambda tbl: schema.CreateIndex(
Index("idx", tbl.c.id, postgresql_include=[tbl.c.value])
),
"CREATE INDEX idx ON foo (id) INCLUDE (value)",
),
)
def test_include(self, expr_fn, expected):
m = MetaData()
tbl = Table(
"foo",
m,
Column("id", Integer, nullable=False),
Column("value", Integer, nullable=False),
Column("misc", String),
)
expr = testing.resolve_lambda(expr_fn, tbl=tbl)
self.assert_compile(expr, expected)
def test_create_index_with_labeled_ops(self):
m = MetaData()
tbl = Table(
"testtbl",
m,
Column("data", String),
Column("data2", Integer, key="d2"),
)
idx = Index(
"test_idx1",
func.lower(tbl.c.data).label("data_lower"),
postgresql_ops={"data_lower": "text_pattern_ops"},
)
idx2 = Index(
"test_idx2",
(func.xyz(tbl.c.data) + tbl.c.d2).label("bar"),
tbl.c.d2.label("foo"),
postgresql_ops={"bar": "text_pattern_ops", "foo": "int4_ops"},
)
self.assert_compile(
schema.CreateIndex(idx),
"CREATE INDEX test_idx1 ON testtbl (lower(data) text_pattern_ops)",
dialect=postgresql.dialect(),
)
self.assert_compile(
schema.CreateIndex(idx2),
"CREATE INDEX test_idx2 ON testtbl "
"((xyz(data) + data2) text_pattern_ops, "
"data2 int4_ops)",
dialect=postgresql.dialect(),
)
def test_create_index_with_text_or_composite(self):
m = MetaData()
tbl = Table("testtbl", m, Column("d1", String), Column("d2", Integer))
idx = Index("test_idx1", text("x"))
tbl.append_constraint(idx)
idx2 = Index("test_idx2", text("y"), tbl.c.d2)
idx3 = Index(
"test_idx2",
tbl.c.d1,
text("y"),
tbl.c.d2,
postgresql_ops={"d1": "x1", "d2": "x2"},
)
idx4 = Index(
"test_idx2",
tbl.c.d1,
tbl.c.d2 > 5,
text("q"),
postgresql_ops={"d1": "x1", "d2": "x2"},
)
idx5 = Index(
"test_idx2",
tbl.c.d1,
(tbl.c.d2 > 5).label("g"),
text("q"),
postgresql_ops={"d1": "x1", "g": "x2"},
)
self.assert_compile(
schema.CreateIndex(idx), "CREATE INDEX test_idx1 ON testtbl (x)"
)
self.assert_compile(
schema.CreateIndex(idx2),
"CREATE INDEX test_idx2 ON testtbl (y, d2)",
)
self.assert_compile(
schema.CreateIndex(idx3),
"CREATE INDEX test_idx2 ON testtbl (d1 x1, y, d2 x2)",
)
# note that at the moment we do not expect the 'd2' op to
# pick up on the "d2 > 5" expression
self.assert_compile(
schema.CreateIndex(idx4),
"CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5), q)",
)
# however it does work if we label!
self.assert_compile(
schema.CreateIndex(idx5),
"CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5) x2, q)",
)
def test_create_index_with_using(self):
m = MetaData()
tbl = Table("testtbl", m, Column("data", String))
idx1 = Index("test_idx1", tbl.c.data)
idx2 = Index("test_idx2", tbl.c.data, postgresql_using="btree")
idx3 = Index("test_idx3", tbl.c.data, postgresql_using="hash")
self.assert_compile(
schema.CreateIndex(idx1),
"CREATE INDEX test_idx1 ON testtbl (data)",
dialect=postgresql.dialect(),
)
self.assert_compile(
schema.CreateIndex(idx2),
"CREATE INDEX test_idx2 ON testtbl USING btree (data)",
dialect=postgresql.dialect(),
)
self.assert_compile(
schema.CreateIndex(idx3),
"CREATE INDEX test_idx3 ON testtbl USING hash (data)",
dialect=postgresql.dialect(),
)
def test_create_index_with_with(self):
m = MetaData()
tbl = Table("testtbl", m, Column("data", String))
idx1 = Index("test_idx1", tbl.c.data)
idx2 = Index(
"test_idx2", tbl.c.data, postgresql_with={"fillfactor": 50}
)
idx3 = Index(
"test_idx3",
tbl.c.data,
postgresql_using="gist",
postgresql_with={"buffering": "off"},
)
self.assert_compile(
schema.CreateIndex(idx1),
"CREATE INDEX test_idx1 ON testtbl (data)",
)
self.assert_compile(
schema.CreateIndex(idx2),
"CREATE INDEX test_idx2 ON testtbl (data) WITH (fillfactor = 50)",
)
self.assert_compile(
schema.CreateIndex(idx3),
"CREATE INDEX test_idx3 ON testtbl "
"USING gist (data) "
"WITH (buffering = off)",
)
def test_create_index_with_using_unusual_conditions(self):
m = MetaData()
tbl = Table("testtbl", m, Column("data", String))
self.assert_compile(
schema.CreateIndex(
Index("test_idx1", tbl.c.data, postgresql_using="GIST")
),
"CREATE INDEX test_idx1 ON testtbl USING gist (data)",
)
self.assert_compile(
schema.CreateIndex(
Index(
"test_idx1",
tbl.c.data,
postgresql_using="some_custom_method",
)
),
"CREATE INDEX test_idx1 ON testtbl "
"USING some_custom_method (data)",
)
assert_raises_message(
exc.CompileError,
"Unexpected SQL phrase: 'gin invalid sql'",
schema.CreateIndex(
Index(
"test_idx2", tbl.c.data, postgresql_using="gin invalid sql"
)
).compile,
dialect=postgresql.dialect(),
)
def test_create_index_with_tablespace(self):
m = MetaData()
tbl = Table("testtbl", m, Column("data", String))
idx1 = Index("test_idx1", tbl.c.data)
idx2 = Index(
"test_idx2", tbl.c.data, postgresql_tablespace="sometablespace"
)
idx3 = Index(
"test_idx3",
tbl.c.data,
postgresql_tablespace="another table space",
)
self.assert_compile(
schema.CreateIndex(idx1),
"CREATE INDEX test_idx1 ON testtbl (data)",
dialect=postgresql.dialect(),
)
self.assert_compile(
schema.CreateIndex(idx2),
"CREATE INDEX test_idx2 ON testtbl "
"(data) "
"TABLESPACE sometablespace",
dialect=postgresql.dialect(),
)
self.assert_compile(
schema.CreateIndex(idx3),
"CREATE INDEX test_idx3 ON testtbl "
"(data) "
'TABLESPACE "another table space"',
dialect=postgresql.dialect(),
)
def test_create_index_with_multiple_options(self):
m = MetaData()
tbl = Table("testtbl", m, Column("data", String))
idx1 = Index(
"test_idx1",
tbl.c.data,
postgresql_using="btree",
postgresql_tablespace="atablespace",
postgresql_with={"fillfactor": 60},
postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10),
)
self.assert_compile(
schema.CreateIndex(idx1),
"CREATE INDEX test_idx1 ON testtbl "
"USING btree (data) "
"WITH (fillfactor = 60) "
"TABLESPACE atablespace "
"WHERE data > 5 AND data < 10",
dialect=postgresql.dialect(),
)
def test_create_index_expr_gets_parens(self):
m = MetaData()
tbl = Table("testtbl", m, Column("x", Integer), Column("y", Integer))
idx1 = Index("test_idx1", 5 // (tbl.c.x + tbl.c.y))
self.assert_compile(
schema.CreateIndex(idx1),
"CREATE INDEX test_idx1 ON testtbl ((5 / (x + y)))",
)
def test_create_index_literals(self):
m = MetaData()
tbl = Table("testtbl", m, Column("data", Integer))
idx1 = Index("test_idx1", tbl.c.data + 5)
self.assert_compile(
schema.CreateIndex(idx1),
"CREATE INDEX test_idx1 ON testtbl ((data + 5))",
)
def test_create_index_concurrently(self):
m = MetaData()
tbl = Table("testtbl", m, Column("data", Integer))
idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True)
self.assert_compile(
schema.CreateIndex(idx1),
"CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)",
)
dialect_8_1 = postgresql.dialect()
dialect_8_1._supports_create_index_concurrently = False
self.assert_compile(
schema.CreateIndex(idx1),
"CREATE INDEX test_idx1 ON testtbl (data)",
dialect=dialect_8_1,
)
def test_drop_index_concurrently(self):
m = MetaData()
tbl = Table("testtbl", m, Column("data", Integer))
idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True)
self.assert_compile(
schema.DropIndex(idx1), "DROP INDEX CONCURRENTLY test_idx1"
)
dialect_9_1 = postgresql.dialect()
dialect_9_1._supports_drop_index_concurrently = False
self.assert_compile(
schema.DropIndex(idx1), "DROP INDEX test_idx1", dialect=dialect_9_1
)
def test_create_check_constraint_not_valid(self):
m = MetaData()
tbl = Table(
"testtbl",
m,
Column("data", Integer),
CheckConstraint("data = 0", postgresql_not_valid=True),
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE testtbl (data INTEGER, CHECK (data = 0) NOT VALID)",
)
def test_create_foreign_key_constraint_not_valid(self):
m = MetaData()
tbl = Table(
"testtbl",
m,
Column("a", Integer),
Column("b", Integer),
ForeignKeyConstraint(
"b", ["testtbl.a"], postgresql_not_valid=True
),
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE testtbl ("
"a INTEGER, "
"b INTEGER, "
"FOREIGN KEY(b) REFERENCES testtbl (a) NOT VALID"
")",
)
def test_create_foreign_key_column_not_valid(self):
m = MetaData()
tbl = Table(
"testtbl",
m,
Column("a", Integer),
Column("b", ForeignKey("testtbl.a", postgresql_not_valid=True)),
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE testtbl ("
"a INTEGER, "
"b INTEGER, "
"FOREIGN KEY(b) REFERENCES testtbl (a) NOT VALID"
")",
)
def test_create_foreign_key_constraint_ondelete_column_list(self):
m = MetaData()
pktable = Table(
"pktable",
m,
Column("tid", Integer, primary_key=True),
Column("id", Integer, primary_key=True),
)
fktable = Table(
"fktable",
m,
Column("tid", Integer),
Column("id", Integer),
Column("fk_id_del_set_null", Integer),
Column("fk_id_del_set_default", Integer, server_default=text("0")),
ForeignKeyConstraint(
columns=["tid", "fk_id_del_set_null"],
refcolumns=[pktable.c.tid, pktable.c.id],
ondelete="SET NULL (fk_id_del_set_null)",
),
ForeignKeyConstraint(
columns=["tid", "fk_id_del_set_default"],
refcolumns=[pktable.c.tid, pktable.c.id],
ondelete="SET DEFAULT(fk_id_del_set_default)",
),
)
self.assert_compile(
schema.CreateTable(fktable),
"CREATE TABLE fktable ("
"tid INTEGER, id INTEGER, "
"fk_id_del_set_null INTEGER, "
"fk_id_del_set_default INTEGER DEFAULT 0, "
"FOREIGN KEY(tid, fk_id_del_set_null)"
" REFERENCES pktable (tid, id)"
" ON DELETE SET NULL (fk_id_del_set_null), "
"FOREIGN KEY(tid, fk_id_del_set_default)"
" REFERENCES pktable (tid, id)"
" ON DELETE SET DEFAULT(fk_id_del_set_default)"
")",
)
def test_exclude_constraint_min(self):
m = MetaData()
tbl = Table("testtbl", m, Column("room", Integer, primary_key=True))
cons = ExcludeConstraint(("room", "="))
tbl.append_constraint(cons)
self.assert_compile(
schema.AddConstraint(cons),
"ALTER TABLE testtbl ADD EXCLUDE USING gist (room WITH =)",
dialect=postgresql.dialect(),
)
@testing.combinations(
(True, "deferred"),
(False, "immediate"),
argnames="deferrable_value, initially_value",
)
def test_copy_exclude_constraint_adhoc_columns(
self, deferrable_value, initially_value
):
meta = MetaData()
table = Table(
"mytable",
meta,
Column("myid", Integer, Sequence("foo_id_seq"), primary_key=True),
Column("valid_from_date", Date(), nullable=True),
Column("valid_thru_date", Date(), nullable=True),
)
sql_text = "daterange(valid_from_date, valid_thru_date, '[]')"
cons = ExcludeConstraint(
(literal_column(sql_text), "&&"),
where=column("valid_from_date") <= column("valid_thru_date"),
name="ex_mytable_valid_date_range",
deferrable=deferrable_value,
initially=initially_value,
)
table.append_constraint(cons)
eq_(cons.columns.keys(), [sql_text])
expected = (
"ALTER TABLE mytable ADD CONSTRAINT ex_mytable_valid_date_range "
"EXCLUDE USING gist "
"(daterange(valid_from_date, valid_thru_date, '[]') WITH &&) "
"WHERE (valid_from_date <= valid_thru_date) "
"%s %s"
% (
"NOT DEFERRABLE" if not deferrable_value else "DEFERRABLE",
"INITIALLY %s" % initially_value,
)
)
self.assert_compile(
schema.AddConstraint(cons),
expected,
dialect=postgresql.dialect(),
)
meta2 = MetaData()
table2 = table.to_metadata(meta2)
cons2 = [
c for c in table2.constraints if isinstance(c, ExcludeConstraint)
][0]
self.assert_compile(
schema.AddConstraint(cons2),
expected,
dialect=postgresql.dialect(),
)
def test_exclude_constraint_full(self):
m = MetaData()
room = Column("room", Integer, primary_key=True)
tbl = Table("testtbl", m, room, Column("during", TSRANGE))
room = Column("room", Integer, primary_key=True)
cons = ExcludeConstraint(
(room, "="),
("during", "&&"),
name="my_name",
using="gist",
where="room > 100",
deferrable=True,
initially="immediate",
ops={"room": "my_opclass"},
)
tbl.append_constraint(cons)
self.assert_compile(
schema.AddConstraint(cons),
"ALTER TABLE testtbl ADD CONSTRAINT my_name "
"EXCLUDE USING gist "
"(room my_opclass WITH =, during WITH "
"&&) WHERE "
"(room > 100) DEFERRABLE INITIALLY immediate",
dialect=postgresql.dialect(),
)
def test_exclude_constraint_copy(self):
m = MetaData()
cons = ExcludeConstraint(("room", "="))
tbl = Table(
"testtbl", m, Column("room", Integer, primary_key=True), cons
)
# apparently you can't copy a ColumnCollectionConstraint until
# after it has been bound to a table...
cons_copy = cons._copy()
tbl.append_constraint(cons_copy)
self.assert_compile(
schema.AddConstraint(cons_copy),
"ALTER TABLE testtbl ADD EXCLUDE USING gist (room WITH =)",
)
def test_exclude_constraint_copy_complex(self):
m = MetaData()
tbl = Table("foo", m, Column("x", Integer), Column("y", Integer))
cons = ExcludeConstraint(
(tbl.c.x, "*"),
(text("x-y"), "%"),
(literal_column("x+y"), "$"),
(tbl.c.x // tbl.c.y, "??"),
(func.power(tbl.c.x, 42), "="),
(func.int8range(column("x"), column("y")), "&&"),
("y", "^"),
)
tbl.append_constraint(cons)
expected = (
"ALTER TABLE {name} ADD EXCLUDE USING gist "
"(x WITH *, x-y WITH %, x+y WITH $, x / y WITH ??, "
"power(x, 42) WITH =, int8range(x, y) WITH &&, y WITH ^)"
)
self.assert_compile(
schema.AddConstraint(cons),
expected.format(name="foo"),
dialect=postgresql.dialect(),
)
m2 = MetaData()
tbl2 = tbl.to_metadata(m2, name="bar")
(cons2,) = [
c for c in tbl2.constraints if isinstance(c, ExcludeConstraint)
]
self.assert_compile(
schema.AddConstraint(cons2),
expected.format(name="bar"),
dialect=postgresql.dialect(),
)
def test_exclude_constraint_copy_where_using(self):
m = MetaData()
tbl = Table("testtbl", m, Column("room", Integer, primary_key=True))
cons = ExcludeConstraint(
(tbl.c.room, "="), where=tbl.c.room > 5, using="foobar"
)
tbl.append_constraint(cons)
self.assert_compile(
schema.AddConstraint(cons),
"ALTER TABLE testtbl ADD EXCLUDE USING foobar "
"(room WITH =) WHERE (testtbl.room > 5)",
)
m2 = MetaData()
tbl2 = tbl.to_metadata(m2)
self.assert_compile(
schema.CreateTable(tbl2),
"CREATE TABLE testtbl (room SERIAL NOT NULL, "
"PRIMARY KEY (room), "
"EXCLUDE USING foobar "
"(room WITH =) WHERE (testtbl.room > 5))",
)
def test_exclude_constraint_text(self):
m = MetaData()
cons = ExcludeConstraint((text("room::TEXT"), "="))
Table("testtbl", m, Column("room", String), cons)
eq_(list(cons.columns), [])
self.assert_compile(
schema.AddConstraint(cons),
"ALTER TABLE testtbl ADD EXCLUDE USING gist (room::TEXT WITH =)",
)
def test_exclude_constraint_colname_needs_quoting(self):
m = MetaData()
cons = ExcludeConstraint(("Some Column Name", "="))
Table("testtbl", m, Column("Some Column Name", String), cons)
self.assert_compile(
schema.AddConstraint(cons),
"ALTER TABLE testtbl ADD EXCLUDE USING gist "
'("Some Column Name" WITH =)',
)
def test_exclude_constraint_with_using_unusual_conditions(self):
m = MetaData()
cons = ExcludeConstraint(("q", "="), using="not a keyword")
Table("testtbl", m, Column("q", String), cons)
assert_raises_message(
exc.CompileError,
"Unexpected SQL phrase: 'not a keyword'",
schema.AddConstraint(cons).compile,
dialect=postgresql.dialect(),
)
def test_exclude_constraint_cast(self):
m = MetaData()
tbl = Table("testtbl", m, Column("room", String))
cons = ExcludeConstraint((cast(tbl.c.room, Text), "="))
tbl.append_constraint(cons)
self.assert_compile(
schema.AddConstraint(cons),
"ALTER TABLE testtbl ADD EXCLUDE USING gist "
"(CAST(room AS TEXT) WITH =)",
)
def test_exclude_constraint_cast_quote(self):
m = MetaData()
tbl = Table("testtbl", m, Column("Room", String))
cons = ExcludeConstraint((cast(tbl.c.Room, Text), "="))
tbl.append_constraint(cons)
self.assert_compile(
schema.AddConstraint(cons),
"ALTER TABLE testtbl ADD EXCLUDE USING gist "
'(CAST("Room" AS TEXT) WITH =)',
)
def test_exclude_constraint_when(self):
m = MetaData()
tbl = Table("testtbl", m, Column("room", String))
cons = ExcludeConstraint(("room", "="), where=tbl.c.room.in_(["12"]))
tbl.append_constraint(cons)
self.assert_compile(
schema.AddConstraint(cons),
"ALTER TABLE testtbl ADD EXCLUDE USING gist "
"(room WITH =) WHERE (testtbl.room IN ('12'))",
dialect=postgresql.dialect(),
)
def test_exclude_constraint_ops_many(self):
m = MetaData()
tbl = Table(
"testtbl", m, Column("room", String), Column("during", TSRANGE)
)
cons = ExcludeConstraint(
("room", "="),
("during", "&&"),
ops={"room": "first_opsclass", "during": "second_opclass"},
)
tbl.append_constraint(cons)
self.assert_compile(
schema.AddConstraint(cons),
"ALTER TABLE testtbl ADD EXCLUDE USING gist "
"(room first_opsclass WITH =, during second_opclass WITH &&)",
dialect=postgresql.dialect(),
)
def test_exclude_constraint_expression(self):
m = MetaData()
tbl = Table("foo", m, Column("x", Integer), Column("y", Integer))
cons = ExcludeConstraint((func.int8range(column("x"), tbl.c.y), "&&"))
tbl.append_constraint(cons)
# only the first col is considered. see #9233
eq_(cons.columns.keys(), ["x"])
self.assert_compile(
schema.AddConstraint(cons),
"ALTER TABLE foo ADD EXCLUDE USING gist (int8range(x, y) WITH &&)",
dialect=postgresql.dialect(),
)
def test_exclude_constraint_literal_binds(self):
m = MetaData()
tbl = Table("foo", m, Column("x", Integer), Column("y", Integer))
cons = ExcludeConstraint(
(func.power(tbl.c.x, 42), "="),
(func.int8range(column("x"), "y"), "&&"),
)
tbl.append_constraint(cons)
self.assert_compile(
schema.AddConstraint(cons),
"ALTER TABLE foo ADD EXCLUDE USING gist "
"(power(x, 42) WITH =, int8range(x, 'y') WITH &&)",
dialect=postgresql.dialect(),
)
def test_substring(self):
self.assert_compile(
func.substring("abc", 1, 2),
"SUBSTRING(%(substring_1)s::VARCHAR FROM %(substring_2)s::INTEGER"
" FOR %(substring_3)s::INTEGER)",
)
self.assert_compile(
func.substring("abc", 1),
"SUBSTRING(%(substring_1)s::VARCHAR FROM"
" %(substring_2)s::INTEGER)",
)
def test_for_update(self):
table1 = table(
"mytable", column("myid"), column("name"), column("description")
)
self.assert_compile(
table1.select().where(table1.c.myid == 7).with_for_update(),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR UPDATE",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR UPDATE"
" NOWAIT",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(skip_locked=True),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR UPDATE SKIP"
" LOCKED",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(read=True),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR SHARE",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(read=True, nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR SHARE"
" NOWAIT",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(key_share=True, nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR NO KEY"
" UPDATE NOWAIT",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(key_share=True, read=True, nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR KEY SHARE"
" NOWAIT",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(read=True, skip_locked=True),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR SHARE SKIP"
" LOCKED",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(of=table1.c.myid),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR UPDATE OF"
" mytable",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(read=True, nowait=True, of=table1),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR SHARE OF"
" mytable NOWAIT",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(
key_share=True, read=True, nowait=True, of=table1
),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR KEY SHARE"
" OF mytable NOWAIT",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(read=True, nowait=True, of=table1.c.myid),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR SHARE OF"
" mytable NOWAIT",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(
read=True, nowait=True, of=[table1.c.myid, table1.c.name]
),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR SHARE OF"
" mytable NOWAIT",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(
read=True,
skip_locked=True,
of=[table1.c.myid, table1.c.name],
key_share=True,
),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR KEY SHARE"
" OF mytable SKIP LOCKED",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(
skip_locked=True, of=[table1.c.myid, table1.c.name]
),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR UPDATE OF"
" mytable SKIP LOCKED",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(
read=True, skip_locked=True, of=[table1.c.myid, table1.c.name]
),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR SHARE OF"
" mytable SKIP LOCKED",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(
key_share=True, nowait=True, of=[table1.c.myid, table1.c.name]
),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR NO KEY"
" UPDATE OF mytable NOWAIT",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(
key_share=True,
skip_locked=True,
of=[table1.c.myid, table1.c.name],
),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR NO KEY"
" UPDATE OF mytable SKIP LOCKED",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(
key_share=True, of=[table1.c.myid, table1.c.name]
),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR NO KEY"
" UPDATE OF mytable",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(key_share=True),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR NO KEY"
" UPDATE",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(read=True, key_share=True),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR KEY SHARE",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(read=True, key_share=True, of=table1),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR KEY SHARE"
" OF mytable",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(read=True, of=table1),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR SHARE OF"
" mytable",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(read=True, key_share=True, skip_locked=True),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR KEY SHARE"
" SKIP LOCKED",
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(key_share=True, skip_locked=True),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR NO KEY"
" UPDATE SKIP LOCKED",
)
ta = table1.alias()
self.assert_compile(
ta.select()
.where(ta.c.myid == 7)
.with_for_update(of=[ta.c.myid, ta.c.name]),
"SELECT mytable_1.myid, mytable_1.name, mytable_1.description FROM"
" mytable AS mytable_1 WHERE mytable_1.myid = %(myid_1)s::INTEGER"
" FOR UPDATE OF mytable_1",
)
table2 = table("table2", column("mytable_id"))
join = table2.join(table1, table2.c.mytable_id == table1.c.myid)
self.assert_compile(
join.select()
.where(table2.c.mytable_id == 7)
.with_for_update(of=[join]),
"SELECT table2.mytable_id, mytable.myid, mytable.name,"
" mytable.description FROM table2 JOIN mytable ON"
" table2.mytable_id = mytable.myid WHERE table2.mytable_id ="
" %(mytable_id_1)s::INTEGER FOR UPDATE OF mytable, table2",
)
join = table2.join(ta, table2.c.mytable_id == ta.c.myid)
self.assert_compile(
join.select()
.where(table2.c.mytable_id == 7)
.with_for_update(of=[join]),
"SELECT table2.mytable_id, mytable_1.myid, mytable_1.name,"
" mytable_1.description FROM table2 JOIN mytable AS mytable_1 ON"
" table2.mytable_id = mytable_1.myid WHERE table2.mytable_id ="
" %(mytable_id_1)s::INTEGER FOR UPDATE OF mytable_1, table2",
)
# ensure of=text() for of works
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(of=text("table1")),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR UPDATE OF"
" table1",
)
# ensure literal_column of works
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(of=literal_column("table1")),
"SELECT mytable.myid, mytable.name, mytable.description FROM"
" mytable WHERE mytable.myid = %(myid_1)s::INTEGER FOR UPDATE OF"
" table1",
)
# test issue #12417
subquery = select(table1.c.myid).with_for_update(of=table1).lateral()
statement = select(subquery.c.myid)
self.assert_compile(
statement,
"SELECT anon_1.myid FROM LATERAL (SELECT mytable.myid AS myid "
"FROM mytable FOR UPDATE OF mytable) AS anon_1",
)
def test_for_update_with_schema(self):
m = MetaData()
table1 = Table(
"mytable", m, Column("myid"), Column("name"), schema="testschema"
)
self.assert_compile(
table1.select()
.where(table1.c.myid == 7)
.with_for_update(of=table1),
"SELECT testschema.mytable.myid, testschema.mytable.name FROM"
" testschema.mytable WHERE testschema.mytable.myid ="
" %(myid_1)s::INTEGER FOR UPDATE OF mytable",
)
def test_reserved_words(self):
table = Table(
"pg_table",
MetaData(),
Column("col1", Integer),
Column("variadic", Integer),
)
x = select(table.c.col1, table.c.variadic)
self.assert_compile(
x, """SELECT pg_table.col1, pg_table."variadic" FROM pg_table"""
)
def _array_any_deprecation(self):
return testing.expect_deprecated(
r"The ARRAY.Comparator.any\(\) and "
r"ARRAY.Comparator.all\(\) methods "
r"for arrays are deprecated for removal, along with the "
r"PG-specific Any\(\) "
r"and All\(\) functions. See any_\(\) and all_\(\) functions for "
"modern use. "
)
def test_array(self):
c = Column("x", postgresql.ARRAY(Integer))
self.assert_compile(
cast(c, postgresql.ARRAY(Integer)), "CAST(x AS INTEGER[])"
)
self.assert_compile(
c[5], "x[%(x_1)s::INTEGER]", checkparams={"x_1": 5}
)
self.assert_compile(
c[5:7],
"x[%(x_1)s::INTEGER:%(x_2)s::INTEGER]",
checkparams={"x_2": 7, "x_1": 5},
)
self.assert_compile(
c[5:7][2:3],
"x[%(x_1)s::INTEGER:%(x_2)s::INTEGER]"
"[%(param_1)s::INTEGER:%(param_2)s::INTEGER]",
checkparams={"x_2": 7, "x_1": 5, "param_1": 2, "param_2": 3},
)
self.assert_compile(
c[5:7][3],
"x[%(x_1)s::INTEGER:%(x_2)s::INTEGER][%(param_1)s::INTEGER]",
checkparams={"x_2": 7, "x_1": 5, "param_1": 3},
)
self.assert_compile(
c.contains([1]),
"x @> %(x_1)s::INTEGER[]",
checkparams={"x_1": [1]},
dialect=PGDialect_psycopg2(),
)
self.assert_compile(
c.contained_by([2]),
"x <@ %(x_1)s::INTEGER[]",
checkparams={"x_1": [2]},
dialect=PGDialect_psycopg2(),
)
self.assert_compile(
c.contained_by([2]),
"x <@ %(x_1)s",
checkparams={"x_1": [2]},
dialect=PGDialect(),
)
self.assert_compile(
c.overlap([3]),
"x && %(x_1)s::INTEGER[]",
checkparams={"x_1": [3]},
dialect=PGDialect_psycopg2(),
)
def test_array_modern_any_all(self):
c = Column("x", postgresql.ARRAY(Integer))
self.assert_compile(
4 == c.any_(),
"%(param_1)s::INTEGER = ANY (x)",
checkparams={"param_1": 4},
)
self.assert_compile(
5 == any_(c),
"%(param_1)s::INTEGER = ANY (x)",
checkparams={"param_1": 5},
)
self.assert_compile(
~(c.any_() == 5),
"NOT (%(param_1)s::INTEGER = ANY (x))",
checkparams={"param_1": 5},
)
self.assert_compile(
~(5 == c.any_()),
"NOT (%(param_1)s::INTEGER = ANY (x))",
checkparams={"param_1": 5},
)
self.assert_compile(
5 != any_(c),
"%(param_1)s::INTEGER != ANY (x)",
checkparams={"param_1": 5},
)
self.assert_compile(
6 > all_(c),
"%(param_1)s::INTEGER > ALL (x)",
checkparams={"param_1": 6},
)
self.assert_compile(
7 < all_(c),
"%(param_1)s::INTEGER < ALL (x)",
checkparams={"param_1": 7},
)
self.assert_compile(
c.all_() == 5,
"%(param_1)s::INTEGER = ALL (x)",
checkparams={"param_1": 5},
)
self.assert_compile(
5 == c.all_(),
"%(param_1)s::INTEGER = ALL (x)",
checkparams={"param_1": 5},
)
self.assert_compile(
~(5 == all_(c)),
"NOT (%(param_1)s::INTEGER = ALL (x))",
checkparams={"param_1": 5},
)
self.assert_compile(
~(all_(c) == 5),
"NOT (%(param_1)s::INTEGER = ALL (x))",
checkparams={"param_1": 5},
)
def test_array_deprecated_any_all(self):
c = Column("x", postgresql.ARRAY(Integer))
with self._array_any_deprecation():
self.assert_compile(
postgresql.Any(4, c),
"%(x_1)s::INTEGER = ANY (x)",
checkparams={"x_1": 4},
)
with self._array_any_deprecation():
self.assert_compile(
c.any(5), "%(x_1)s::INTEGER = ANY (x)", checkparams={"x_1": 5}
)
with self._array_any_deprecation():
self.assert_compile(
~c.any(5),
"NOT (%(x_1)s::INTEGER = ANY (x))",
checkparams={"x_1": 5},
)
with self._array_any_deprecation():
self.assert_compile(
c.any(5, operator=operators.ne),
"%(x_1)s::INTEGER != ANY (x)",
checkparams={"x_1": 5},
)
with self._array_any_deprecation():
self.assert_compile(
postgresql.All(6, c, operator=operators.gt),
"%(x_1)s::INTEGER > ALL (x)",
checkparams={"x_1": 6},
)
with self._array_any_deprecation():
self.assert_compile(
c.all(7, operator=operators.lt),
"%(x_1)s::INTEGER < ALL (x)",
checkparams={"x_1": 7},
)
with self._array_any_deprecation():
self.assert_compile(
c.all(5), "%(x_1)s::INTEGER = ALL (x)", checkparams={"x_1": 5}
)
with self._array_any_deprecation():
self.assert_compile(
~c.all(5),
"NOT (%(x_1)s::INTEGER = ALL (x))",
checkparams={"x_1": 5},
)
@testing.combinations(
(lambda c: c.overlap, "&&"),
(lambda c: c.contains, "@>"),
(lambda c: c.contained_by, "<@"),
)
def test_overlap_no_cartesian(self, op_fn, expected_op):
"""test #6886"""
t1 = table(
"t1",
column("id", Integer),
column("ancestor_ids", postgresql.ARRAY(Integer)),
)
t1a = t1.alias()
t1b = t1.alias()
stmt = (
select(t1, t1a, t1b)
.where(op_fn(t1a.c.ancestor_ids)(postgresql.array((t1.c.id,))))
.where(op_fn(t1b.c.ancestor_ids)(postgresql.array((t1.c.id,))))
)
self.assert_compile(
stmt,
"SELECT t1.id, t1.ancestor_ids, t1_1.id AS id_1, "
"t1_1.ancestor_ids AS ancestor_ids_1, t1_2.id AS id_2, "
"t1_2.ancestor_ids AS ancestor_ids_2 "
"FROM t1, t1 AS t1_1, t1 AS t1_2 "
"WHERE t1_1.ancestor_ids %(op)s ARRAY[t1.id] "
"AND t1_2.ancestor_ids %(op)s ARRAY[t1.id]" % {"op": expected_op},
from_linting=True,
)
@testing.combinations((True,), (False,))
def test_array_zero_indexes(self, zero_indexes):
c = Column("x", postgresql.ARRAY(Integer, zero_indexes=zero_indexes))
add_one = 1 if zero_indexes else 0
self.assert_compile(
cast(c, postgresql.ARRAY(Integer, zero_indexes=zero_indexes)),
"CAST(x AS INTEGER[])",
)
self.assert_compile(
c[5], "x[%(x_1)s::INTEGER]", checkparams={"x_1": 5 + add_one}
)
self.assert_compile(
c[5:7],
"x[%(x_1)s::INTEGER:%(x_2)s::INTEGER]",
checkparams={"x_2": 7 + add_one, "x_1": 5 + add_one},
)
self.assert_compile(
c[5:7][2:3],
"x[%(x_1)s::INTEGER:%(x_2)s::INTEGER]"
"[%(param_1)s::INTEGER:%(param_2)s::INTEGER]",
checkparams={
"x_2": 7 + add_one,
"x_1": 5 + add_one,
"param_1": 2 + add_one,
"param_2": 3 + add_one,
},
)
self.assert_compile(
c[5:7][3],
"x[%(x_1)s::INTEGER:%(x_2)s::INTEGER][%(param_1)s::INTEGER]",
checkparams={
"x_2": 7 + add_one,
"x_1": 5 + add_one,
"param_1": 3 + add_one,
},
)
def test_array_literal_type(self):
isinstance(postgresql.array([1, 2]).type, postgresql.ARRAY)
is_(postgresql.array([1, 2]).type.item_type._type_affinity, Integer)
is_(
postgresql.array(
[1, 2], type_=String
).type.item_type._type_affinity,
String,
)
@testing.combinations(
("with type_", Date, "ARRAY[]::DATE[]"),
("no type_", None, "ARRAY[]"),
id_="iaa",
)
def test_array_literal_empty(self, type_, expected):
self.assert_compile(postgresql.array([], type_=type_), expected)
def test_array_literal(self):
self.assert_compile(
func.array_dims(
postgresql.array([1, 2]) + postgresql.array([3, 4, 5])
),
"array_dims(ARRAY[%(param_1)s::INTEGER, %(param_2)s::INTEGER] ||"
" ARRAY[%(param_3)s::INTEGER, %(param_4)s::INTEGER,"
" %(param_5)s::INTEGER])",
checkparams={
"param_5": 5,
"param_4": 4,
"param_1": 1,
"param_3": 3,
"param_2": 2,
},
)
def test_array_literal_compare(self):
self.assert_compile(
postgresql.array([1, 2]) == [3, 4, 5],
"ARRAY[%(param_1)s::INTEGER, %(param_2)s::INTEGER] ="
" ARRAY[%(param_3)s::INTEGER, %(param_4)s::INTEGER,"
" %(param_5)s::INTEGER]",
checkparams={
"param_5": 5,
"param_4": 4,
"param_1": 1,
"param_3": 3,
"param_2": 2,
},
)
def test_array_literal_contains(self):
self.assert_compile(
postgresql.array([1, 2]).contains([3, 4, 5]),
"ARRAY[%(param_1)s::INTEGER, %(param_2)s::INTEGER] @>"
" ARRAY[%(param_3)s::INTEGER, %(param_4)s::INTEGER,"
" %(param_5)s::INTEGER]",
checkparams={
"param_1": 1,
"param_2": 2,
"param_3": 3,
"param_4": 4,
"param_5": 5,
},
)
self.assert_compile(
postgresql.array(["a", "b"]).contains([""]),
"ARRAY[%(param_1)s::VARCHAR, %(param_2)s::VARCHAR] @>"
" ARRAY[%(param_3)s::VARCHAR]",
checkparams={"param_1": "a", "param_2": "b", "param_3": ""},
)
self.assert_compile(
postgresql.array(["a", "b"]).contains([]),
"ARRAY[%(param_1)s::VARCHAR, %(param_2)s::VARCHAR] @> ARRAY[]",
checkparams={"param_1": "a", "param_2": "b"},
)
self.assert_compile(
postgresql.array(["a", "b"]).contains([0]),
"ARRAY[%(param_1)s::VARCHAR, %(param_2)s::VARCHAR] @>"
" ARRAY[%(param_3)s::INTEGER]",
checkparams={"param_1": "a", "param_2": "b", "param_3": 0},
)
def test_array_literal_contained_by(self):
self.assert_compile(
postgresql.array(["a", "b"]).contained_by(["a", "b", "c"]),
"ARRAY[%(param_1)s::VARCHAR, %(param_2)s::VARCHAR] <@"
" ARRAY[%(param_3)s::VARCHAR, %(param_4)s::VARCHAR,"
" %(param_5)s::VARCHAR]",
checkparams={
"param_1": "a",
"param_2": "b",
"param_3": "a",
"param_4": "b",
"param_5": "c",
},
)
self.assert_compile(
postgresql.array([1, 2]).contained_by([3, 4, 5]),
"ARRAY[%(param_1)s::INTEGER, %(param_2)s::INTEGER] <@"
" ARRAY[%(param_3)s::INTEGER, %(param_4)s::INTEGER,"
" %(param_5)s::INTEGER]",
checkparams={
"param_1": 1,
"param_2": 2,
"param_3": 3,
"param_4": 4,
"param_5": 5,
},
)
self.assert_compile(
postgresql.array(["a", "b"]).contained_by([""]),
"ARRAY[%(param_1)s::VARCHAR, %(param_2)s::VARCHAR] <@"
" ARRAY[%(param_3)s::VARCHAR]",
checkparams={"param_1": "a", "param_2": "b", "param_3": ""},
)
self.assert_compile(
postgresql.array(["a", "b"]).contained_by([]),
"ARRAY[%(param_1)s::VARCHAR, %(param_2)s::VARCHAR] <@ ARRAY[]",
checkparams={"param_1": "a", "param_2": "b"},
)
self.assert_compile(
postgresql.array(["a", "b"]).contained_by([0]),
"ARRAY[%(param_1)s::VARCHAR, %(param_2)s::VARCHAR] <@"
" ARRAY[%(param_3)s::INTEGER]",
checkparams={"param_1": "a", "param_2": "b", "param_3": 0},
)
def test_array_literal_insert(self):
m = MetaData()
t = Table("t", m, Column("data", postgresql.ARRAY(Integer)))
self.assert_compile(
t.insert().values(data=array([1, 2, 3])),
"INSERT INTO t (data) VALUES (ARRAY[%(param_1)s::INTEGER,"
" %(param_2)s::INTEGER, %(param_3)s::INTEGER])",
)
def test_update_array(self):
m = MetaData()
t = Table("t", m, Column("data", postgresql.ARRAY(Integer)))
self.assert_compile(
t.update().values({t.c.data: [1, 3, 4]}),
"UPDATE t SET data=%(data)s::INTEGER[]",
checkparams={"data": [1, 3, 4]},
)
def test_update_array_element(self):
m = MetaData()
t = Table("t", m, Column("data", postgresql.ARRAY(Integer)))
self.assert_compile(
t.update().values({t.c.data[5]: 1}),
"UPDATE t SET data[%(data_1)s::INTEGER]=%(param_1)s::INTEGER",
checkparams={"data_1": 5, "param_1": 1},
)
def test_update_array_slice(self):
m = MetaData()
t = Table("t", m, Column("data", postgresql.ARRAY(Integer)))
# psycopg2-specific, has a cast
self.assert_compile(
t.update().values({t.c.data[2:5]: [2, 3, 4]}),
"UPDATE t SET data[%(data_1)s:%(data_2)s]=%(param_1)s::INTEGER[]",
checkparams={"param_1": [2, 3, 4], "data_2": 5, "data_1": 2},
dialect=PGDialect_psycopg2(),
)
# default dialect does not, as DBAPIs may be doing this for us
self.assert_compile(
t.update().values({t.c.data[2:5]: [2, 3, 4]}),
"UPDATE t SET data[%s:%s]=%s",
checkparams={"param_1": [2, 3, 4], "data_2": 5, "data_1": 2},
dialect=PGDialect(paramstyle="format"),
)
def test_from_only(self):
m = MetaData()
tbl1 = Table("testtbl1", m, Column("id", Integer))
tbl2 = Table("testtbl2", m, Column("id", Integer))
stmt = tbl1.select().with_hint(tbl1, "ONLY", "postgresql")
expected = "SELECT testtbl1.id FROM ONLY testtbl1"
self.assert_compile(stmt, expected)
talias1 = tbl1.alias("foo")
stmt = talias1.select().with_hint(talias1, "ONLY", "postgresql")
expected = "SELECT foo.id FROM ONLY testtbl1 AS foo"
self.assert_compile(stmt, expected)
stmt = select(tbl1, tbl2).with_hint(tbl1, "ONLY", "postgresql")
expected = (
"SELECT testtbl1.id, testtbl2.id AS id_1 FROM ONLY testtbl1, "
"testtbl2"
)
self.assert_compile(stmt, expected)
stmt = select(tbl1, tbl2).with_hint(tbl2, "ONLY", "postgresql")
expected = (
"SELECT testtbl1.id, testtbl2.id AS id_1 FROM testtbl1, ONLY "
"testtbl2"
)
self.assert_compile(stmt, expected)
stmt = select(tbl1, tbl2)
stmt = stmt.with_hint(tbl1, "ONLY", "postgresql")
stmt = stmt.with_hint(tbl2, "ONLY", "postgresql")
expected = (
"SELECT testtbl1.id, testtbl2.id AS id_1 FROM ONLY testtbl1, "
"ONLY testtbl2"
)
self.assert_compile(stmt, expected)
stmt = update(tbl1).values(dict(id=1))
stmt = stmt.with_hint("ONLY", dialect_name="postgresql")
expected = "UPDATE ONLY testtbl1 SET id=%(id)s::INTEGER"
self.assert_compile(stmt, expected)
stmt = delete(tbl1).with_hint(
"ONLY", selectable=tbl1, dialect_name="postgresql"
)
expected = "DELETE FROM ONLY testtbl1"
self.assert_compile(stmt, expected)
tbl3 = Table("testtbl3", m, Column("id", Integer), schema="testschema")
stmt = tbl3.select().with_hint(tbl3, "ONLY", "postgresql")
expected = (
"SELECT testschema.testtbl3.id FROM ONLY testschema.testtbl3"
)
self.assert_compile(stmt, expected)
assert_raises(
exc.CompileError,
tbl3.select().with_hint(tbl3, "FAKE", "postgresql").compile,
dialect=postgresql.dialect(),
)
def test_aggregate_order_by_one(self):
m = MetaData()
table = Table("table1", m, Column("a", Integer), Column("b", Integer))
expr = func.array_agg(aggregate_order_by(table.c.a, table.c.b.desc()))
stmt = select(expr)
# note this tests that the object exports FROM objects
# correctly
self.assert_compile(
stmt,
"SELECT array_agg(table1.a ORDER BY table1.b DESC) "
"AS array_agg_1 FROM table1",
)
def test_aggregate_order_by_two(self):
m = MetaData()
table = Table("table1", m, Column("a", Integer), Column("b", Integer))
expr = func.string_agg(
table.c.a, aggregate_order_by(literal_column("','"), table.c.a)
)
stmt = select(expr)
self.assert_compile(
stmt,
"SELECT string_agg(table1.a, ',' ORDER BY table1.a) "
"AS string_agg_1 FROM table1",
)
def test_aggregate_order_by_multi_col(self):
m = MetaData()
table = Table("table1", m, Column("a", Integer), Column("b", Integer))
expr = func.string_agg(
table.c.a,
aggregate_order_by(
literal_column("','"), table.c.a, table.c.b.desc()
),
)
stmt = select(expr)
self.assert_compile(
stmt,
"SELECT string_agg(table1.a, "
"',' ORDER BY table1.a, table1.b DESC) "
"AS string_agg_1 FROM table1",
)
def test_aggregate_orcer_by_no_arg(self):
assert_raises_message(
TypeError,
"at least one ORDER BY element is required",
aggregate_order_by,
literal_column("','"),
)
def test_pg_array_agg_implicit_pg_array(self):
expr = pg_array_agg(column("data", Integer))
assert isinstance(expr.type, PG_ARRAY)
is_(expr.type.item_type._type_affinity, Integer)
def test_pg_array_agg_uses_base_array(self):
expr = pg_array_agg(column("data", sqltypes.ARRAY(Integer)))
assert isinstance(expr.type, sqltypes.ARRAY)
assert not isinstance(expr.type, PG_ARRAY)
is_(expr.type.item_type._type_affinity, Integer)
def test_pg_array_agg_uses_pg_array(self):
expr = pg_array_agg(column("data", PG_ARRAY(Integer)))
assert isinstance(expr.type, PG_ARRAY)
is_(expr.type.item_type._type_affinity, Integer)
def test_pg_array_agg_explicit_base_array(self):
expr = pg_array_agg(
column("data", sqltypes.ARRAY(Integer)),
type_=sqltypes.ARRAY(Integer),
)
assert isinstance(expr.type, sqltypes.ARRAY)
assert not isinstance(expr.type, PG_ARRAY)
is_(expr.type.item_type._type_affinity, Integer)
def test_pg_array_agg_explicit_pg_array(self):
expr = pg_array_agg(
column("data", sqltypes.ARRAY(Integer)), type_=PG_ARRAY(Integer)
)
assert isinstance(expr.type, PG_ARRAY)
is_(expr.type.item_type._type_affinity, Integer)
def test_aggregate_order_by_adapt(self):
m = MetaData()
table = Table("table1", m, Column("a", Integer), Column("b", Integer))
expr = func.array_agg(aggregate_order_by(table.c.a, table.c.b.desc()))
stmt = select(expr)
a1 = table.alias("foo")
stmt2 = sql_util.ClauseAdapter(a1).traverse(stmt)
self.assert_compile(
stmt2,
"SELECT array_agg(foo.a ORDER BY foo.b DESC) AS array_agg_1 "
"FROM table1 AS foo",
)
def test_array_agg_w_filter_subscript(self):
series = func.generate_series(1, 100).alias("series")
series_col = column("series")
query = select(
func.array_agg(series_col).filter(series_col % 2 == 0)[3]
).select_from(series)
self.assert_compile(
query,
"SELECT (array_agg(series) FILTER (WHERE series %%"
" %(series_1)s::INTEGER ="
" %(param_1)s::INTEGER))[%(param_2)s::INTEGER] AS anon_1 FROM"
" generate_series(%(generate_series_1)s::INTEGER,"
" %(generate_series_2)s::INTEGER) AS series",
)
def test_delete_extra_froms(self):
t1 = table("t1", column("c1"))
t2 = table("t2", column("c1"))
q = delete(t1).where(t1.c.c1 == t2.c.c1)
self.assert_compile(q, "DELETE FROM t1 USING t2 WHERE t1.c1 = t2.c1")
def test_delete_extra_froms_alias(self):
a1 = table("t1", column("c1")).alias("a1")
t2 = table("t2", column("c1"))
q = delete(a1).where(a1.c.c1 == t2.c.c1)
self.assert_compile(
q, "DELETE FROM t1 AS a1 USING t2 WHERE a1.c1 = t2.c1"
)
@testing.combinations(
("no_persisted", "", "ignore"),
("persisted_none", "", None),
("persisted_true", " STORED", True),
("persisted_false", " VIRTUAL", False),
id_="iaa",
)
def test_column_computed(self, text, persisted):
m = MetaData()
kwargs = {"persisted": persisted} if persisted != "ignore" else {}
t = Table(
"t",
m,
Column("x", Integer),
Column("y", Integer, Computed("x + 2", **kwargs)),
)
self.assert_compile(
schema.CreateTable(t),
"CREATE TABLE t (x INTEGER, y INTEGER GENERATED "
"ALWAYS AS (x + 2)%s)" % text,
)
def test_column_computed_persisted_false_old_version(self):
m = MetaData()
t = Table(
"t",
m,
Column("x", Integer),
Column("y", Integer, Computed("x + 2", persisted=False)),
)
old_dialect = postgresql.dialect()
old_dialect.supports_virtual_generated_columns = False
with expect_raises_message(
exc.CompileError,
"PostrgreSQL computed columns do not support 'virtual'",
):
schema.CreateTable(t).compile(dialect=old_dialect)
def test_column_computed_persisted_none_warning_old_version(self):
m = MetaData()
t = Table(
"t",
m,
Column("x", Integer),
Column("y", Integer, Computed("x + 2")),
)
old_dialect = postgresql.dialect()
old_dialect.supports_virtual_generated_columns = False
with expect_warnings(
"Computed column t.y is being created as 'STORED' since"
):
self.assert_compile(
schema.CreateTable(t),
"CREATE TABLE t (x INTEGER, y INTEGER GENERATED "
"ALWAYS AS (x + 2) STORED)",
dialect=old_dialect,
)
@testing.combinations(True, False)
def test_column_identity(self, pk):
# all other tests are in test_identity_column.py
m = MetaData()
t = Table(
"t",
m,
Column(
"y",
Integer,
Identity(always=True, start=4, increment=7),
primary_key=pk,
),
)
self.assert_compile(
schema.CreateTable(t),
"CREATE TABLE t (y INTEGER GENERATED ALWAYS AS IDENTITY "
"(INCREMENT BY 7 START WITH 4)%s)"
% (", PRIMARY KEY (y)" if pk else ""),
)
@testing.combinations(True, False)
def test_column_identity_no_support(self, pk):
m = MetaData()
t = Table(
"t",
m,
Column(
"y",
Integer,
Identity(always=True, start=4, increment=7),
primary_key=pk,
),
)
dd = PGDialect()
dd.supports_identity_columns = False
self.assert_compile(
schema.CreateTable(t),
"CREATE TABLE t (y %s%s)"
% (
"SERIAL NOT NULL" if pk else "INTEGER NOT NULL",
", PRIMARY KEY (y)" if pk else "",
),
dialect=dd,
)
def test_column_identity_null(self):
# all other tests are in test_identity_column.py
m = MetaData()
t = Table(
"t",
m,
Column(
"y",
Integer,
Identity(always=True, start=4, increment=7),
nullable=True,
),
)
self.assert_compile(
schema.CreateTable(t),
"CREATE TABLE t (y INTEGER GENERATED ALWAYS AS IDENTITY "
"(INCREMENT BY 7 START WITH 4) NULL)",
)
def test_index_extra_include_1(self):
metadata = MetaData()
tbl = Table(
"test",
metadata,
Column("x", Integer),
Column("y", Integer),
Column("z", Integer),
)
idx = Index("foo", tbl.c.x, postgresql_include=["y"])
self.assert_compile(
schema.CreateIndex(idx), "CREATE INDEX foo ON test (x) INCLUDE (y)"
)
def test_index_extra_include_2(self):
metadata = MetaData()
tbl = Table(
"test",
metadata,
Column("x", Integer),
Column("y", Integer),
Column("z", Integer),
)
idx = Index("foo", tbl.c.x, postgresql_include=[tbl.c.y])
self.assert_compile(
schema.CreateIndex(idx), "CREATE INDEX foo ON test (x) INCLUDE (y)"
)
def test_primary_key_constraint_with_include(self):
m = MetaData()
tbl = Table(
"test",
m,
Column("id", Integer),
Column("data", Integer),
PrimaryKeyConstraint("id", postgresql_include=["data"]),
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE test (id SERIAL NOT NULL, data INTEGER, "
"PRIMARY KEY (id) INCLUDE (data))",
)
def test_primary_key_constraint_with_deferrable(self):
m = MetaData()
tbl = Table(
"test",
m,
Column("id", Integer),
PrimaryKeyConstraint("id", deferrable=True),
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE test (id SERIAL NOT NULL, "
"PRIMARY KEY (id) DEFERRABLE)",
)
def test_primary_key_constraint_with_deferrable_and_include(self):
m = MetaData()
tbl = Table(
"test",
m,
Column("id", Integer),
Column("created_at", Integer),
PrimaryKeyConstraint(
"id",
deferrable=True,
initially="IMMEDIATE",
postgresql_include=["created_at"],
),
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE test (id SERIAL NOT NULL, created_at INTEGER, "
"PRIMARY KEY (id) INCLUDE (created_at) "
"DEFERRABLE INITIALLY IMMEDIATE)",
)
def test_foreign_key_constraint_with_deferrable(self):
m = MetaData()
Table("t1", m, Column("id", Integer, primary_key=True))
t2 = Table(
"t2",
m,
Column("t1_id", Integer),
ForeignKeyConstraint(["t1_id"], ["t1.id"], deferrable=True),
)
self.assert_compile(
schema.CreateTable(t2),
"CREATE TABLE t2 (t1_id INTEGER, "
"FOREIGN KEY(t1_id) REFERENCES t1 (id) DEFERRABLE)",
)
def test_foreign_key_constraint_with_not_valid(self):
m = MetaData()
Table("t1", m, Column("id", Integer, primary_key=True))
t2 = Table(
"t2",
m,
Column("t1_id", Integer),
ForeignKeyConstraint(
["t1_id"], ["t1.id"], name="fk_t1", postgresql_not_valid=True
),
)
self.assert_compile(
schema.CreateTable(t2),
"CREATE TABLE t2 (t1_id INTEGER, "
"CONSTRAINT fk_t1 FOREIGN KEY(t1_id) REFERENCES "
"t1 (id) NOT VALID)",
)
def test_foreign_key_constraint_with_cascades_and_not_valid(self):
m = MetaData()
Table("t1", m, Column("id", Integer, primary_key=True))
t2 = Table("t2", m, Column("t1_id", Integer))
constraint = ForeignKeyConstraint(
["t1_id"],
["t1.id"],
name="fk_t1",
ondelete="CASCADE",
onupdate="SET NULL",
postgresql_not_valid=True,
)
t2.append_constraint(constraint)
self.assert_compile(
schema.AddConstraint(constraint),
"ALTER TABLE t2 ADD CONSTRAINT fk_t1 FOREIGN KEY(t1_id) "
"REFERENCES t1 (id) ON DELETE CASCADE "
"ON UPDATE SET NULL NOT VALID",
)
def test_unique_constraint_with_deferrable(self):
m = MetaData()
tbl = Table(
"test",
m,
Column("id", Integer),
UniqueConstraint("id", name="uq_id", deferrable=True),
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE test (id INTEGER, "
"CONSTRAINT uq_id UNIQUE (id) DEFERRABLE)",
)
def test_unique_constraint_with_include(self):
m = MetaData()
tbl = Table(
"test",
m,
Column("id", Integer),
Column("data", Integer),
Column("created_at", Integer),
UniqueConstraint(
"id", name="uq_id", postgresql_include=["data", "created_at"]
),
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE test (id INTEGER, data INTEGER, created_at INTEGER, "
"CONSTRAINT uq_id UNIQUE (id) INCLUDE (data, created_at))",
)
def test_unique_constraint_with_deferrable_and_include(self):
m = MetaData()
tbl = Table(
"test",
m,
Column("id", Integer),
Column("data", Integer),
UniqueConstraint(
"id",
name="uq_id",
deferrable=True,
initially="DEFERRED",
postgresql_include=["data"],
),
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE test (id INTEGER, data INTEGER, "
"CONSTRAINT uq_id UNIQUE (id) INCLUDE (data) "
"DEFERRABLE INITIALLY DEFERRED)",
)
def test_check_constraint_with_not_valid(self):
m = MetaData()
tbl = Table(
"test",
m,
Column("data", Integer),
CheckConstraint(
"data > 0", name="ck_data", postgresql_not_valid=True
),
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE test (data INTEGER, "
"CONSTRAINT ck_data CHECK (data > 0) NOT VALID)",
)
def test_check_constraint_with_deferrable_and_not_valid(self):
m = MetaData()
tbl = Table("test", m, Column("data", Integer))
constraint = CheckConstraint(
"data > 0",
name="ck_data",
deferrable=True,
initially="DEFERRED",
postgresql_not_valid=True,
)
tbl.append_constraint(constraint)
self.assert_compile(
schema.AddConstraint(constraint),
"ALTER TABLE test ADD CONSTRAINT ck_data CHECK (data > 0) "
"DEFERRABLE INITIALLY DEFERRED NOT VALID",
)
def test_check_constraint_with_deferrable(self):
m = MetaData()
tbl = Table(
"test",
m,
Column("data", Integer),
CheckConstraint("data > 0", name="ck_data", deferrable=True),
)
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE test (data INTEGER, "
"CONSTRAINT ck_data CHECK (data > 0) DEFERRABLE)",
)
@testing.fixture
def update_tables(self):
self.weather = table(
"weather",
column("temp_lo", Integer),
column("temp_hi", Integer),
column("prcp", Integer),
column("city", String),
column("date", Date),
)
self.accounts = table(
"accounts",
column("sales_id", Integer),
column("sales_person", Integer),
column("contact_first_name", String),
column("contact_last_name", String),
column("name", String),
)
self.salesmen = table(
"salesmen",
column("id", Integer),
column("first_name", String),
column("last_name", String),
)
self.employees = table(
"employees",
column("id", Integer),
column("sales_count", String),
)
# from examples at https://www.postgresql.org/docs/current/sql-update.html
def test_difficult_update_1(self, update_tables):
update = (
self.weather.update()
.where(self.weather.c.city == "San Francisco")
.where(self.weather.c.date == "2003-07-03")
.values(
{
tuple_(
self.weather.c.temp_lo,
self.weather.c.temp_hi,
self.weather.c.prcp,
): tuple_(
self.weather.c.temp_lo + 1,
self.weather.c.temp_lo + 15,
literal_column("DEFAULT"),
)
}
)
)
self.assert_compile(
update,
"UPDATE weather SET (temp_lo, temp_hi, prcp)=(weather.temp_lo +"
" %(temp_lo_1)s::INTEGER, weather.temp_lo +"
" %(temp_lo_2)s::INTEGER, DEFAULT) WHERE weather.city ="
" %(city_1)s::VARCHAR AND weather.date = %(date_1)s::VARCHAR",
{
"city_1": "San Francisco",
"date_1": "2003-07-03",
"temp_lo_1": 1,
"temp_lo_2": 15,
},
)
def test_difficult_update_2(self, update_tables):
update = self.accounts.update().values(
{
tuple_(
self.accounts.c.contact_first_name,
self.accounts.c.contact_last_name,
): (
select(
self.salesmen.c.first_name, self.salesmen.c.last_name
)
.where(self.salesmen.c.id == self.accounts.c.sales_id)
.scalar_subquery()
)
}
)
self.assert_compile(
update,
"UPDATE accounts SET (contact_first_name, contact_last_name)="
"(SELECT salesmen.first_name, salesmen.last_name FROM "
"salesmen WHERE salesmen.id = accounts.sales_id)",
)
def test_difficult_update_3(self, update_tables):
update = (
self.employees.update()
.values(
{
self.employees.c.sales_count: (
self.employees.c.sales_count + 1
)
}
)
.where(
self.employees.c.id
== select(self.accounts.c.sales_person)
.where(self.accounts.c.name == "Acme Corporation")
.scalar_subquery()
)
)
self.assert_compile(
update,
"UPDATE employees SET sales_count=(employees.sales_count +"
" %(sales_count_1)s::INTEGER) WHERE employees.id = (SELECT"
" accounts.sales_person FROM accounts WHERE accounts.name ="
" %(name_1)s::VARCHAR)",
{"sales_count_1": 1, "name_1": "Acme Corporation"},
)
def test_difficult_update_4(self):
summary = table(
"summary",
column("group_id", Integer),
column("sum_y", Float),
column("sum_x", Float),
column("avg_x", Float),
column("avg_y", Float),
)
data = table(
"data",
column("group_id", Integer),
column("x", Float),
column("y", Float),
)
update = summary.update().values(
{
tuple_(
summary.c.sum_x,
summary.c.sum_y,
summary.c.avg_x,
summary.c.avg_y,
): (
select(
func.sum(data.c.x),
func.sum(data.c.y),
func.avg(data.c.x),
func.avg(data.c.y),
)
.where(data.c.group_id == summary.c.group_id)
.scalar_subquery()
)
}
)
self.assert_compile(
update,
"UPDATE summary SET (sum_x, sum_y, avg_x, avg_y)="
"(SELECT sum(data.x) AS sum_1, sum(data.y) AS sum_2, "
"avg(data.x) AS avg_1, avg(data.y) AS avg_2 FROM data "
"WHERE data.group_id = summary.group_id)",
)
@testing.combinations(JSONB.JSONPathType, JSONPATH)
def test_json_path(self, type_):
data = table("data", column("id", Integer), column("x", JSONB))
stmt = select(
func.jsonb_path_exists(data.c.x, cast("$.data.w", type_))
)
self.assert_compile(
stmt,
"SELECT jsonb_path_exists(data.x, CAST(%(param_1)s AS JSONPATH)) "
"AS jsonb_path_exists_1 FROM data",
)
@testing.combinations(
(
lambda col: col["foo"] + " ",
"x[%(x_1)s::TEXT] || %(param_1)s::VARCHAR",
),
(
lambda col: col["foo"] + " " + col["bar"],
"x[%(x_1)s::TEXT] || %(param_1)s::VARCHAR || x[%(x_2)s::TEXT]",
),
argnames="expr, expected",
)
def test_eager_grouping_flag(self, expr, expected):
"""test #10479"""
col = Column("x", JSONB)
expr = testing.resolve_lambda(expr, col=col)
# Choose expected result based on type
self.assert_compile(expr, expected)
@testing.variation("pgversion", ["pg14", "pg13"])
def test_jsonb_subscripting(self, pgversion):
"""test #10927 - PostgreSQL 14+ JSONB subscripting syntax"""
data = table("data", column("id", Integer), column("x", JSONB))
dialect = postgresql.dialect()
if pgversion.pg13:
dialect._supports_jsonb_subscripting = False
# Test SELECT with JSONB indexing
stmt = select(data.c.x["key"])
self.assert_compile(
stmt,
(
"SELECT data.x[%(x_1)s::TEXT] AS anon_1 FROM data"
if pgversion.pg14
else "SELECT data.x -> %(x_1)s::TEXT AS anon_1 FROM data"
),
dialect=dialect,
)
# Test UPDATE with JSONB indexing (the original issue case)
stmt = update(data).values({data.c.x["new_key"]: data.c.x["old_key"]})
self.assert_compile(
stmt,
(
"UPDATE data SET x[%(x_1)s::TEXT]=(data.x[%(x_2)s::TEXT])"
if pgversion.pg14
else (
"UPDATE data SET x -> %(x_1)s::TEXT=(data.x ->"
" %(x_2)s::TEXT)"
)
),
dialect=dialect,
)
def test_json_still_uses_arrow_syntax(self):
"""test #10927 - JSON type still uses arrow syntax even on PG 14+"""
data = table("data", column("id", Integer), column("x", JSON))
# Test PostgreSQL 14+ still uses arrow syntax for JSON (not JSONB)
# Test SELECT with JSON indexing
stmt = select(data.c.x["key"])
self.assert_compile(
stmt, "SELECT data.x -> %(x_1)s::TEXT AS anon_1 FROM data"
)
# Test UPDATE with JSON indexing
stmt = update(data).values({data.c.x["new_key"]: data.c.x["old_key"]})
self.assert_compile(
stmt,
"UPDATE data SET x -> %(x_1)s::TEXT=(data.x -> %(x_2)s::TEXT)",
)
@testing.variation("pgversion", ["pg14", "pg13"])
def test_hstore_subscripting(self, pgversion):
"""test #12948 - PostgreSQL 14+ HSTORE subscripting syntax"""
data = table("data", column("id", Integer), column("h", HSTORE))
dialect = postgresql.dialect()
if pgversion.pg13:
dialect._supports_jsonb_subscripting = False
# Test SELECT with HSTORE indexing
stmt = select(data.c.h["key"])
self.assert_compile(
stmt,
(
"SELECT data.h[%(h_1)s::VARCHAR] AS anon_1 FROM data"
if pgversion.pg14
else "SELECT data.h -> %(h_1)s::VARCHAR AS anon_1 FROM data"
),
dialect=dialect,
)
# Test UPDATE with HSTORE indexing (the original issue case)
stmt = update(data).values({data.c.h["new_key"]: data.c.h["old_key"]})
self.assert_compile(
stmt,
(
"UPDATE data SET"
" h[%(h_1)s::VARCHAR]=(data.h[%(h_2)s::VARCHAR])"
if pgversion.pg14
else (
"UPDATE data SET h -> %(h_1)s::VARCHAR=(data.h ->"
" %(h_2)s::VARCHAR)"
)
),
dialect=dialect,
)
def test_jsonb_functions_use_parentheses_with_subscripting(self):
"""test #12778 - JSONB functions are parenthesized with [] syntax"""
data = table("data", column("id", Integer), column("x", JSONB))
# Test that JSONB functions are properly parenthesized with [] syntax
# This ensures correct PostgreSQL syntax: (function_call)[index]
# instead of the invalid: function_call[index]
stmt = select(func.jsonb_array_elements(data.c.x, type_=JSONB)["key"])
self.assert_compile(
stmt,
"SELECT"
" (jsonb_array_elements(data.x))[%(jsonb_array_elements_1)s::TEXT]"
" AS anon_1 FROM data",
)
# Test with nested function calls
stmt = select(
func.jsonb_array_elements(data.c.x["items"], type_=JSONB)["key"]
)
self.assert_compile(
stmt,
"SELECT"
" (jsonb_array_elements(data.x[%(x_1)s::TEXT]))"
"[%(jsonb_array_elements_1)s::TEXT]"
" AS anon_1 FROM data",
)
def test_jsonb_cast_use_parentheses_with_subscripting(self):
"""test #13067 - JSONB cast expressions parenthesized with [] syntax"""
# Test that JSONB cast expressions are properly parenthesized with []
# syntax. This ensures correct PostgreSQL syntax: (CAST(...))[index]
# instead of the invalid: CAST(...)[index]
stmt = select(cast({"foo": "bar"}, JSONB)["foo"])
self.assert_compile(
stmt,
"SELECT (CAST(%(param_1)s::JSONB AS JSONB))[%(param_2)s::TEXT] "
"AS anon_1",
)
# Test with nested cast within subscripts
data = table("data", column("id", Integer), column("x", JSONB))
stmt = select(data.c.x[cast("key", String)])
self.assert_compile(
stmt,
"SELECT data.x[CAST(%(param_1)s::VARCHAR AS VARCHAR)] AS anon_1 "
"FROM data",
)
def test_range_custom_object_hook(self):
# See issue #8884
from datetime import date
usages = table(
"usages",
column("id", Integer),
column("date", Date),
column("amount", Integer),
)
period = Range(date(2022, 1, 1), (2023, 1, 1))
stmt = select(func.sum(usages.c.amount)).where(
usages.c.date.op("<@")(period)
)
self.assert_compile(
stmt,
"SELECT sum(usages.amount) AS sum_1 FROM usages "
"WHERE usages.date <@ %(date_1)s::DATERANGE",
)
def test_multirange_custom_object_hook(self):
from datetime import date
usages = table(
"usages",
column("id", Integer),
column("date", Date),
column("amount", Integer),
)
period = MultiRange(
[
Range(date(2022, 1, 1), (2023, 1, 1)),
Range(date(2024, 1, 1), (2025, 1, 1)),
]
)
stmt = select(func.sum(usages.c.amount)).where(
usages.c.date.op("<@")(period)
)
self.assert_compile(
stmt,
"SELECT sum(usages.amount) AS sum_1 FROM usages "
"WHERE usages.date <@ %(date_1)s::DATEMULTIRANGE",
)
def test_bitwise_xor(self):
c1 = column("c1", Integer)
c2 = column("c2", Integer)
self.assert_compile(
select(c1.bitwise_xor(c2)),
"SELECT c1 # c2 AS anon_1",
)
def test_ilike_escaping(self):
dialect = postgresql.dialect()
self.assert_compile(
sql.column("foo").ilike("bar", escape="\\"),
"foo ILIKE %(foo_1)s::VARCHAR ESCAPE '\\\\'",
)
self.assert_compile(
sql.column("foo").ilike("bar", escape=""),
"foo ILIKE %(foo_1)s::VARCHAR ESCAPE ''",
dialect=dialect,
)
self.assert_compile(
sql.column("foo").notilike("bar", escape="\\"),
"foo NOT ILIKE %(foo_1)s::VARCHAR ESCAPE '\\\\'",
)
self.assert_compile(
sql.column("foo").notilike("bar", escape=""),
"foo NOT ILIKE %(foo_1)s::VARCHAR ESCAPE ''",
dialect=dialect,
)
@testing.combinations(
(lambda t: t.c.a**t.c.b, "power(t.a, t.b)", {}),
(lambda t: t.c.a**3, "power(t.a, %(pow_1)s::INTEGER)", {"pow_1": 3}),
(
lambda t: func.pow(t.c.a, 3),
"power(t.a, %(pow_1)s::INTEGER)",
{"pow_1": 3},
),
(lambda t: func.power(t.c.a, t.c.b), "power(t.a, t.b)", {}),
)
def test_simple_compile(self, fn, string, params):
t = table("t", column("a", Integer), column("b", Integer))
expr = resolve_lambda(fn, t=t)
self.assert_compile(expr, string, params)
class InsertOnConflictTest(
fixtures.TablesTest, AssertsCompiledSQL, fixtures.CacheKeySuite
):
__dialect__ = postgresql.dialect()
run_create_tables = None
@classmethod
def define_tables(cls, metadata):
cls.table1 = table1 = table(
"mytable",
column("myid", Integer),
column("name", String(128)),
column("description", String(128)),
)
cls.table_with_metadata = Table(
"mytable",
metadata,
Column("myid", Integer, primary_key=True),
Column("name", String(128)),
Column("description", String(128)),
)
cls.unique_constr = schema.UniqueConstraint(
table1.c.name, name="uq_name"
)
cls.excl_constr = ExcludeConstraint(
(table1.c.name, "="),
(table1.c.description, "&&"),
name="excl_thing",
)
cls.excl_constr_anon = ExcludeConstraint(
(cls.table_with_metadata.c.name, "="),
(cls.table_with_metadata.c.description, "&&"),
where=cls.table_with_metadata.c.description != "foo",
)
cls.excl_constr_anon_str = ExcludeConstraint(
(cls.table_with_metadata.c.name, "="),
(cls.table_with_metadata.c.description, "&&"),
where="description != 'foo'",
)
cls.goofy_index = Index(
"goofy_index", table1.c.name, postgresql_where=table1.c.name > "m"
)
Table(
"users",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String(50)),
)
Table(
"users_w_key",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String(50), key="name_keyed"),
)
@testing.combinations(
(
lambda users, stmt: stmt.on_conflict_do_nothing(
index_elements=["id"], index_where=text("name = 'hi'")
),
"ON CONFLICT (id) WHERE name = 'hi' DO NOTHING",
),
(
lambda users, stmt: stmt.on_conflict_do_nothing(
index_elements=[users.c.id], index_where=users.c.name == "hi"
),
"ON CONFLICT (id) WHERE name = %(name_1)s::VARCHAR DO NOTHING",
),
(
lambda users, stmt: stmt.on_conflict_do_nothing(
index_elements=["id"], index_where="name = 'hi'"
),
exc.ArgumentError,
),
(
lambda users, stmt: stmt.on_conflict_do_update(
index_elements=[users.c.id],
set_={users.c.name: "there"},
where=users.c.name == "hi",
),
(
"ON CONFLICT (id) DO UPDATE SET name = %(param_1)s::VARCHAR "
"WHERE users.name = %(name_1)s::VARCHAR"
),
),
(
lambda users, stmt: stmt.on_conflict_do_update(
index_elements=[users.c.id],
set_={users.c.name: "there"},
where=text("name = 'hi'"),
),
(
"ON CONFLICT (id) DO UPDATE SET name = %(param_1)s::VARCHAR "
"WHERE name = 'hi'"
),
),
(
lambda users, stmt: stmt.on_conflict_do_update(
index_elements=[users.c.id],
set_={users.c.name: "there"},
where="name = 'hi'",
),
exc.ArgumentError,
),
)
def test_assorted_arg_coercion(self, case, expected):
stmt = insert(self.tables.users)
if isinstance(expected, type) and issubclass(expected, Exception):
with expect_raises(expected):
testing.resolve_lambda(
case, stmt=stmt, users=self.tables.users
),
else:
self.assert_compile(
testing.resolve_lambda(
case, stmt=stmt, users=self.tables.users
),
"INSERT INTO users (id, name) VALUES (%(id)s::INTEGER,"
f" %(name)s::VARCHAR) {expected}",
)
@fixtures.CacheKeySuite.run_suite_tests
def test_insert_on_conflict_cache_key(self):
table = Table(
"foos",
MetaData(),
Column("id", Integer, primary_key=True),
Column("bar", String(10)),
Column("baz", String(10)),
)
Index("foo_idx", table.c.id)
def stmt0():
# note a multivalues INSERT is not cacheable; use just one
# set of values
return insert(table).values(
{"id": 1, "bar": "ab"},
)
def stmt1():
stmt = stmt0()
return stmt.on_conflict_do_nothing()
def stmt2():
stmt = stmt0()
return stmt.on_conflict_do_nothing(index_elements=["id"])
def stmt21():
stmt = stmt0()
return stmt.on_conflict_do_nothing(index_elements=[table.c.id])
def stmt22():
stmt = stmt0()
return stmt.on_conflict_do_nothing(
index_elements=["id", table.c.bar]
)
def stmt23():
stmt = stmt0()
return stmt.on_conflict_do_nothing(index_elements=["id", "bar"])
def stmt24():
stmt = insert(table).values(
{"id": 1, "bar": "ab", "baz": "xy"},
)
return stmt.on_conflict_do_nothing(index_elements=["id", "bar"])
def stmt3():
stmt = stmt0()
return stmt.on_conflict_do_update(
index_elements=["id"],
set_={
"bar": random.choice(["a", "b", "c"]),
"baz": random.choice(["d", "e", "f"]),
},
)
def stmt31():
stmt = stmt0()
return stmt.on_conflict_do_update(
index_elements=["id"],
set_={
"baz": random.choice(["d", "e", "f"]),
},
)
def stmt4():
stmt = stmt0()
return stmt.on_conflict_do_update(
constraint=table.primary_key, set_=stmt.excluded
)
def stmt41():
stmt = stmt0()
return stmt.on_conflict_do_update(
constraint=table.primary_key,
set_=stmt.excluded,
where=table.c.bar != random.choice(["q", "p", "r", "z"]),
)
def stmt42():
stmt = stmt0()
return stmt.on_conflict_do_update(
constraint=table.primary_key,
set_=stmt.excluded,
where=table.c.baz != random.choice(["q", "p", "r", "z"]),
)
return lambda: [
stmt0(),
stmt1(),
stmt2(),
stmt21(),
stmt22(),
stmt23(),
stmt24(),
stmt3(),
stmt31(),
stmt4(),
stmt41(),
stmt42(),
]
@testing.combinations("control", "excluded", "dict")
def test_set_excluded(self, scenario):
"""test #8014, sending all of .excluded to set"""
if scenario == "control":
users = self.tables.users
stmt = insert(users)
self.assert_compile(
stmt.on_conflict_do_update(
constraint=users.primary_key, set_=stmt.excluded
),
"INSERT INTO users (id, name) VALUES (%(id)s::INTEGER,"
" %(name)s::VARCHAR) ON CONFLICT (id) DO UPDATE SET id ="
" excluded.id, name = excluded.name",
)
else:
users_w_key = self.tables.users_w_key
stmt = insert(users_w_key)
if scenario == "excluded":
self.assert_compile(
stmt.on_conflict_do_update(
constraint=users_w_key.primary_key, set_=stmt.excluded
),
"INSERT INTO users_w_key (id, name) VALUES"
" (%(id)s::INTEGER, %(name_keyed)s::VARCHAR) ON CONFLICT"
" (id) DO UPDATE SET id = excluded.id, name ="
" excluded.name",
)
else:
self.assert_compile(
stmt.on_conflict_do_update(
constraint=users_w_key.primary_key,
set_={
"id": stmt.excluded.id,
"name_keyed": stmt.excluded.name_keyed,
},
),
"INSERT INTO users_w_key (id, name) VALUES"
" (%(id)s::INTEGER, %(name_keyed)s::VARCHAR) ON CONFLICT"
" (id) DO UPDATE SET id = excluded.id, name ="
" excluded.name",
)
def test_dont_consume_set_collection(self):
users = self.tables.users
stmt = insert(users).values(
[
{
"name": "spongebob",
},
{
"name": "sandy",
},
]
)
stmt = stmt.on_conflict_do_update(
index_elements=[users.c.name], set_=dict(name=stmt.excluded.name)
)
self.assert_compile(
stmt,
"INSERT INTO users (name) VALUES (%(name_m0)s::VARCHAR),"
" (%(name_m1)s::VARCHAR) ON CONFLICT (name) DO UPDATE SET name ="
" excluded.name",
)
stmt = stmt.returning(users)
self.assert_compile(
stmt,
"INSERT INTO users (name) VALUES (%(name_m0)s::VARCHAR),"
" (%(name_m1)s::VARCHAR) ON CONFLICT (name) DO UPDATE SET name ="
" excluded.name RETURNING users.id, users.name",
)
def test_on_conflict_do_no_call_twice(self):
users = self.table1
for stmt in (
insert(users).on_conflict_do_nothing(),
insert(users).on_conflict_do_update(
index_elements=[users.c.myid], set_=dict(name="foo")
),
):
for meth in (
stmt.on_conflict_do_nothing,
stmt.on_conflict_do_update,
):
with testing.expect_raises_message(
exc.InvalidRequestError,
"This Insert construct already has an "
"ON CONFLICT clause established",
):
meth()
def test_on_conflict_cte_plus_textual(self):
"""test #7798"""
bar = table("bar", column("id"), column("attr"), column("foo_id"))
s1 = text("SELECT bar.id, bar.attr FROM bar").columns(
bar.c.id, bar.c.attr
)
s2 = (
insert(bar)
.from_select(list(s1.selected_columns), s1)
.on_conflict_do_update(
index_elements=[s1.selected_columns.id],
set_={"attr": s1.selected_columns.attr},
)
)
self.assert_compile(
s2,
"INSERT INTO bar (id, attr) SELECT bar.id, bar.attr "
"FROM bar ON CONFLICT (id) DO UPDATE SET attr = bar.attr",
)
def test_do_nothing_no_target(self):
i = (
insert(self.table1)
.values(dict(name="foo"))
.on_conflict_do_nothing()
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
" DO NOTHING",
)
def test_do_nothing_index_elements_target(self):
i = (
insert(self.table1)
.values(dict(name="foo"))
.on_conflict_do_nothing(index_elements=["myid"])
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
" (myid) DO NOTHING",
)
def test_do_update_set_clause_none(self):
i = insert(self.table_with_metadata).values(myid=1, name="foo")
i = i.on_conflict_do_update(
index_elements=["myid"],
set_=OrderedDict([("name", "I'm a name"), ("description", None)]),
)
self.assert_compile(
i,
"INSERT INTO mytable (myid, name) VALUES (%(myid)s::INTEGER,"
" %(name)s::VARCHAR) ON CONFLICT (myid) DO UPDATE SET name ="
" %(param_1)s::VARCHAR, description = %(param_2)s::VARCHAR",
{
"myid": 1,
"name": "foo",
"param_1": "I'm a name",
"param_2": None,
},
)
def test_do_update_set_clause_column_keys(self):
i = insert(self.table_with_metadata).values(myid=1, name="foo")
i = i.on_conflict_do_update(
index_elements=["myid"],
set_=OrderedDict(
[
(self.table_with_metadata.c.name, "I'm a name"),
(self.table_with_metadata.c.description, None),
]
),
)
self.assert_compile(
i,
"INSERT INTO mytable (myid, name) VALUES (%(myid)s::INTEGER,"
" %(name)s::VARCHAR) ON CONFLICT (myid) DO UPDATE SET name ="
" %(param_1)s::VARCHAR, description = %(param_2)s::VARCHAR",
{
"myid": 1,
"name": "foo",
"param_1": "I'm a name",
"param_2": None,
},
)
def test_do_update_set_clause_literal(self):
i = insert(self.table_with_metadata).values(myid=1, name="foo")
i = i.on_conflict_do_update(
index_elements=["myid"],
set_=OrderedDict(
[("name", "I'm a name"), ("description", null())]
),
)
self.assert_compile(
i,
"INSERT INTO mytable (myid, name) VALUES (%(myid)s::INTEGER,"
" %(name)s::VARCHAR) ON CONFLICT (myid) DO UPDATE SET name ="
" %(param_1)s::VARCHAR, description = NULL",
{"myid": 1, "name": "foo", "param_1": "I'm a name"},
)
def test_do_update_str_index_elements_target_one(self):
i = insert(self.table_with_metadata).values(myid=1, name="foo")
i = i.on_conflict_do_update(
index_elements=["myid"],
set_=OrderedDict(
[
("name", i.excluded.name),
("description", i.excluded.description),
]
),
)
self.assert_compile(
i,
"INSERT INTO mytable (myid, name) VALUES (%(myid)s::INTEGER,"
" %(name)s::VARCHAR) ON CONFLICT (myid) DO UPDATE SET name ="
" excluded.name, description = excluded.description",
)
def test_do_update_str_index_elements_target_two(self):
i = insert(self.table1).values(dict(name="foo"))
i = i.on_conflict_do_update(
index_elements=["myid"], set_=dict(name=i.excluded.name)
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
" (myid) DO UPDATE SET name = excluded.name",
)
def test_do_update_col_index_elements_target(self):
i = insert(self.table1).values(dict(name="foo"))
i = i.on_conflict_do_update(
index_elements=[self.table1.c.myid],
set_=dict(name=i.excluded.name),
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
" (myid) DO UPDATE SET name = excluded.name",
)
def test_do_update_unnamed_pk_constraint_target(self):
i = insert(self.table_with_metadata).values(dict(myid=1, name="foo"))
i = i.on_conflict_do_update(
constraint=self.table_with_metadata.primary_key,
set_=dict(name=i.excluded.name),
)
self.assert_compile(
i,
"INSERT INTO mytable (myid, name) VALUES (%(myid)s::INTEGER,"
" %(name)s::VARCHAR) ON CONFLICT (myid) DO UPDATE SET name ="
" excluded.name",
)
def test_do_update_pk_constraint_index_elements_target(self):
i = insert(self.table_with_metadata).values(dict(myid=1, name="foo"))
i = i.on_conflict_do_update(
index_elements=self.table_with_metadata.primary_key,
set_=dict(name=i.excluded.name),
)
self.assert_compile(
i,
"INSERT INTO mytable (myid, name) VALUES (%(myid)s::INTEGER,"
" %(name)s::VARCHAR) ON CONFLICT (myid) DO UPDATE SET name ="
" excluded.name",
)
def test_do_update_named_unique_constraint_target(self):
i = insert(self.table1).values(dict(name="foo"))
i = i.on_conflict_do_update(
constraint=self.unique_constr, set_=dict(myid=i.excluded.myid)
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
" ON CONSTRAINT uq_name DO UPDATE SET myid = excluded.myid",
)
def test_do_update_string_constraint_target(self):
i = insert(self.table1).values(dict(name="foo"))
i = i.on_conflict_do_update(
constraint=self.unique_constr.name, set_=dict(myid=i.excluded.myid)
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
" ON CONSTRAINT uq_name DO UPDATE SET myid = excluded.myid",
)
def test_do_nothing_quoted_string_constraint_target(self):
"""test #6696"""
i = insert(self.table1).values(dict(name="foo"))
i = i.on_conflict_do_nothing(constraint="Some Constraint Name")
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
' ON CONSTRAINT "Some Constraint Name" DO NOTHING',
)
def test_do_nothing_super_long_name_constraint_target(self):
"""test #6755"""
m = MetaData(
naming_convention={"uq": "%(table_name)s_%(column_0_N_name)s_key"}
)
uq = UniqueConstraint("some_column_name_thats_really_really_long_too")
Table(
"some_table_name_thats_really_really",
m,
Column("some_column_name_thats_really_really_long_too", Integer),
uq,
)
i = insert(self.table1).values(dict(name="foo"))
i = i.on_conflict_do_nothing(constraint=uq)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
" ON CONSTRAINT"
" some_table_name_thats_really_really_some_column_name_th_f7ab DO"
" NOTHING",
)
def test_do_nothing_quoted_named_constraint_target(self):
"""test #6696"""
i = insert(self.table1).values(dict(name="foo"))
unique_constr = UniqueConstraint(
self.table1.c.myid, name="Some Constraint Name"
)
i = i.on_conflict_do_nothing(constraint=unique_constr)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
' ON CONSTRAINT "Some Constraint Name" DO NOTHING',
)
def test_do_update_index_elements_where_target(self):
i = insert(self.table1).values(dict(name="foo"))
i = i.on_conflict_do_update(
index_elements=self.goofy_index.expressions,
index_where=self.goofy_index.dialect_options["postgresql"][
"where"
],
set_=dict(name=i.excluded.name),
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
" (name) WHERE name > %(name_1)s::VARCHAR DO UPDATE SET name ="
" excluded.name",
)
def test_do_update_index_elements_where_target_multivalues(self):
i = insert(self.table1).values(
[dict(name="foo"), dict(name="bar"), dict(name="bat")],
)
i = i.on_conflict_do_update(
index_elements=self.goofy_index.expressions,
index_where=self.goofy_index.dialect_options["postgresql"][
"where"
],
set_=dict(name=i.excluded.name),
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name_m0)s::VARCHAR),"
" (%(name_m1)s::VARCHAR), (%(name_m2)s::VARCHAR) ON CONFLICT"
" (name) WHERE name > %(name_1)s::VARCHAR DO UPDATE SET name ="
" excluded.name",
checkparams={
"name_1": "m",
"name_m0": "foo",
"name_m1": "bar",
"name_m2": "bat",
},
)
def test_do_update_unnamed_index_target(self):
i = insert(self.table1).values(dict(name="foo"))
unnamed_goofy = Index(
None, self.table1.c.name, postgresql_where=self.table1.c.name > "m"
)
i = i.on_conflict_do_update(
constraint=unnamed_goofy, set_=dict(name=i.excluded.name)
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
" (name) WHERE name > %(name_1)s::VARCHAR DO UPDATE SET name ="
" excluded.name",
)
def test_do_update_unnamed_exclude_constraint_target(self):
i = insert(self.table1).values(dict(name="foo"))
i = i.on_conflict_do_update(
constraint=self.excl_constr_anon, set_=dict(name=i.excluded.name)
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
" (name, description) WHERE description !="
" %(description_1)s::VARCHAR DO UPDATE SET name = excluded.name",
)
def test_do_update_unnamed_exclude_constraint_string_target(self):
i = insert(self.table1).values(dict(name="foo"))
i = i.on_conflict_do_update(
constraint=self.excl_constr_anon_str,
set_=dict(name=i.excluded.name),
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
" (name, description) WHERE description != 'foo' DO UPDATE SET"
" name = excluded.name",
)
def test_do_update_add_whereclause(self):
i = insert(self.table1).values(dict(name="foo"))
i = i.on_conflict_do_update(
constraint=self.excl_constr_anon,
set_=dict(name=i.excluded.name),
where=(
(self.table1.c.name != "brah")
& (self.table1.c.description != "brah")
),
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
" (name, description) WHERE description !="
" %(description_1)s::VARCHAR DO UPDATE SET name = excluded.name"
" WHERE mytable.name != %(name_1)s::VARCHAR AND"
" mytable.description != %(description_2)s::VARCHAR",
)
def test_do_update_str_index_where(self):
i = insert(self.table1).values(dict(name="foo"))
i = i.on_conflict_do_update(
constraint=self.excl_constr_anon_str,
set_=dict(name=i.excluded.name),
where=(
(self.table1.c.name != "brah")
& (self.table1.c.description != "brah")
),
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
" (name, description) WHERE description != 'foo' DO UPDATE SET"
" name = excluded.name WHERE mytable.name != %(name_1)s::VARCHAR"
" AND mytable.description != %(description_1)s::VARCHAR",
)
def test_do_update_add_whereclause_references_excluded(self):
i = insert(self.table1).values(dict(name="foo"))
i = i.on_conflict_do_update(
constraint=self.excl_constr_anon,
set_=dict(name=i.excluded.name),
where=(self.table1.c.name != i.excluded.name),
)
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON CONFLICT"
" (name, description) WHERE description !="
" %(description_1)s::VARCHAR DO UPDATE SET name = excluded.name"
" WHERE mytable.name != excluded.name",
)
def test_do_update_additional_colnames(self):
i = insert(self.table1).values(dict(name="bar"))
i = i.on_conflict_do_update(
constraint=self.excl_constr_anon,
set_=dict(name="somename", unknown="unknown"),
)
with expect_warnings(
"Additional column names not matching any "
"column keys in table 'mytable': 'unknown'"
):
self.assert_compile(
i,
"INSERT INTO mytable (name) VALUES (%(name)s::VARCHAR) ON"
" CONFLICT (name, description) WHERE description !="
" %(description_1)s::VARCHAR DO UPDATE SET name ="
" %(param_1)s::VARCHAR, unknown = %(param_2)s",
checkparams={
"name": "bar",
"description_1": "foo",
"param_1": "somename",
"param_2": "unknown",
},
)
def test_on_conflict_as_cte(self):
i = insert(self.table1).values(dict(name="foo"))
i = (
i.on_conflict_do_update(
constraint=self.excl_constr_anon,
set_=dict(name=i.excluded.name),
where=(self.table1.c.name != i.excluded.name),
)
.returning(literal_column("1"))
.cte("i_upsert")
)
stmt = select(i)
self.assert_compile(
stmt,
"WITH i_upsert AS (INSERT INTO mytable (name) VALUES"
" (%(param_1)s::VARCHAR) ON CONFLICT (name, description) WHERE"
" description != %(description_1)s::VARCHAR DO UPDATE SET name ="
" excluded.name WHERE mytable.name != excluded.name RETURNING 1)"
" SELECT i_upsert.1 FROM i_upsert",
)
def test_combined_with_cte(self):
t = table("t", column("c1"), column("c2"))
delete_statement_cte = t.delete().where(t.c.c1 < 1).cte("deletions")
insert_stmt = insert(t).values([{"c1": 1, "c2": 2}])
update_stmt = insert_stmt.on_conflict_do_update(
index_elements=[t.c.c1],
set_={
col.name: col
for col in insert_stmt.excluded
if col.name in ("c1", "c2")
},
).add_cte(delete_statement_cte)
self.assert_compile(
update_stmt,
"WITH deletions AS (DELETE FROM t WHERE t.c1 < %(c1_1)s::INTEGER)"
" INSERT INTO t (c1, c2) VALUES (%(c1_m0)s, %(c2_m0)s) ON CONFLICT"
" (c1) DO UPDATE SET c1 = excluded.c1, c2 = excluded.c2",
checkparams={"c1_m0": 1, "c2_m0": 2, "c1_1": 1},
)
def test_quote_raw_string_col(self):
t = table("t", column("FancyName"), column("other name"))
stmt = (
insert(t)
.values(FancyName="something new")
.on_conflict_do_update(
index_elements=["FancyName", "other name"],
set_=OrderedDict(
[
("FancyName", "something updated"),
("other name", "something else"),
]
),
)
)
self.assert_compile(
stmt,
'INSERT INTO t ("FancyName") VALUES (%(FancyName)s) '
'ON CONFLICT ("FancyName", "other name") '
'DO UPDATE SET "FancyName" = %(param_1)s, '
'"other name" = %(param_2)s',
{
"param_1": "something updated",
"param_2": "something else",
"FancyName": "something new",
},
)
@testing.variation(
"path", ["unknown_columns", "whereclause", "indexwhere"]
)
def test_on_conflict_literal_binds(self, path: testing.Variation):
"""test for #13110"""
i = insert(self.table_with_metadata).values(myid=1, name="foo")
if path.unknown_columns:
i = i.on_conflict_do_update(
index_elements=["myid"],
set_=OrderedDict(
[
("name", "I'm a name"),
("other_param", literal("this too")),
]
),
)
expected = (
"ON CONFLICT (myid) DO UPDATE SET name = "
"'I''m a name', other_param = 'this too'"
)
warnings = expect_warnings(
"Additional column names not matching any column keys"
)
elif path.whereclause:
i = i.on_conflict_do_update(
index_elements=["myid"],
set_={"name": "I'm a name"},
where=self.table_with_metadata.c.name == "foo",
)
expected = (
"ON CONFLICT (myid) DO UPDATE SET name = "
"'I''m a name' WHERE mytable.name = 'foo'"
)
warnings = contextlib.nullcontext()
elif path.indexwhere:
i = i.on_conflict_do_update(
index_elements=["myid"],
set_={"name": "I'm a name"},
index_where=self.goofy_index.dialect_options["postgresql"][
"where"
],
)
warnings = contextlib.nullcontext()
expected = (
"ON CONFLICT (myid) WHERE name > 'm' "
"DO UPDATE SET name = 'I''m a name'"
)
else:
path.fail()
with warnings:
self.assert_compile(
i,
"INSERT INTO mytable (myid, name) VALUES (1, 'foo')"
f" {expected}",
{},
literal_binds=True,
)
class DistinctOnTest(
fixtures.MappedTest,
AssertsCompiledSQL,
fixtures.CacheKeySuite,
fixtures.DistinctOnFixture,
):
"""Test 'DISTINCT' with SQL expression language and orm.Query with
an emphasis on PG's 'DISTINCT ON' syntax.
"""
__dialect__ = postgresql.dialect()
def setup_test(self):
self.table = Table(
"t",
MetaData(),
Column("id", Integer, primary_key=True),
Column("a", String),
Column("b", String),
)
def test_distinct_on_no_cols(self, distinct_on_fixture):
self.assert_compile(
distinct_on_fixture(select(self.table)),
"SELECT DISTINCT t.id, t.a, t.b FROM t",
)
def test_distinct_on_cols(self, distinct_on_fixture):
self.assert_compile(
distinct_on_fixture(select(self.table), self.table.c.a),
"SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t",
)
self.assert_compile(
distinct_on_fixture(
self.table.select(), self.table.c.a, self.table.c.b
),
"SELECT DISTINCT ON (t.a, t.b) t.id, t.a, t.b FROM t",
checkparams={},
)
def test_distinct_on_columns_generative_multi_call(
self, distinct_on_fixture
):
stmt = select(self.table)
stmt = distinct_on_fixture(stmt, self.table.c.a)
stmt = distinct_on_fixture(stmt, self.table.c.b)
self.assert_compile(
stmt,
"SELECT DISTINCT ON (t.a, t.b) t.id, t.a, t.b FROM t",
)
def test_distinct_on_dupe_columns_generative_multi_call(
self, distinct_on_fixture
):
stmt = select(self.table)
stmt = distinct_on_fixture(stmt, self.table.c.a)
stmt = distinct_on_fixture(stmt, self.table.c.a)
self.assert_compile(
stmt,
"SELECT DISTINCT ON (t.a, t.a) t.id, t.a, t.b FROM t",
)
def test_legacy_query_plain(self, distinct_on_fixture):
sess = Session()
self.assert_compile(
distinct_on_fixture(sess.query(self.table)),
"SELECT DISTINCT t.id AS t_id, t.a AS t_a, t.b AS t_b FROM t",
)
def test_legacy_query_on_columns(self, distinct_on_fixture):
sess = Session()
self.assert_compile(
distinct_on_fixture(sess.query(self.table), self.table.c.a),
"SELECT DISTINCT ON (t.a) t.id AS t_id, t.a AS t_a, "
"t.b AS t_b FROM t",
)
def test_legacy_query_distinct_on_columns_multi_call(
self, distinct_on_fixture
):
sess = Session()
self.assert_compile(
distinct_on_fixture(
distinct_on_fixture(sess.query(self.table), self.table.c.a),
self.table.c.b,
),
"SELECT DISTINCT ON (t.a, t.b) t.id AS t_id, t.a AS t_a, "
"t.b AS t_b FROM t",
)
def test_legacy_query_distinct_on_columns_subquery(
self, distinct_on_fixture
):
sess = Session()
class Foo:
pass
clear_mappers()
self.mapper_registry.map_imperatively(Foo, self.table)
sess = Session()
subq = sess.query(Foo).subquery()
f1 = aliased(Foo, subq)
self.assert_compile(
distinct_on_fixture(sess.query(f1), f1.a, f1.b),
"SELECT DISTINCT ON (anon_1.a, anon_1.b) anon_1.id "
"AS anon_1_id, anon_1.a AS anon_1_a, anon_1.b "
"AS anon_1_b FROM (SELECT t.id AS id, t.a AS a, "
"t.b AS b FROM t) AS anon_1",
)
def test_legacy_query_distinct_on_aliased(self, distinct_on_fixture):
class Foo:
pass
clear_mappers()
self.mapper_registry.map_imperatively(Foo, self.table)
a1 = aliased(Foo)
sess = Session()
q = distinct_on_fixture(sess.query(a1), a1.a)
self.assert_compile(
q,
"SELECT DISTINCT ON (t_1.a) t_1.id AS t_1_id, "
"t_1.a AS t_1_a, t_1.b AS t_1_b FROM t AS t_1",
)
def test_distinct_on_subquery_anon(self, distinct_on_fixture):
sq = select(self.table).alias()
q = distinct_on_fixture(
select(self.table.c.id, sq.c.id), sq.c.id
).where(self.table.c.id == sq.c.id)
self.assert_compile(
q,
"SELECT DISTINCT ON (anon_1.id) t.id, anon_1.id AS id_1 "
"FROM t, (SELECT t.id AS id, t.a AS a, t.b "
"AS b FROM t) AS anon_1 WHERE t.id = anon_1.id",
)
def test_distinct_on_subquery_named(self, distinct_on_fixture):
sq = select(self.table).alias("sq")
q = distinct_on_fixture(
select(self.table.c.id, sq.c.id), sq.c.id
).where(self.table.c.id == sq.c.id)
self.assert_compile(
q,
"SELECT DISTINCT ON (sq.id) t.id, sq.id AS id_1 "
"FROM t, (SELECT t.id AS id, t.a AS a, "
"t.b AS b FROM t) AS sq WHERE t.id = sq.id",
)
@fixtures.CacheKeySuite.run_suite_tests
def test_distinct_on_ext_cache_key(self):
def leg():
with expect_deprecated("Passing expression"):
return self.table.select().distinct(self.table.c.a)
return lambda: [
self.table.select().ext(distinct_on(self.table.c.a)),
self.table.select().ext(distinct_on(self.table.c.b)),
self.table.select().ext(
distinct_on(self.table.c.a, self.table.c.b)
),
self.table.select().ext(
distinct_on(self.table.c.b, self.table.c.a)
),
self.table.select(),
self.table.select().distinct(),
leg(),
]
def test_distinct_on_cache_key_equal(self, distinct_on_fixture):
self._run_cache_key_equal_fixture(
lambda: [
distinct_on_fixture(self.table.select(), self.table.c.a),
distinct_on_fixture(select(self.table), self.table.c.a),
],
compare_values=True,
)
self._run_cache_key_equal_fixture(
lambda: [
distinct_on_fixture(
distinct_on_fixture(self.table.select(), self.table.c.a),
self.table.c.b,
),
distinct_on_fixture(
select(self.table), self.table.c.a, self.table.c.b
),
],
compare_values=True,
)
def test_distinct_on_literal_binds(self, distinct_on_fixture):
self.assert_compile(
distinct_on_fixture(select(self.table), self.table.c.a == 10),
"SELECT DISTINCT ON (t.a = 10) t.id, t.a, t.b FROM t",
literal_binds=True,
)
def test_distinct_on_col_str(self, distinct_on_fixture):
stmt = distinct_on_fixture(select(self.table), "a")
self.assert_compile(
stmt,
"SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t",
dialect="postgresql",
)
def test_distinct_on_label(self, distinct_on_fixture):
stmt = distinct_on_fixture(select(self.table.c.a.label("foo")), "foo")
self.assert_compile(stmt, "SELECT DISTINCT ON (foo) t.a AS foo FROM t")
def test_unresolvable_distinct_label(self, distinct_on_fixture):
stmt = distinct_on_fixture(
select(self.table.c.a.label("foo")), "not a label"
)
with expect_raises_message(
exc.CompileError,
"Can't resolve label reference for.* expression 'not a"
" label' should be explicitly",
):
self.assert_compile(stmt, "ignored")
def test_distinct_on_ext_with_legacy_distinct(self):
with (
expect_raises_message(
exc.InvalidRequestError,
re.escape(
"Cannot mix ``select.ext(distinct_on(...))`` and "
"``select.distinct(...)``"
),
),
expect_deprecated("Passing expression"),
):
s = (
self.table.select()
.distinct(self.table.c.b)
.ext(distinct_on(self.table.c.a))
)
# opposite order is not detected...
with expect_deprecated("Passing expression"):
s = (
self.table.select()
.ext(distinct_on(self.table.c.a))
.distinct(self.table.c.b)
)
# but it raises while compiling
with expect_raises_message(
exc.CompileError,
re.escape(
"Cannot mix ``select.ext(distinct_on(...))`` and "
"``select.distinct(...)``"
),
):
self.assert_compile(s, "ignored")
class FullTextSearchTest(fixtures.TestBase, AssertsCompiledSQL):
"""Tests for full text searching"""
__dialect__ = postgresql.dialect()
def setup_test(self):
self.table = Table(
"t",
MetaData(),
Column("id", Integer, primary_key=True),
Column("title", String),
Column("body", String),
)
self.table_alt = table(
"mytable",
column("id", Integer),
column("title", String(128)),
column("body", String(128)),
)
self.matchtable = Table(
"matchtable",
MetaData(),
Column("id", Integer, primary_key=True),
Column("title", String(200)),
)
def _raise_query(self, q):
"""
useful for debugging. just do...
self._raise_query(q)
"""
c = q.compile(dialect=postgresql.dialect())
raise ValueError(c)
def test_match_custom(self):
s = select(self.table_alt.c.id).where(
func.to_tsquery("fat").bool_op("<->")(func.to_tsquery("rat"))
)
self.assert_compile(
s,
"SELECT mytable.id FROM mytable WHERE"
" to_tsquery(%(to_tsquery_1)s::VARCHAR) <->"
" to_tsquery(%(to_tsquery_2)s::VARCHAR)",
{"to_tsquery_1": "fat", "to_tsquery_2": "rat"},
)
def test_match_custom_regconfig(self):
s = select(self.table_alt.c.id).where(
func.to_tsquery("english", "fat").bool_op("<->")(
func.to_tsquery("english", "rat")
)
)
self.assert_compile(
s,
"SELECT mytable.id FROM mytable WHERE"
" to_tsquery(%(to_tsquery_1)s::REGCONFIG,"
" %(to_tsquery_2)s::VARCHAR) <->"
" to_tsquery(%(to_tsquery_3)s::REGCONFIG,"
" %(to_tsquery_4)s::VARCHAR)",
{
"to_tsquery_1": "english",
"to_tsquery_2": "fat",
"to_tsquery_3": "english",
"to_tsquery_4": "rat",
},
)
def test_match_basic(self):
s = select(self.table_alt.c.id).where(
self.table_alt.c.title.match("somestring")
)
self.assert_compile(
s,
"SELECT mytable.id FROM mytable WHERE mytable.title @@"
" plainto_tsquery(%(title_1)s::VARCHAR)",
)
def test_match_regconfig(self):
s = select(self.table_alt.c.id).where(
self.table_alt.c.title.match(
"somestring", postgresql_regconfig="english"
)
)
self.assert_compile(
s,
"SELECT mytable.id FROM mytable WHERE mytable.title @@"
" plainto_tsquery('english', %(title_1)s::VARCHAR)",
)
def test_match_tsvector(self):
s = select(self.table_alt.c.id).where(
func.to_tsvector(self.table_alt.c.title).match("somestring")
)
self.assert_compile(
s,
"SELECT mytable.id FROM mytable WHERE to_tsvector(mytable.title)"
" @@ plainto_tsquery(%(to_tsvector_1)s::VARCHAR)",
)
def test_match_tsvectorconfig(self):
s = select(self.table_alt.c.id).where(
func.to_tsvector("english", self.table_alt.c.title).match(
"somestring"
)
)
self.assert_compile(
s,
"SELECT mytable.id FROM mytable WHERE"
" to_tsvector(%(to_tsvector_1)s::REGCONFIG, mytable.title) @@"
" plainto_tsquery(%(to_tsvector_2)s::VARCHAR)",
)
def test_match_tsvectorconfig_regconfig(self):
s = select(self.table_alt.c.id).where(
func.to_tsvector("english", self.table_alt.c.title).match(
"somestring", postgresql_regconfig="english"
)
)
self.assert_compile(
s,
"SELECT mytable.id FROM mytable WHERE"
" to_tsvector(%(to_tsvector_1)s::REGCONFIG, mytable.title) @@"
" plainto_tsquery('english', %(to_tsvector_2)s::VARCHAR)",
)
@testing.combinations(
("to_tsvector",),
("to_tsquery",),
("plainto_tsquery",),
("phraseto_tsquery",),
("websearch_to_tsquery",),
("ts_headline",),
argnames="to_ts_name",
)
def test_dont_compile_non_imported(self, to_ts_name):
new_func = type(
to_ts_name,
(GenericFunction,),
{
"_register": False,
"inherit_cache": True,
},
)
with expect_raises_message(
exc.CompileError,
rf"Can't compile \"{to_ts_name}\(\)\" full text search "
f"function construct that does not originate from the "
f'"sqlalchemy.dialects.postgresql" package. '
f'Please ensure "import sqlalchemy.dialects.postgresql" is '
f"called before constructing "
rf"\"sqlalchemy.func.{to_ts_name}\(\)\" to ensure "
f"registration of the correct "
f"argument and return types.",
):
select(new_func("x", "y")).compile(dialect=postgresql.dialect())
@testing.combinations(
(func.to_tsvector,),
(func.to_tsquery,),
(func.plainto_tsquery,),
(func.phraseto_tsquery,),
(func.websearch_to_tsquery,),
argnames="to_ts_func",
)
@testing.variation("use_regconfig", [True, False, "literal"])
def test_to_regconfig_fns(self, to_ts_func, use_regconfig):
"""test #8977"""
matchtable = self.matchtable
fn_name = to_ts_func().name
if use_regconfig.literal:
regconfig = literal("english", REGCONFIG)
elif use_regconfig:
regconfig = "english"
else:
regconfig = None
if regconfig is None:
if fn_name == "to_tsvector":
fn = to_ts_func(matchtable.c.title).match("python")
expected = (
"to_tsvector(matchtable.title) @@ "
"plainto_tsquery($1::VARCHAR)"
)
else:
fn = func.to_tsvector(matchtable.c.title).op("@@")(
to_ts_func("python")
)
expected = (
f"to_tsvector(matchtable.title) @@ {fn_name}($1::VARCHAR)"
)
else:
if fn_name == "to_tsvector":
fn = to_ts_func(regconfig, matchtable.c.title).match("python")
expected = (
"to_tsvector($1::REGCONFIG, matchtable.title) @@ "
"plainto_tsquery($2::VARCHAR)"
)
else:
fn = func.to_tsvector(matchtable.c.title).op("@@")(
to_ts_func(regconfig, "python")
)
expected = (
"to_tsvector(matchtable.title) @@ "
f"{fn_name}($1::REGCONFIG, $2::VARCHAR)"
)
stmt = matchtable.select().where(fn)
self.assert_compile(
stmt,
"SELECT matchtable.id, matchtable.title "
f"FROM matchtable WHERE {expected}",
dialect="postgresql+asyncpg",
)
@testing.variation("use_regconfig", [True, False, "literal"])
@testing.variation("include_options", [True, False])
@testing.variation("tsquery_in_expr", [True, False])
def test_ts_headline(
self, connection, use_regconfig, include_options, tsquery_in_expr
):
"""test #8977"""
if use_regconfig.literal:
regconfig = literal("english", REGCONFIG)
elif use_regconfig:
regconfig = "english"
else:
regconfig = None
text = (
"The most common type of search is to find all documents "
"containing given query terms and return them in order of "
"their similarity to the query."
)
tsquery = func.to_tsquery("english", "query & similarity")
if regconfig is None:
tsquery_str = "to_tsquery($2::REGCONFIG, $3::VARCHAR)"
else:
tsquery_str = "to_tsquery($3::REGCONFIG, $4::VARCHAR)"
if tsquery_in_expr:
tsquery = case((true(), tsquery), else_=null())
tsquery_str = f"CASE WHEN true THEN {tsquery_str} ELSE NULL END"
is_(tsquery.type._type_affinity, TSQUERY)
args = [text, tsquery]
if regconfig is not None:
args.insert(0, regconfig)
if include_options:
args.append(
"MaxFragments=10, MaxWords=7, "
"MinWords=3, StartSel=<<, StopSel=>>"
)
fn = func.ts_headline(*args)
stmt = select(fn)
if regconfig is None and not include_options:
self.assert_compile(
stmt,
"SELECT ts_headline($1::VARCHAR, "
f"{tsquery_str}) AS ts_headline_1",
dialect="postgresql+asyncpg",
)
elif regconfig is None and include_options:
self.assert_compile(
stmt,
"SELECT ts_headline($1::VARCHAR, "
f"{tsquery_str}, $4::VARCHAR) AS ts_headline_1",
dialect="postgresql+asyncpg",
)
elif regconfig is not None and not include_options:
self.assert_compile(
stmt,
"SELECT ts_headline($1::REGCONFIG, $2::VARCHAR, "
f"{tsquery_str}) AS ts_headline_1",
dialect="postgresql+asyncpg",
)
else:
self.assert_compile(
stmt,
"SELECT ts_headline($1::REGCONFIG, $2::VARCHAR, "
f"{tsquery_str}, $5::VARCHAR) "
"AS ts_headline_1",
dialect="postgresql+asyncpg",
)
class RegexpTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__dialect__ = "postgresql"
def setup_test(self):
self.table = table(
"mytable", column("myid", String), column("name", String)
)
def test_regexp_match(self):
self.assert_compile(
self.table.c.myid.regexp_match("pattern"),
"mytable.myid ~ %(myid_1)s::VARCHAR",
checkparams={"myid_1": "pattern"},
)
def test_regexp_match_column(self):
self.assert_compile(
self.table.c.myid.regexp_match(self.table.c.name),
"mytable.myid ~ mytable.name",
checkparams={},
)
def test_regexp_match_str(self):
self.assert_compile(
literal("string").regexp_match(self.table.c.name),
"%(param_1)s::VARCHAR ~ mytable.name",
checkparams={"param_1": "string"},
)
def test_regexp_match_flags(self):
self.assert_compile(
self.table.c.myid.regexp_match("pattern", flags="ig"),
"mytable.myid ~ CONCAT('(?', 'ig', ')', %(myid_1)s::VARCHAR)",
checkparams={"myid_1": "pattern"},
)
def test_regexp_match_flags_ignorecase(self):
self.assert_compile(
self.table.c.myid.regexp_match("pattern", flags="i"),
"mytable.myid ~* %(myid_1)s::VARCHAR",
checkparams={"myid_1": "pattern"},
)
def test_not_regexp_match(self):
self.assert_compile(
~self.table.c.myid.regexp_match("pattern"),
"mytable.myid !~ %(myid_1)s::VARCHAR",
checkparams={"myid_1": "pattern"},
)
def test_not_regexp_match_column(self):
self.assert_compile(
~self.table.c.myid.regexp_match(self.table.c.name),
"mytable.myid !~ mytable.name",
checkparams={},
)
def test_not_regexp_match_str(self):
self.assert_compile(
~literal("string").regexp_match(self.table.c.name),
"%(param_1)s::VARCHAR !~ mytable.name",
checkparams={"param_1": "string"},
)
def test_not_regexp_match_flags(self):
self.assert_compile(
~self.table.c.myid.regexp_match("pattern", flags="ig"),
"mytable.myid !~ CONCAT('(?', 'ig', ')', %(myid_1)s::VARCHAR)",
checkparams={"myid_1": "pattern"},
)
def test_not_regexp_match_flags_ignorecase(self):
self.assert_compile(
~self.table.c.myid.regexp_match("pattern", flags="i"),
"mytable.myid !~* %(myid_1)s::VARCHAR",
checkparams={"myid_1": "pattern"},
)
def test_regexp_replace(self):
self.assert_compile(
self.table.c.myid.regexp_replace("pattern", "replacement"),
"REGEXP_REPLACE(mytable.myid, %(myid_1)s::VARCHAR,"
" %(myid_2)s::VARCHAR)",
checkparams={"myid_1": "pattern", "myid_2": "replacement"},
)
def test_regexp_replace_column(self):
self.assert_compile(
self.table.c.myid.regexp_replace("pattern", self.table.c.name),
"REGEXP_REPLACE(mytable.myid, %(myid_1)s::VARCHAR, mytable.name)",
checkparams={"myid_1": "pattern"},
)
def test_regexp_replace_column2(self):
self.assert_compile(
self.table.c.myid.regexp_replace(self.table.c.name, "replacement"),
"REGEXP_REPLACE(mytable.myid, mytable.name, %(myid_1)s::VARCHAR)",
checkparams={"myid_1": "replacement"},
)
def test_regexp_replace_string(self):
self.assert_compile(
literal("string").regexp_replace("pattern", self.table.c.name),
"REGEXP_REPLACE(%(param_1)s::VARCHAR, %(param_2)s::VARCHAR,"
" mytable.name)",
checkparams={"param_2": "pattern", "param_1": "string"},
)
def test_regexp_replace_flags(self):
self.assert_compile(
self.table.c.myid.regexp_replace(
"pattern", "replacement", flags="ig"
),
"REGEXP_REPLACE(mytable.myid, %(myid_1)s::VARCHAR,"
" %(myid_2)s::VARCHAR, 'ig')",
checkparams={"myid_1": "pattern", "myid_2": "replacement"},
)
def test_regexp_replace_flags_safestring(self):
self.assert_compile(
self.table.c.myid.regexp_replace(
"pattern", "replacement", flags="i'g"
),
"REGEXP_REPLACE(mytable.myid, %(myid_1)s::VARCHAR,"
" %(myid_2)s::VARCHAR, 'i''g')",
checkparams={"myid_1": "pattern", "myid_2": "replacement"},
)
@testing.combinations(
(
5,
10,
{},
(
"OFFSET (%(param_1)s::INTEGER) ROWS FETCH FIRST"
" (%(param_2)s::INTEGER) ROWS ONLY"
),
{"param_1": 10, "param_2": 5},
),
(
None,
10,
{},
"LIMIT ALL OFFSET %(param_1)s::INTEGER",
{"param_1": 10},
),
(
5,
None,
{},
"FETCH FIRST (%(param_1)s::INTEGER) ROWS ONLY",
{"param_1": 5},
),
(
0,
0,
{},
(
"OFFSET (%(param_1)s::INTEGER) ROWS FETCH FIRST"
" (%(param_2)s::INTEGER) ROWS ONLY"
),
{"param_1": 0, "param_2": 0},
),
(
5,
10,
{"percent": True},
(
"OFFSET (%(param_1)s::INTEGER) ROWS FETCH FIRST "
"(%(param_2)s::INTEGER) PERCENT ROWS ONLY"
),
{"param_1": 10, "param_2": 5},
),
(
5,
10,
{"percent": True, "with_ties": True},
(
"OFFSET (%(param_1)s::INTEGER) ROWS FETCH FIRST"
" (%(param_2)s::INTEGER) PERCENT ROWS WITH TIES"
),
{"param_1": 10, "param_2": 5},
),
(
5,
10,
{"with_ties": True},
(
"OFFSET (%(param_1)s::INTEGER) ROWS FETCH FIRST "
"(%(param_2)s::INTEGER) ROWS WITH TIES"
),
{"param_1": 10, "param_2": 5},
),
(
literal_column("Q"),
literal_column("Y"),
{},
"OFFSET (Y) ROWS FETCH FIRST (Q) ROWS ONLY",
{},
),
(
column("Q"),
column("Y"),
{},
'OFFSET ("Y") ROWS FETCH FIRST ("Q") ROWS ONLY',
{},
),
(
bindparam("Q", 3),
bindparam("Y", 7),
{},
(
"OFFSET (%(Y)s::INTEGER) ROWS FETCH FIRST (%(Q)s::INTEGER)"
" ROWS ONLY"
),
{"Q": 3, "Y": 7},
),
(
literal_column("Q") + literal_column("Z"),
literal_column("Y") + literal_column("W"),
{},
"OFFSET (Y + W) ROWS FETCH FIRST (Q + Z) ROWS ONLY",
{},
),
)
def test_fetch(self, fetch, offset, fetch_kw, exp, params):
self.assert_compile(
select(1).fetch(fetch, **fetch_kw).offset(offset),
"SELECT 1 " + exp,
checkparams=params,
)
class CacheKeyTest(fixtures.CacheKeyFixture, fixtures.TestBase):
def test_aggregate_order_by(self):
"""test #8574"""
self._run_cache_key_fixture(
lambda: (
aggregate_order_by(column("a"), column("a")),
aggregate_order_by(column("a"), column("b")),
aggregate_order_by(column("a"), column("a").desc()),
aggregate_order_by(column("a"), column("a").nulls_first()),
aggregate_order_by(
column("a"), column("a").desc().nulls_first()
),
aggregate_order_by(column("a", Integer), column("b")),
aggregate_order_by(column("a"), column("b"), column("c")),
aggregate_order_by(column("a"), column("c"), column("b")),
aggregate_order_by(
column("a"), column("b").desc(), column("c")
),
aggregate_order_by(
column("a"), column("b").nulls_first(), column("c")
),
aggregate_order_by(
column("a"), column("b").desc().nulls_first(), column("c")
),
aggregate_order_by(
column("a", Integer), column("a"), column("b")
),
),
compare_values=False,
)
def test_array_equivalent_keys_one_element(self):
self._run_cache_key_equal_fixture(
lambda: (
array([random.randint(0, 10)]),
array([random.randint(0, 10)], type_=Integer),
array([random.randint(0, 10)], type_=Integer),
),
compare_values=False,
)
def test_array_equivalent_keys_two_elements(self):
self._run_cache_key_equal_fixture(
lambda: (
array([random.randint(0, 10), random.randint(0, 10)]),
array(
[random.randint(0, 10), random.randint(0, 10)],
type_=Integer,
),
array(
[random.randint(0, 10), random.randint(0, 10)],
type_=Integer,
),
),
compare_values=False,
)
def test_array_heterogeneous(self):
self._run_cache_key_fixture(
lambda: (
array([], type_=Integer),
array([], type_=Text),
array([]),
array([random.choice(["t1", "t2", "t3"])]),
array(
[
random.choice(["t1", "t2", "t3"]),
random.choice(["t1", "t2", "t3"]),
]
),
array([random.choice(["t1", "t2", "t3"])], type_=Text),
array([random.choice(["t1", "t2", "t3"])], type_=VARCHAR(30)),
array([random.randint(0, 10), random.randint(0, 10)]),
),
compare_values=False,
)