Merge "Support for Create Table As" into main

This commit is contained in:
Michael Bayer
2025-10-25 20:18:32 +00:00
committed by Gerrit Code Review
18 changed files with 1284 additions and 2 deletions
+47
View File
@@ -618,6 +618,53 @@ not the database portion::
:ticket:`11234`
.. _change_4950:
CREATE TABLE AS SELECT Support
-------------------------------
SQLAlchemy 2.1 adds support for the SQL ``CREATE TABLE ... AS SELECT``
construct as well as the ``SELECT ... INTO`` variant for selected backends,
which creates a new table directly from the results of a SELECT
statement. This is available via the new :class:`_schema.CreateTableAs` DDL
construct and the :meth:`_sql.SelectBase.into` convenience method.
The :class:`_schema.CreateTableAs` construct can be used to create a new table
from any SELECT statement::
>>> from sqlalchemy import select, CreateTableAs
>>> select_stmt = select(users.c.id, users.c.name).where(users.c.status == "active")
>>> create_table_as = CreateTableAs(select_stmt, "active_users")
The above construct renders as a ``CREATE TABLE AS`` statement::
>>> print(create_table_as)
CREATE TABLE active_users AS SELECT users.id, users.name
FROM users
WHERE users.status = 'active'
The construct can be executed to emit the above DDL, and the table may then
be accessed using the :attr:`.CreateTableAs.table` attribute which
supplies a :class:`.Table`::
>>> print(select(create_table_as.table))
SELECT users.id, users.name
FROM active_users
See :ref:`tutorial_create_table_as` for a tutorial.
.. seealso::
:ref:`tutorial_create_table_as` - in the :ref:`unified_tutorial`
:class:`_schema.CreateTableAs` - DDL construct for CREATE TABLE AS
:meth:`_sql.SelectBase.into` - convenience method on SELECT and UNION
statements
:ticket:`4950`
.. _change_11250:
Potential breaking change to odbc_connect= handling for mssql+pyodbc
+16
View File
@@ -0,0 +1,16 @@
.. change::
:tags: feature, sql
:tickets: 4950
Added support for the SQL ``CREATE TABLE ... AS SELECT`` construct via the
new :class:`_schema.CreateTableAs` DDL construct and the
:meth:`_sql.Select.into` method. The new construct allows creating a
table directly from the results of a SELECT statement, with support for
options such as ``TEMPORARY`` and ``IF NOT EXISTS`` where supported by the
target database. Pull request courtesy Greg Jarzab.
.. seealso::
:ref:`change_4950`
+2
View File
@@ -329,6 +329,8 @@ DDL Expression Constructs API
.. autoclass:: CreateTable
:members:
.. autoclass:: CreateTableAs
:members:
.. autoclass:: DropTable
:members:
+104
View File
@@ -1818,6 +1818,110 @@ where it is usable for custom SQL functions::
:ref:`postgresql_column_valued` - in the :ref:`postgresql_toplevel` documentation.
.. _tutorial_create_table_as:
Using CREATE TABLE AS / SELECT INTO with :func:`.select`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. versionadded:: 2.1
The :class:`.CreateTableAs` construct, along with a complementing method
:meth:`.Select.into`, provides support for the "CREATE TABLE AS" / "SELECT INTO"
DDL constructs, which allows the creation of new tables in the database that
represent the contents of an arbitrary SELECT statement. This SQL syntax
is supported by all included SQLAlchemy backends.
We can produce a :class:`_schema.CreateTableAs` expression from a
:func:`_sql.select` created against any combinations of tables::
>>> from sqlalchemy import select, CreateTableAs
>>> select_stmt = select(User.id, User.name).where(User.name.like("sponge%"))
>>> create_table_as = CreateTableAs(select_stmt, "spongebob_users")
We can also use the equivalent :meth:`.Select.into` method::
>>> create_table_as = select_stmt.into("spongebob_users")
Stringifying this construct on most backends illustrates the ``CREATE TABLE AS`` syntax::
>>> print(create_table_as)
CREATE TABLE spongebob_users AS SELECT user_account.id, user_account.name
FROM user_account
WHERE user_account.name LIKE 'sponge%'
On Microsoft SQL Server, we observe that SELECT INTO is generated instead::
>>> from sqlalchemy.dialects import mssql
>>> print(create_table_as.compile(dialect=mssql.dialect()))
SELECT user_account.id, user_account.name INTO spongebob_users
FROM user_account
WHERE user_account.name LIKE 'sponge%'
We can invoke the :class:`.CreateTableAs` construct directly on a database
connection to create the new table in the database::
>>> session.execute(create_table_as)
{execsql}BEGIN (implicit)
CREATE TABLE spongebob_users AS SELECT user_account.id, user_account.name
FROM user_account
WHERE user_account.name LIKE 'sponge%'
[...] ()
{stop}<sqlalchemy.engine.cursor.CursorResult object at ...>
The database now has a new table ``spongebob_users`` which contains all the columns and rows
that would be returned by the SELECT statement. This is a real table
in the database that will remain until we drop it (unless it's a temporary
table that automatically drops, or if transactional DDL is rolled back).
To use the new table with SQLAlchemy Core expressions, we can access a
new :class:`.Table` via the :attr:`.CreateTableAs.table` attribute; this
:class:`.Table` is by default associated with a newly created
:class:`.MetaData` object local to the :class:`.CreateTableAs` object:
.. sourcecode:: pycon+sql
>>> select_stmt = select(create_table_as.table)
>>> result = session.execute(select_stmt)
{execsql}SELECT spongebob_users.id, spongebob_users.name
FROM spongebob_users
[...] ()
{stop}>>> result.all()
{execsql}[(1, 'spongebob')]
To emit DROP for this table, we use :meth:`.Table.drop`::
>>> create_table_as.table.drop(session.connection())
{execsql}DROP TABLE spongebob_users
[...] ()
Alternatively, we can associate the :class:`.CreateTableAs` with an existing
:class:`.MetaData` using the :paramref:`.CreateTableAs.metadata` parameter, in
which case operations like :meth:`.MetaData.drop_all` will include a DROP for
this table.
.. note:: The :class:`.CreateTableAs` construct is not currently included in the
sequence initiated by :meth:`.MetaData.create_all`, meaning that this
operation would emit a simple ``CREATE TABLE`` for the table, rather than
using ``CREATE TABLE AS`` or ``SELECT INTO``, which would omit the
``SELECT`` statement; so when associating
:class:`.CreateTableAs` with an existing :class:`.MetaData`, be sure to
ensure that :meth:`.MetaData.create_all` is not called on that :class:`.MetaData`
unless the :class:`.CreateTableAs` construct were already invoked for that
database, assuring the table already exists.
:class:`.CreateTableAs` and :meth:`.Select.into` both support optional flags
such as ``TEMPORARY`` and ``IF NOT EXISTS`` where supported by the target
database::
>>> # Create a temporary table with IF NOT EXISTS
>>> stmt = select(User.id, User.name).into(
... "temp_snapshot", temporary=True, if_not_exists=True
... )
>>> print(stmt)
CREATE TEMPORARY TABLE IF NOT EXISTS temp_snapshot AS SELECT user_account.id, user_account.name
FROM user_account
.. _tutorial_casts:
Data Casts and Type Coercion
+1
View File
@@ -62,6 +62,7 @@ from .schema import Column as Column
from .schema import ColumnDefault as ColumnDefault
from .schema import Computed as Computed
from .schema import Constraint as Constraint
from .schema import CreateTableAs as CreateTableAs
from .schema import DDL as DDL
from .schema import DDLElement as DDLElement
from .schema import DefaultClause as DefaultClause
+34
View File
@@ -2694,6 +2694,40 @@ class MSDDLCompiler(compiler.DDLCompiler):
self.preparer.format_table(drop.element.table),
)
def visit_create_table_as(self, element, **kw):
prep = self.preparer
# SQL Server doesn't support CREATE TABLE AS, use SELECT INTO instead
# Format: SELECT columns INTO new_table FROM source WHERE ...
qualified = prep.format_table(element.table)
# Get the inner SELECT SQL
inner_kw = dict(kw)
inner_kw["literal_binds"] = True
select_sql = self.sql_compiler.process(element.selectable, **inner_kw)
# Inject INTO clause before FROM keyword
# Find FROM position (case-insensitive)
select_upper = select_sql.upper()
from_idx = select_upper.find(" FROM ")
if from_idx == -1:
from_idx = select_upper.find("\nFROM ")
if from_idx == -1:
raise exc.CompileError(
"Could not find FROM keyword in selectable for CREATE TABLE AS"
)
# Insert INTO clause before FROM
result = (
select_sql[:from_idx]
+ f"INTO {qualified} "
+ select_sql[from_idx:]
)
return result
def visit_primary_key_constraint(self, constraint, **kw):
if len(constraint) == 0:
return ""
+1
View File
@@ -20,6 +20,7 @@ from .sql.ddl import CreateIndex as CreateIndex
from .sql.ddl import CreateSchema as CreateSchema
from .sql.ddl import CreateSequence as CreateSequence
from .sql.ddl import CreateTable as CreateTable
from .sql.ddl import CreateTableAs as CreateTableAs
from .sql.ddl import DDL as DDL
from .sql.ddl import DDLElement as DDLElement
from .sql.ddl import DropColumnComment as DropColumnComment
+19
View File
@@ -94,6 +94,7 @@ if typing.TYPE_CHECKING:
from .base import CompileState
from .base import Executable
from .cache_key import CacheKey
from .ddl import CreateTableAs
from .ddl import ExecutableDDLElement
from .dml import Delete
from .dml import Insert
@@ -6927,6 +6928,24 @@ class DDLCompiler(Compiled):
text += "\n)%s\n\n" % self.post_create_table(table)
return text
def visit_create_table_as(self, element: CreateTableAs, **kw: Any) -> str:
prep = self.preparer
inner_kw = dict(kw)
inner_kw["literal_binds"] = True
select_sql = self.sql_compiler.process(element.selectable, **inner_kw)
parts = [
"CREATE",
"TEMPORARY" if element.temporary else None,
"TABLE",
"IF NOT EXISTS" if element.if_not_exists else None,
prep.format_table(element.table),
"AS",
select_sql,
]
return " ".join(p for p in parts if p)
def visit_create_column(self, create, first_pk=False, **kw):
column = create.element
+150 -1
View File
@@ -29,11 +29,14 @@ from typing import Tuple
from typing import TypeVar
from typing import Union
from . import coercions
from . import roles
from .base import _generative
from .base import Executable
from .base import SchemaVisitor
from .elements import ClauseElement
from .selectable import SelectBase
from .selectable import TableClause
from .. import exc
from .. import util
from ..util import topological
@@ -47,10 +50,10 @@ if typing.TYPE_CHECKING:
from .schema import Constraint
from .schema import ForeignKeyConstraint
from .schema import Index
from .schema import MetaData
from .schema import SchemaItem
from .schema import Sequence as Sequence # noqa: F401
from .schema import Table
from .selectable import TableClause
from ..engine.base import Connection
from ..engine.interfaces import CacheStats
from ..engine.interfaces import CompiledCacheType
@@ -546,6 +549,152 @@ class CreateTable(_CreateBase["Table"]):
self.include_foreign_key_constraints = include_foreign_key_constraints
class CreateTableAs(ExecutableDDLElement):
"""Represent a CREATE TABLE ... AS statement.
This creates a new table directly from the output of a SELECT.
The set of columns in the new table is derived from the
SELECT list; constraints, indexes, and defaults are not copied.
E.g.::
from sqlalchemy import select
from sqlalchemy.sql.ddl import CreateTableAs
# Create a new table from a SELECT
stmt = CreateTableAs(
select(users.c.id, users.c.name).where(users.c.status == "active"),
"active_users",
)
with engine.begin() as conn:
conn.execute(stmt)
# With optional flags
stmt = CreateTableAs(
select(users.c.id, users.c.name),
"temp_snapshot",
temporary=True,
if_not_exists=True,
)
The generated table object can be accessed via the :attr:`.table` property,
which will be an instance of :class:`.Table`; by default this is associated
with a local :class:`.MetaData` construct::
stmt = CreateTableAs(select(users.c.id, users.c.name), "active_users")
active_users_table = stmt.table
To associate the :class:`.Table` with an existing :class:`.MetaData`,
use the :paramref:`_schema.CreateTableAs.metadata` parameter::
stmt = CreateTableAs(
select(users.c.id, users.c.name),
"active_users",
metadata=some_metadata,
)
active_users_table = stmt.table
.. versionadded:: 2.1
:param selectable: :class:`_sql.Select`
The SELECT statement providing the columns and rows.
:param table_name: str
Table name as a string. Must be unqualified; use the ``schema``
argument for qualification.
:param metadata: :class:`_schema.MetaData`, optional
If provided, the :class:`_schema.Table` object available via the
:attr:`.table` attribute will be associated with this
:class:`.MetaData`. Otherwise, a new, empty :class:`.MetaData`
is created.
:param schema: str, optional schema or owner name.
:param temporary: bool, default False.
If True, render ``TEMPORARY``
:param if_not_exists: bool, default False.
If True, render ``IF NOT EXISTS``
.. seealso::
:ref:`tutorial_create_table_as` - in the :ref:`unified_tutorial`
:meth:`_sql.SelectBase.into` - convenience method to create a
:class:`_schema.CreateTableAs` from a SELECT statement
"""
__visit_name__ = "create_table_as"
inherit_cache = False
table: Table
""":class:`.Table` object representing the table that this
:class:`.CreateTableAs` would generate when executed."""
def __init__(
self,
selectable: SelectBase,
table_name: str,
*,
metadata: Optional["MetaData"] = None,
schema: Optional[str] = None,
temporary: bool = False,
if_not_exists: bool = False,
):
# Coerce selectable to a Select statement
selectable = coercions.expect(roles.DMLSelectRole, selectable)
if isinstance(table_name, str):
if not table_name:
raise exc.ArgumentError("Table name must be non-empty")
if "." in table_name:
raise exc.ArgumentError(
"Target string must be unqualified (use schema=)."
)
self.schema = schema
self.selectable = selectable
self.temporary = bool(temporary)
self.if_not_exists = bool(if_not_exists)
self.metadata = metadata
self.table_name = table_name
self._gen_table()
@util.preload_module("sqlalchemy.sql.schema")
def _gen_table(self):
MetaData = util.preloaded.sql_schema.MetaData
Column = util.preloaded.sql_schema.Column
Table = util.preloaded.sql_schema.Table
MetaData = util.preloaded.sql_schema.MetaData
column_name_type_pairs = (
(name, col_element.type)
for _, name, _, col_element, _ in (
self.selectable._generate_columns_plus_names(
anon_for_dupe_key=False
)
)
)
if self.metadata is None:
self.metadata = metadata = MetaData()
else:
metadata = self.metadata
self.table = Table(
self.table_name,
metadata,
*(Column(name, typ) for name, typ in column_name_type_pairs),
schema=self.schema,
)
class _DropView(_DropBase["Table"]):
"""Semi-public 'DROP VIEW' construct.
+80
View File
@@ -138,6 +138,7 @@ if TYPE_CHECKING:
from .base import ReadOnlyColumnCollection
from .cache_key import _CacheKeyTraversalType
from .compiler import SQLCompiler
from .ddl import CreateTableAs
from .dml import Delete
from .dml import Update
from .elements import BinaryExpression
@@ -148,6 +149,7 @@ if TYPE_CHECKING:
from .functions import Function
from .schema import ForeignKey
from .schema import ForeignKeyConstraint
from .schema import MetaData
from .sqltypes import TableValueType
from .type_api import TypeEngine
from .visitors import _CloneCallableType
@@ -3796,6 +3798,84 @@ class SelectBase(
self._ensure_disambiguated_names(), name=name
)
@util.preload_module("sqlalchemy.sql.ddl")
def into(
self,
target: str,
*,
metadata: Optional["MetaData"] = None,
schema: Optional[str] = None,
temporary: bool = False,
if_not_exists: bool = False,
) -> CreateTableAs:
"""Create a :class:`_schema.CreateTableAs` construct from this SELECT.
This method provides a convenient way to create a ``CREATE TABLE ...
AS`` statement from a SELECT, as well as compound SELECTs like UNION.
The new table will be created with columns matching the SELECT list.
Supported on all included backends, the construct emits
``CREATE TABLE...AS`` for all backends except SQL Server, which instead
emits a ``SELECT..INTO`` statement.
e.g.::
from sqlalchemy import select
# Create a new table from a SELECT
stmt = (
select(users.c.id, users.c.name)
.where(users.c.status == "active")
.into("active_users")
)
with engine.begin() as conn:
conn.execute(stmt)
# With optional flags
stmt = (
select(users.c.id)
.where(users.c.status == "inactive")
.into("inactive_users", schema="analytics", if_not_exists=True)
)
.. versionadded:: 2.1
:param target: Name of the table to create as a string. Must be
unqualified; use the ``schema`` parameter for qualification.
:param metadata: :class:`_schema.MetaData`, optional
If provided, the :class:`_schema.Table` object available via the
:attr:`.table` attribute will be associated with this
:class:`.MetaData`. Otherwise, a new, empty :class:`.MetaData`
is created.
:param schema: Optional schema name for the new table.
:param temporary: If True, create a temporary table where supported
:param if_not_exists: If True, add IF NOT EXISTS clause where supported
:return: A :class:`_schema.CreateTableAs` construct.
.. seealso::
:ref:`tutorial_create_table_as` - in the :ref:`unified_tutorial`
:class:`_schema.CreateTableAs`
"""
sql_ddl = util.preloaded.sql_ddl
return sql_ddl.CreateTableAs(
self,
target,
metadata=metadata,
schema=schema,
temporary=temporary,
if_not_exists=if_not_exists,
)
def _ensure_disambiguated_names(self) -> Self:
"""Ensure that the names generated by this selectbase will be
disambiguated in some way, if possible.
+12
View File
@@ -47,6 +47,18 @@ class SuiteRequirements(Requirements):
return exclusions.open()
@property
def create_table_as(self):
"""target platform supports CREATE TABLE AS SELECT."""
return exclusions.closed()
@property
def create_temp_table_as(self):
"""target platform supports CREATE TEMPORARY TABLE AS SELECT."""
return exclusions.closed()
@property
def table_ddl_if_exists(self):
"""target platform supports IF NOT EXISTS / IF EXISTS for tables."""
+1
View File
@@ -4,6 +4,7 @@
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
from .test_create_table_as import * # noqa
from .test_cte import * # noqa
from .test_ddl import * # noqa
from .test_dialect import * # noqa
@@ -0,0 +1,329 @@
# testing/suite/test_create_table_as.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors
from .. import fixtures
from ..assertions import eq_
from ..provision import get_temp_table_name
from ... import bindparam
from ... import Column
from ... import func
from ... import inspect
from ... import Integer
from ... import literal
from ... import MetaData
from ... import select
from ... import String
from ... import testing
from ...schema import DropTable
from ...schema import Table
from ...sql.ddl import CreateTableAs
from ...testing import config
class CreateTableAsTest(fixtures.TablesTest):
__backend__ = True
__requires__ = ("create_table_as",)
@classmethod
def temp_table_name(cls):
return get_temp_table_name(
config, config.db, f"user_tmp_{config.ident}"
)
@classmethod
def define_tables(cls, metadata):
Table(
"source_table",
metadata,
Column("id", Integer, primary_key=True, autoincrement=False),
Column("name", String(50)),
Column("value", Integer),
)
Table("a", metadata, Column("id", Integer))
Table("b", metadata, Column("id", Integer))
@classmethod
def insert_data(cls, connection):
table = cls.tables.source_table
connection.execute(
table.insert(),
[
{"id": 1, "name": "alice", "value": 100},
{"id": 2, "name": "bob", "value": 200},
{"id": 3, "name": "charlie", "value": 300},
],
)
a = cls.tables.a
b = cls.tables.b
connection.execute(a.insert(), [{"id": v} for v in [1, 3]])
connection.execute(b.insert(), [{"id": v} for v in [2, 4]])
@testing.fixture(scope="function", autouse=True)
def drop_dest_table(self, connection):
for schema in None, config.test_schema:
for name in ("dest_table", self.temp_table_name()):
if inspect(connection).has_table(name, schema=schema):
connection.execute(
DropTable(Table(name, MetaData(), schema=schema))
)
connection.commit()
@testing.combinations(
("plain", False, False),
("use_temp", False, True, testing.requires.create_temp_table_as),
("use_schema", True, False, testing.requires.schemas),
argnames="use_schemas,use_temp",
id_="iaa",
)
def test_create_table_as_tableclause(
self, connection, use_temp, use_schemas
):
source_table = self.tables.source_table
stmt = CreateTableAs(
select(source_table.c.id, source_table.c.name).where(
source_table.c.value > 100
),
self.temp_table_name() if use_temp else "dest_table",
temporary=bool(use_temp),
schema=config.test_schema if use_schemas else None,
)
# Execute the CTAS
connection.execute(stmt)
# Verify we can SELECT from the generated table
dest = stmt.table
result = connection.execute(
select(dest.c.id, dest.c.name).order_by(dest.c.id)
).fetchall()
eq_(result, [(2, "bob"), (3, "charlie")])
# Verify reflection works
insp = inspect(connection)
cols = insp.get_columns(
self.temp_table_name() if use_temp else "dest_table",
schema=config.test_schema if use_schemas else None,
)
eq_(len(cols), 2)
eq_(cols[0]["name"], "id")
eq_(cols[1]["name"], "name")
# Verify type affinity
eq_(cols[0]["type"]._type_affinity, Integer)
eq_(cols[1]["type"]._type_affinity, String)
@testing.variation(
"use_temp", [False, (True, testing.requires.create_temp_table_as)]
)
def test_create_table_as_with_metadata(
self, connection, metadata, use_temp
):
source_table = self.tables.source_table
stmt = CreateTableAs(
select(
source_table.c.id, source_table.c.name, source_table.c.value
),
self.temp_table_name() if use_temp else "dest_table",
metadata=metadata,
temporary=bool(use_temp),
)
# Execute the CTAS
connection.execute(stmt)
# Verify the generated table is a proper Table object
dest = stmt.table
assert isinstance(dest, Table)
assert dest.metadata is metadata
# SELECT from the generated table
result = connection.execute(
select(dest.c.id, dest.c.name, dest.c.value).where(dest.c.id == 2)
).fetchall()
eq_(result, [(2, "bob", 200)])
# Drop the table using the Table object
dest.drop(connection)
# Verify it's gone
if not use_temp:
insp = inspect(connection)
assert "dest_table" not in insp.get_table_names()
elif testing.requires.temp_table_names.enabled:
insp = inspect(connection)
assert self.temp_table_name() not in insp.get_temp_table_names()
def test_create_table_as_with_labels(self, connection):
source_table = self.tables.source_table
stmt = CreateTableAs(
select(
source_table.c.id.label("user_id"),
source_table.c.name.label("user_name"),
),
"dest_table",
)
connection.execute(stmt)
# Verify column names from labels
insp = inspect(connection)
cols = insp.get_columns("dest_table")
eq_(len(cols), 2)
eq_(cols[0]["name"], "user_id")
eq_(cols[1]["name"], "user_name")
# Verify we can query using the labels
dest = stmt.table
result = connection.execute(
select(dest.c.user_id, dest.c.user_name).where(dest.c.user_id == 1)
).fetchall()
eq_(result, [(1, "alice")])
def test_create_table_as_into_method(self, connection):
source_table = self.tables.source_table
stmt = select(source_table.c.id, source_table.c.value).into(
"dest_table"
)
connection.execute(stmt)
# Verify the table was created and can be queried
dest = stmt.table
result = connection.execute(
select(dest.c.id, dest.c.value).order_by(dest.c.id)
).fetchall()
eq_(result, [(1, 100), (2, 200), (3, 300)])
@testing.variation(
"use_temp", [False, (True, testing.requires.create_temp_table_as)]
)
@testing.variation("use_into", [True, False])
def test_metadata_use_cases(
self, use_temp, use_into, metadata, connection
):
table_name = self.temp_table_name() if use_temp else "dest_table"
source_table = self.tables.source_table
select_stmt = select(
source_table.c.id, source_table.c.name, source_table.c.value
).where(source_table.c.value > 100)
if use_into:
cas = select_stmt.into(
table_name, temporary=use_temp, metadata=metadata
)
else:
cas = CreateTableAs(
select_stmt,
table_name,
temporary=use_temp,
metadata=metadata,
)
connection.execute(cas)
dest = cas.table
eq_(dest.name, table_name)
result = connection.execute(
select(dest.c.id, dest.c.name).order_by(dest.c.id)
).fetchall()
eq_(result, [(2, "bob"), (3, "charlie")])
if use_temp:
if testing.requires.temp_table_names.enabled:
insp = inspect(connection)
assert table_name in insp.get_temp_table_names()
metadata.drop_all(connection)
insp = inspect(connection)
assert table_name not in insp.get_temp_table_names()
else:
insp = inspect(connection)
assert table_name in insp.get_table_names()
metadata.drop_all(connection)
insp = inspect(connection)
assert table_name not in insp.get_table_names()
@testing.requires.table_ddl_if_exists
def test_if_not_exists(self, connection):
source_table = self.tables.source_table
cas = CreateTableAs(
select(source_table.c.id).select_from(source_table),
"dest_table",
if_not_exists=True,
)
insp = inspect(connection)
assert "dest_table" not in insp.get_table_names()
connection.execute(cas)
insp = inspect(connection)
assert "dest_table" in insp.get_table_names()
# succeeds even though table exists
connection.execute(cas)
def test_literal_inlining_inside_select(self, connection):
src = self.tables.source_table
sel = select(
(src.c.id + 1).label("id2"),
literal("x").label("tag"),
).select_from(src)
stmt = CreateTableAs(sel, "dest_table")
connection.execute(stmt)
tbl = stmt.table
row = connection.execute(
select(func.count(), func.min(tbl.c.tag), func.max(tbl.c.tag))
).first()
eq_(row, (3, "x", "x"))
def test_create_table_as_with_bind_param_executes(self, connection):
src = self.tables.source_table
sel = (
select(src.c.id, src.c.name)
.select_from(src)
.where(src.c.name == bindparam("p", value="alice"))
)
stmt = CreateTableAs(sel, "dest_table")
connection.execute(stmt)
tbl = stmt.table
row = connection.execute(
select(func.count(), func.min(tbl.c.name), func.max(tbl.c.name))
).first()
eq_(row, (1, "alice", "alice"))
def test_compound_select_smoke(self, connection):
a, b = self.tables("a", "b")
sel = select(a.c.id).union_all(select(b.c.id)).order_by(a.c.id)
stmt = CreateTableAs(sel, "dest_table")
connection.execute(stmt)
vals = (
connection.execute(
select(stmt.table.c.id).order_by(stmt.table.c.id)
)
.scalars()
.all()
)
eq_(vals, [1, 2, 3, 4])
+2
View File
@@ -44,6 +44,7 @@ if TYPE_CHECKING:
from sqlalchemy.orm import strategies as _orm_strategies
from sqlalchemy.orm import strategy_options as _orm_strategy_options
from sqlalchemy.orm import util as _orm_util
from sqlalchemy.sql import ddl as _sql_ddl
from sqlalchemy.sql import default_comparator as _sql_default_comparator
from sqlalchemy.sql import dml as _sql_dml
from sqlalchemy.sql import elements as _sql_elements
@@ -79,6 +80,7 @@ if TYPE_CHECKING:
orm_strategy_options = _orm_strategy_options
orm_state = _orm_state
orm_util = _orm_util
sql_ddl = _sql_ddl
sql_default_comparator = _sql_default_comparator
sql_dml = _sql_dml
sql_elements = _sql_elements
+1 -1
View File
@@ -182,7 +182,7 @@ lint = [
"flake8-rst-docstrings",
"pydocstyle<4.0.0",
"pygments",
"black==25.1.0",
"black==25.9.0",
"slotscheck>=0.17.0",
"zimports", # required by generate_tuple_map_overloads
]
+14
View File
@@ -99,6 +99,20 @@ class DefaultRequirements(SuiteRequirements):
return only_on(["postgresql", "mysql", "mariadb", "sqlite"])
@property
def create_table_as(self):
"""target platform supports CREATE TABLE AS SELECT."""
return only_on(
["postgresql", "mysql", "mariadb", "sqlite", "mssql", "oracle"]
)
@property
def create_temp_table_as(self):
"""target platform supports CREATE TEMPORARY TABLE AS SELECT."""
return only_on(["postgresql", "mysql", "mariadb", "sqlite", "mssql"])
@property
def index_ddl_if_exists(self):
"""target platform supports IF NOT EXISTS / IF EXISTS for indexes."""
+357
View File
@@ -0,0 +1,357 @@
import re
from sqlalchemy import bindparam
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import literal
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy.exc import ArgumentError
from sqlalchemy.schema import CreateTable
from sqlalchemy.sql import column
from sqlalchemy.sql import select
from sqlalchemy.sql import table
from sqlalchemy.sql.ddl import CreateTableAs
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing.assertions import AssertsCompiledSQL
from sqlalchemy.testing.assertions import expect_raises_message
from sqlalchemy.testing.assertions import expect_warnings
class CreateTableAsDefaultDialectTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
@testing.fixture
def src_table(self):
return Table(
"src",
MetaData(),
Column("id", Integer),
Column("name", String(50)),
)
@testing.fixture
def src_two_tables(self):
a = table("a", column("id"), column("name"))
b = table("b", column("id"), column("status"))
return a, b
def test_basic_element(self, src_table):
src = src_table
stmt = CreateTableAs(
select(src.c.id, src.c.name).select_from(src),
"dst",
)
self.assert_compile(
stmt,
"CREATE TABLE dst AS SELECT src.id, src.name FROM src",
)
def test_schema_element_qualified(self, src_table):
src = src_table
stmt = CreateTableAs(
select(src.c.id).select_from(src),
"dst",
schema="analytics",
)
self.assert_compile(
stmt,
"CREATE TABLE analytics.dst AS SELECT src.id FROM src",
)
def test_blank_schema_treated_as_none(self, src_table):
src = src_table
stmt = CreateTableAs(
select(src.c.id).select_from(src), "dst", schema=""
)
self.assert_compile(stmt, "CREATE TABLE dst AS SELECT src.id FROM src")
def test_binds_rendered_inline(self, src_table):
src = src_table
stmt = CreateTableAs(
select(literal("x").label("tag")).select_from(src),
"dst",
)
self.assert_compile(
stmt,
"CREATE TABLE dst AS SELECT 'x' AS tag FROM src",
)
def test_temporary_no_schema(self, src_table):
src = src_table
stmt = CreateTableAs(
select(src.c.id, src.c.name).select_from(src),
"dst",
temporary=True,
)
self.assert_compile(
stmt,
"CREATE TEMPORARY TABLE dst AS "
"SELECT src.id, src.name FROM src",
)
def test_temporary_exists_flags(self, src_table):
src = src_table
stmt = CreateTableAs(
select(src.c.id).select_from(src),
"dst",
schema="sch",
temporary=True,
if_not_exists=True,
)
self.assert_compile(
stmt,
"CREATE TEMPORARY TABLE "
"IF NOT EXISTS sch.dst AS SELECT src.id FROM src",
)
def test_if_not_exists(self, src_table):
src = src_table
stmt = CreateTableAs(
select(src.c.id, src.c.name).select_from(src),
"dst",
if_not_exists=True,
)
self.assert_compile(
stmt,
"CREATE TABLE IF NOT EXISTS dst AS "
"SELECT src.id, src.name FROM src",
)
def test_join_with_binds_rendered_inline(self, src_two_tables):
a, b = src_two_tables
s = (
select(a.c.id, a.c.name)
.select_from(a.join(b, a.c.id == b.c.id))
.where(b.c.status == "active")
).into("dst")
# Ensure WHERE survives into CTAS and binds are rendered inline
self.assert_compile(
s,
"CREATE TABLE dst AS "
"SELECT a.id, a.name FROM a JOIN b ON a.id = b.id "
"WHERE b.status = 'active'",
)
def test_into_equivalent_to_element(self, src_table):
src = src_table
s = select(src.c.id).select_from(src).where(src.c.id == 2)
via_into = s.into("dst")
via_element = CreateTableAs(s, "dst")
self.assert_compile(
via_into,
"CREATE TABLE dst AS SELECT src.id FROM src WHERE src.id = 2",
)
self.assert_compile(
via_element,
"CREATE TABLE dst AS SELECT src.id FROM src WHERE src.id = 2",
)
def test_into_does_not_mutate_original_select(self, src_table):
src = src_table
s = select(src.c.id).select_from(src).where(src.c.id == 5)
# compile original SELECT
self.assert_compile(
s,
"SELECT src.id FROM src WHERE src.id = :id_1",
)
# build CTAS
_ = s.into("dst")
# original is still a SELECT
self.assert_compile(
s,
"SELECT src.id FROM src WHERE src.id = :id_1",
)
def test_into_with_schema_argument(self, src_table):
src = src_table
s = select(src.c.id).select_from(src).into("t", schema="analytics")
self.assert_compile(
s,
"CREATE TABLE analytics.t AS SELECT src.id FROM src",
)
def test_target_string_must_be_unqualified(self, src_table):
src = src_table
with expect_raises_message(
ArgumentError,
re.escape("Target string must be unqualified (use schema=)."),
):
CreateTableAs(select(src.c.id).select_from(src), "sch.dst")
def test_empty_name(self):
with expect_raises_message(
ArgumentError, "Table name must be non-empty"
):
CreateTableAs(select(literal(1)), "")
@testing.variation("provide_metadata", [True, False])
def test_generated_metadata_table_property(
self, src_table, provide_metadata
):
src = src_table
if provide_metadata:
metadata = MetaData()
else:
metadata = None
stmt = CreateTableAs(
select(src.c.name.label("thename"), src.c.id).select_from(src),
"dst",
schema="sch",
metadata=metadata,
)
if metadata is not None:
is_(stmt.metadata, metadata)
assert isinstance(stmt.table, Table)
is_(stmt.table.metadata, stmt.metadata)
self.assert_compile(
CreateTable(stmt.table),
"CREATE TABLE sch.dst (thename VARCHAR(50), id INTEGER)",
)
def test_labels_in_select_list_preserved(self, src_table):
src = src_table
stmt = CreateTableAs(
select(
src.c.id.label("user_id"), src.c.name.label("user_name")
).select_from(src),
"dst",
)
self.assert_compile(
stmt,
"CREATE TABLE dst AS "
"SELECT src.id AS user_id, src.name AS user_name FROM src",
)
def test_distinct_and_group_by_survive(self, src_table):
src = src_table
sel = (
select(src.c.name).select_from(src).distinct().group_by(src.c.name)
)
stmt = CreateTableAs(sel, "dst")
self.assert_compile(
stmt,
"CREATE TABLE dst AS "
"SELECT DISTINCT src.name FROM src GROUP BY src.name",
)
def test_bindparam_no_value_raises(self, src_table):
src = src_table
sel = select(src.c.name).where(src.c.name == bindparam("x"))
stmt = CreateTableAs(sel, "dst")
with expect_warnings(
"Bound parameter 'x' rendering literal NULL in a SQL expression;"
):
self.assert_compile(
stmt,
"CREATE TABLE dst AS SELECT src.name FROM src "
"WHERE src.name = NULL",
)
def test_union_all_with_binds_rendered_inline(self, src_two_tables):
a, b = src_two_tables
# Named binds so params are deterministic
s1 = (
select(a.c.id)
.select_from(a)
.where(a.c.id == bindparam("p_a", value=1))
)
s2 = (
select(b.c.id)
.select_from(b)
.where(b.c.id == bindparam("p_b", value=2))
)
u_all = s1.union_all(s2)
stmt = CreateTableAs(u_all, "dst")
self.assert_compile(
stmt,
"CREATE TABLE dst AS "
"SELECT a.id FROM a WHERE a.id = 1 "
"UNION ALL SELECT b.id FROM b WHERE b.id = 2",
)
def test_union_labels_follow_first_select(self, src_two_tables):
# Many engines take column names
# of a UNION from the first SELECTs labels.
a = table("a", column("val"))
b = table("b", column("val"))
s1 = select(a.c.val.label("first_name")).select_from(a)
s2 = select(b.c.val).select_from(b) # unlabeled second branch
u = s1.union(s2)
stmt = CreateTableAs(u, "dst")
# We only assert whats stable across dialects:
# - first SELECT has the label
# - a UNION occurs
self.assert_compile(
stmt,
"CREATE TABLE dst AS "
"SELECT a.val AS first_name FROM a "
"UNION "
"SELECT b.val FROM b",
)
def test_union_all_with_inlined_literals_smoke(self, src_two_tables):
# Proves literal_binds=True behavior applies across branches.
a, b = src_two_tables
u = (
select(literal(1).label("x"))
.select_from(a)
.union_all(select(literal("b").label("x")).select_from(b))
)
stmt = CreateTableAs(u, "dst")
self.assert_compile(
stmt,
"CREATE TABLE dst AS "
"SELECT 1 AS x FROM a UNION ALL SELECT 'b' AS x FROM b",
)
def test_select_shape_where_order_limit(self, src_table):
src = src_table
sel = (
select(src.c.id, src.c.name)
.select_from(src)
.where(src.c.id > literal(10))
.order_by(src.c.name)
.limit(5)
.offset(0)
)
stmt = CreateTableAs(sel, "dst")
self.assert_compile(
stmt,
"CREATE TABLE dst AS "
"SELECT src.id, src.name FROM src "
"WHERE src.id > 10 ORDER BY src.name LIMIT 5 OFFSET 0",
)
def test_cte_smoke(self, src_two_tables):
# Proves CTAS works with a WITH-CTE wrapper and labeled column.
a, _ = src_two_tables
cte = select(a.c.id.label("aid")).select_from(a).cte("u")
stmt = CreateTableAs(select(cte.c.aid), "dst")
self.assert_compile(
stmt,
"CREATE TABLE dst AS "
"WITH u AS (SELECT a.id AS aid FROM a) "
"SELECT u.aid FROM u",
)
@@ -0,0 +1,114 @@
"""Typing tests for CREATE TABLE AS."""
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.sql.ddl import CreateTableAs
# Setup
metadata = MetaData()
users = Table(
"users",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String(50)),
Column("email", String(100)),
Column("status", String(20)),
)
# Test 1: Basic CreateTableAs with string table name
stmt1 = select(users.c.id, users.c.name).where(users.c.id > 10)
ctas1 = CreateTableAs(stmt1, "active_users")
# Test 2: CreateTableAs with MetaData (creates Table object)
ctas2 = CreateTableAs(stmt1, "active_users_table", metadata=metadata)
# Test 3: Using .into() method on Select
ctas3 = stmt1.into("users_copy")
# Test 4: With schema parameter
ctas4 = CreateTableAs(stmt1, "users_backup", schema="backup")
# Test 5: With temporary flag
ctas5 = CreateTableAs(stmt1, "temp_users", temporary=True)
# Test 6: With if_not_exists flag
ctas6 = CreateTableAs(stmt1, "users_safe", if_not_exists=True)
# Test 7: Combining flags
ctas7 = CreateTableAs(
stmt1, "temp_backup", temporary=True, if_not_exists=True, schema="temp"
)
# Test 8: Access table property
dest_table1 = ctas1.table
dest_table2 = ctas2.table
# Test 9: Access columns from generated table
id_column = dest_table1.c.id
name_column = dest_table1.c.name
# Test 10: Use generated table in another select
new_select = select(dest_table1.c.id, dest_table1.c.name).where(
dest_table1.c.id < 100
)
# Test 11: With column labels
labeled_stmt = select(
users.c.id.label("user_id"),
users.c.name.label("user_name"),
users.c.email.label("user_email"),
)
ctas_labeled = CreateTableAs(labeled_stmt, "labeled_users")
labeled_table = ctas_labeled.table
user_id_col = labeled_table.c.user_id
user_name_col = labeled_table.c.user_name
# Test 12: With WHERE clause
filtered_stmt = select(users.c.id, users.c.status).where(
users.c.status == "active"
)
ctas_filtered = CreateTableAs(filtered_stmt, "active_status")
# Test 13: With JOIN
orders = Table(
"orders",
metadata,
Column("id", Integer, primary_key=True),
Column("user_id", Integer),
Column("amount", Integer),
)
join_stmt = select(users.c.id, users.c.name, orders.c.amount).select_from(
users.join(orders, users.c.id == orders.c.user_id)
)
ctas_join = CreateTableAs(join_stmt, "user_orders")
# Test 14: With UNION
stmt_a = select(users.c.id, users.c.name).where(users.c.status == "active")
stmt_b = select(users.c.id, users.c.name).where(users.c.status == "pending")
union_stmt = stmt_a.union(stmt_b)
ctas_union = CreateTableAs(union_stmt, "combined_users")
# Test 15: .into() with metadata
ctas_into_meta = stmt1.into("users_copy_meta", metadata=metadata)
# Test 16: .into() with all options
ctas_into_full = stmt1.into(
"full_copy", metadata=metadata, schema="backup", temporary=True
)
# Test 17: Verify generated table can be used in expressions
generated = ctas1.table
count_stmt = select(generated.c.id).where(generated.c.id > 5)
# Test 18: Chained operations
final_stmt = (
select(users.c.id, users.c.name)
.where(users.c.status == "active")
.into("final_result")
)
final_table = final_stmt.table
final_id = final_table.c.id