Files
sqlalchemy/test/sql/test_create_table_as.py
Greg Jarzab 7f5e9e733c Support for Create Table As
Added support for the SQL ``CREATE TABLE ... AS SELECT`` construct via the
new :class:`_sql.CreateTableAs` DDL construct and the
:meth:`_sql.SelectBase.into` method. The new construct allows creating a
table directly from the results of a SELECT statement, with support for
options such as ``TEMPORARY`` and ``IF NOT EXISTS`` where supported by the
target database.  Pull request courtesy Greg Jarzab.

Fixes: #4950
Closes: #12860
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/12860
Pull-request-sha: 7de8a109b8

Change-Id: Id9c8e4a3c520ffc61de1e48e331b6220e3d52fc9
2025-10-25 14:24:46 -04:00

358 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import re
from sqlalchemy import bindparam
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import literal
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy.exc import ArgumentError
from sqlalchemy.schema import CreateTable
from sqlalchemy.sql import column
from sqlalchemy.sql import select
from sqlalchemy.sql import table
from sqlalchemy.sql.ddl import CreateTableAs
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing.assertions import AssertsCompiledSQL
from sqlalchemy.testing.assertions import expect_raises_message
from sqlalchemy.testing.assertions import expect_warnings
class CreateTableAsDefaultDialectTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
@testing.fixture
def src_table(self):
return Table(
"src",
MetaData(),
Column("id", Integer),
Column("name", String(50)),
)
@testing.fixture
def src_two_tables(self):
a = table("a", column("id"), column("name"))
b = table("b", column("id"), column("status"))
return a, b
def test_basic_element(self, src_table):
src = src_table
stmt = CreateTableAs(
select(src.c.id, src.c.name).select_from(src),
"dst",
)
self.assert_compile(
stmt,
"CREATE TABLE dst AS SELECT src.id, src.name FROM src",
)
def test_schema_element_qualified(self, src_table):
src = src_table
stmt = CreateTableAs(
select(src.c.id).select_from(src),
"dst",
schema="analytics",
)
self.assert_compile(
stmt,
"CREATE TABLE analytics.dst AS SELECT src.id FROM src",
)
def test_blank_schema_treated_as_none(self, src_table):
src = src_table
stmt = CreateTableAs(
select(src.c.id).select_from(src), "dst", schema=""
)
self.assert_compile(stmt, "CREATE TABLE dst AS SELECT src.id FROM src")
def test_binds_rendered_inline(self, src_table):
src = src_table
stmt = CreateTableAs(
select(literal("x").label("tag")).select_from(src),
"dst",
)
self.assert_compile(
stmt,
"CREATE TABLE dst AS SELECT 'x' AS tag FROM src",
)
def test_temporary_no_schema(self, src_table):
src = src_table
stmt = CreateTableAs(
select(src.c.id, src.c.name).select_from(src),
"dst",
temporary=True,
)
self.assert_compile(
stmt,
"CREATE TEMPORARY TABLE dst AS "
"SELECT src.id, src.name FROM src",
)
def test_temporary_exists_flags(self, src_table):
src = src_table
stmt = CreateTableAs(
select(src.c.id).select_from(src),
"dst",
schema="sch",
temporary=True,
if_not_exists=True,
)
self.assert_compile(
stmt,
"CREATE TEMPORARY TABLE "
"IF NOT EXISTS sch.dst AS SELECT src.id FROM src",
)
def test_if_not_exists(self, src_table):
src = src_table
stmt = CreateTableAs(
select(src.c.id, src.c.name).select_from(src),
"dst",
if_not_exists=True,
)
self.assert_compile(
stmt,
"CREATE TABLE IF NOT EXISTS dst AS "
"SELECT src.id, src.name FROM src",
)
def test_join_with_binds_rendered_inline(self, src_two_tables):
a, b = src_two_tables
s = (
select(a.c.id, a.c.name)
.select_from(a.join(b, a.c.id == b.c.id))
.where(b.c.status == "active")
).into("dst")
# Ensure WHERE survives into CTAS and binds are rendered inline
self.assert_compile(
s,
"CREATE TABLE dst AS "
"SELECT a.id, a.name FROM a JOIN b ON a.id = b.id "
"WHERE b.status = 'active'",
)
def test_into_equivalent_to_element(self, src_table):
src = src_table
s = select(src.c.id).select_from(src).where(src.c.id == 2)
via_into = s.into("dst")
via_element = CreateTableAs(s, "dst")
self.assert_compile(
via_into,
"CREATE TABLE dst AS SELECT src.id FROM src WHERE src.id = 2",
)
self.assert_compile(
via_element,
"CREATE TABLE dst AS SELECT src.id FROM src WHERE src.id = 2",
)
def test_into_does_not_mutate_original_select(self, src_table):
src = src_table
s = select(src.c.id).select_from(src).where(src.c.id == 5)
# compile original SELECT
self.assert_compile(
s,
"SELECT src.id FROM src WHERE src.id = :id_1",
)
# build CTAS
_ = s.into("dst")
# original is still a SELECT
self.assert_compile(
s,
"SELECT src.id FROM src WHERE src.id = :id_1",
)
def test_into_with_schema_argument(self, src_table):
src = src_table
s = select(src.c.id).select_from(src).into("t", schema="analytics")
self.assert_compile(
s,
"CREATE TABLE analytics.t AS SELECT src.id FROM src",
)
def test_target_string_must_be_unqualified(self, src_table):
src = src_table
with expect_raises_message(
ArgumentError,
re.escape("Target string must be unqualified (use schema=)."),
):
CreateTableAs(select(src.c.id).select_from(src), "sch.dst")
def test_empty_name(self):
with expect_raises_message(
ArgumentError, "Table name must be non-empty"
):
CreateTableAs(select(literal(1)), "")
@testing.variation("provide_metadata", [True, False])
def test_generated_metadata_table_property(
self, src_table, provide_metadata
):
src = src_table
if provide_metadata:
metadata = MetaData()
else:
metadata = None
stmt = CreateTableAs(
select(src.c.name.label("thename"), src.c.id).select_from(src),
"dst",
schema="sch",
metadata=metadata,
)
if metadata is not None:
is_(stmt.metadata, metadata)
assert isinstance(stmt.table, Table)
is_(stmt.table.metadata, stmt.metadata)
self.assert_compile(
CreateTable(stmt.table),
"CREATE TABLE sch.dst (thename VARCHAR(50), id INTEGER)",
)
def test_labels_in_select_list_preserved(self, src_table):
src = src_table
stmt = CreateTableAs(
select(
src.c.id.label("user_id"), src.c.name.label("user_name")
).select_from(src),
"dst",
)
self.assert_compile(
stmt,
"CREATE TABLE dst AS "
"SELECT src.id AS user_id, src.name AS user_name FROM src",
)
def test_distinct_and_group_by_survive(self, src_table):
src = src_table
sel = (
select(src.c.name).select_from(src).distinct().group_by(src.c.name)
)
stmt = CreateTableAs(sel, "dst")
self.assert_compile(
stmt,
"CREATE TABLE dst AS "
"SELECT DISTINCT src.name FROM src GROUP BY src.name",
)
def test_bindparam_no_value_raises(self, src_table):
src = src_table
sel = select(src.c.name).where(src.c.name == bindparam("x"))
stmt = CreateTableAs(sel, "dst")
with expect_warnings(
"Bound parameter 'x' rendering literal NULL in a SQL expression;"
):
self.assert_compile(
stmt,
"CREATE TABLE dst AS SELECT src.name FROM src "
"WHERE src.name = NULL",
)
def test_union_all_with_binds_rendered_inline(self, src_two_tables):
a, b = src_two_tables
# Named binds so params are deterministic
s1 = (
select(a.c.id)
.select_from(a)
.where(a.c.id == bindparam("p_a", value=1))
)
s2 = (
select(b.c.id)
.select_from(b)
.where(b.c.id == bindparam("p_b", value=2))
)
u_all = s1.union_all(s2)
stmt = CreateTableAs(u_all, "dst")
self.assert_compile(
stmt,
"CREATE TABLE dst AS "
"SELECT a.id FROM a WHERE a.id = 1 "
"UNION ALL SELECT b.id FROM b WHERE b.id = 2",
)
def test_union_labels_follow_first_select(self, src_two_tables):
# Many engines take column names
# of a UNION from the first SELECTs labels.
a = table("a", column("val"))
b = table("b", column("val"))
s1 = select(a.c.val.label("first_name")).select_from(a)
s2 = select(b.c.val).select_from(b) # unlabeled second branch
u = s1.union(s2)
stmt = CreateTableAs(u, "dst")
# We only assert whats stable across dialects:
# - first SELECT has the label
# - a UNION occurs
self.assert_compile(
stmt,
"CREATE TABLE dst AS "
"SELECT a.val AS first_name FROM a "
"UNION "
"SELECT b.val FROM b",
)
def test_union_all_with_inlined_literals_smoke(self, src_two_tables):
# Proves literal_binds=True behavior applies across branches.
a, b = src_two_tables
u = (
select(literal(1).label("x"))
.select_from(a)
.union_all(select(literal("b").label("x")).select_from(b))
)
stmt = CreateTableAs(u, "dst")
self.assert_compile(
stmt,
"CREATE TABLE dst AS "
"SELECT 1 AS x FROM a UNION ALL SELECT 'b' AS x FROM b",
)
def test_select_shape_where_order_limit(self, src_table):
src = src_table
sel = (
select(src.c.id, src.c.name)
.select_from(src)
.where(src.c.id > literal(10))
.order_by(src.c.name)
.limit(5)
.offset(0)
)
stmt = CreateTableAs(sel, "dst")
self.assert_compile(
stmt,
"CREATE TABLE dst AS "
"SELECT src.id, src.name FROM src "
"WHERE src.id > 10 ORDER BY src.name LIMIT 5 OFFSET 0",
)
def test_cte_smoke(self, src_two_tables):
# Proves CTAS works with a WITH-CTE wrapper and labeled column.
a, _ = src_two_tables
cte = select(a.c.id.label("aid")).select_from(a).cte("u")
stmt = CreateTableAs(select(cte.c.aid), "dst")
self.assert_compile(
stmt,
"CREATE TABLE dst AS "
"WITH u AS (SELECT a.id AS aid FROM a) "
"SELECT u.aid FROM u",
)