mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-07-03 22:03:21 -04:00
3a42375d85
Fixed 2.0 regression where the :class:`.DDL` construct would no longer
``__repr__()`` due to the removed ``on`` attribute not being accommodated.
Pull request courtesy Iuri de Silvio.
Fixes: #10443
Closes: #10442
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/10442
Pull-request-sha: 76f0484091
Change-Id: Id49afd681a04e0f05014105b8f15fdb66e6594a0
1042 lines
32 KiB
Python
1042 lines
32 KiB
Python
from unittest import mock
|
|
from unittest.mock import Mock
|
|
|
|
import sqlalchemy as tsa
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy import create_mock_engine
|
|
from sqlalchemy import event
|
|
from sqlalchemy import Index
|
|
from sqlalchemy import Integer
|
|
from sqlalchemy import MetaData
|
|
from sqlalchemy import String
|
|
from sqlalchemy import testing
|
|
from sqlalchemy import text
|
|
from sqlalchemy.schema import AddConstraint
|
|
from sqlalchemy.schema import CheckConstraint
|
|
from sqlalchemy.schema import DDL
|
|
from sqlalchemy.schema import DropConstraint
|
|
from sqlalchemy.schema import ForeignKeyConstraint
|
|
from sqlalchemy.schema import Sequence
|
|
from sqlalchemy.testing import AssertsCompiledSQL
|
|
from sqlalchemy.testing import config
|
|
from sqlalchemy.testing import engines
|
|
from sqlalchemy.testing import eq_
|
|
from sqlalchemy.testing import fixtures
|
|
from sqlalchemy.testing.provision import normalize_sequence
|
|
from sqlalchemy.testing.schema import Column
|
|
from sqlalchemy.testing.schema import Table
|
|
|
|
|
|
class DDLEventTest(fixtures.TestBase):
|
|
def setup_test(self):
|
|
self.bind = engines.mock_engine()
|
|
self.metadata = MetaData()
|
|
self.table = Table("t", self.metadata, Column("id", Integer))
|
|
|
|
def test_table_create_before(self):
|
|
table, bind = self.table, self.bind
|
|
canary = mock.Mock()
|
|
event.listen(table, "before_create", canary.before_create)
|
|
|
|
table.create(bind)
|
|
table.drop(bind)
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.before_create(
|
|
table,
|
|
self.bind,
|
|
checkfirst=False,
|
|
_ddl_runner=mock.ANY,
|
|
_is_metadata_operation=mock.ANY,
|
|
)
|
|
],
|
|
)
|
|
|
|
def test_table_create_after(self):
|
|
table, bind = self.table, self.bind
|
|
canary = mock.Mock()
|
|
event.listen(table, "after_create", canary.after_create)
|
|
|
|
table.create(bind)
|
|
table.drop(bind)
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.after_create(
|
|
table,
|
|
self.bind,
|
|
checkfirst=False,
|
|
_ddl_runner=mock.ANY,
|
|
_is_metadata_operation=mock.ANY,
|
|
)
|
|
],
|
|
)
|
|
|
|
def test_table_create_both(self):
|
|
table, bind = self.table, self.bind
|
|
canary = mock.Mock()
|
|
event.listen(table, "before_create", canary.before_create)
|
|
event.listen(table, "after_create", canary.after_create)
|
|
|
|
table.create(bind)
|
|
table.drop(bind)
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.before_create(
|
|
table,
|
|
self.bind,
|
|
checkfirst=False,
|
|
_ddl_runner=mock.ANY,
|
|
_is_metadata_operation=mock.ANY,
|
|
),
|
|
mock.call.after_create(
|
|
table,
|
|
self.bind,
|
|
checkfirst=False,
|
|
_ddl_runner=mock.ANY,
|
|
_is_metadata_operation=mock.ANY,
|
|
),
|
|
],
|
|
)
|
|
|
|
def test_table_drop_before(self):
|
|
table, bind = self.table, self.bind
|
|
canary = mock.Mock()
|
|
event.listen(table, "before_drop", canary.before_drop)
|
|
|
|
table.create(bind)
|
|
table.drop(bind)
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.before_drop(
|
|
table,
|
|
self.bind,
|
|
checkfirst=False,
|
|
_ddl_runner=mock.ANY,
|
|
_is_metadata_operation=mock.ANY,
|
|
)
|
|
],
|
|
)
|
|
|
|
def test_table_drop_after(self):
|
|
table, bind = self.table, self.bind
|
|
canary = mock.Mock()
|
|
event.listen(table, "after_drop", canary.after_drop)
|
|
|
|
table.create(bind)
|
|
canary.state = "skipped"
|
|
table.drop(bind)
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.after_drop(
|
|
table,
|
|
self.bind,
|
|
checkfirst=False,
|
|
_ddl_runner=mock.ANY,
|
|
_is_metadata_operation=mock.ANY,
|
|
)
|
|
],
|
|
)
|
|
|
|
def test_table_drop_both(self):
|
|
table, bind = self.table, self.bind
|
|
canary = mock.Mock()
|
|
|
|
event.listen(table, "before_drop", canary.before_drop)
|
|
event.listen(table, "after_drop", canary.after_drop)
|
|
|
|
table.create(bind)
|
|
table.drop(bind)
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.before_drop(
|
|
table,
|
|
self.bind,
|
|
checkfirst=False,
|
|
_ddl_runner=mock.ANY,
|
|
_is_metadata_operation=mock.ANY,
|
|
),
|
|
mock.call.after_drop(
|
|
table,
|
|
self.bind,
|
|
checkfirst=False,
|
|
_ddl_runner=mock.ANY,
|
|
_is_metadata_operation=mock.ANY,
|
|
),
|
|
],
|
|
)
|
|
|
|
def test_table_all(self):
|
|
table, bind = self.table, self.bind
|
|
canary = mock.Mock()
|
|
|
|
event.listen(table, "before_create", canary.before_create)
|
|
event.listen(table, "after_create", canary.after_create)
|
|
event.listen(table, "before_drop", canary.before_drop)
|
|
event.listen(table, "after_drop", canary.after_drop)
|
|
|
|
table.create(bind)
|
|
table.drop(bind)
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.before_create(
|
|
table,
|
|
self.bind,
|
|
checkfirst=False,
|
|
_ddl_runner=mock.ANY,
|
|
_is_metadata_operation=mock.ANY,
|
|
),
|
|
mock.call.after_create(
|
|
table,
|
|
self.bind,
|
|
checkfirst=False,
|
|
_ddl_runner=mock.ANY,
|
|
_is_metadata_operation=mock.ANY,
|
|
),
|
|
mock.call.before_drop(
|
|
table,
|
|
self.bind,
|
|
checkfirst=False,
|
|
_ddl_runner=mock.ANY,
|
|
_is_metadata_operation=mock.ANY,
|
|
),
|
|
mock.call.after_drop(
|
|
table,
|
|
self.bind,
|
|
checkfirst=False,
|
|
_ddl_runner=mock.ANY,
|
|
_is_metadata_operation=mock.ANY,
|
|
),
|
|
],
|
|
)
|
|
|
|
def test_metadata_create_before(self):
|
|
metadata, bind = self.metadata, self.bind
|
|
canary = mock.Mock()
|
|
event.listen(metadata, "before_create", canary.before_create)
|
|
|
|
metadata.create_all(bind)
|
|
metadata.drop_all(bind)
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.before_create(
|
|
# checkfirst is False because of the MockConnection
|
|
# used in the current testing strategy.
|
|
metadata,
|
|
self.bind,
|
|
checkfirst=False,
|
|
tables=list(metadata.tables.values()),
|
|
_ddl_runner=mock.ANY,
|
|
)
|
|
],
|
|
)
|
|
|
|
def test_metadata_create_after(self):
|
|
metadata, bind = self.metadata, self.bind
|
|
canary = mock.Mock()
|
|
event.listen(metadata, "after_create", canary.after_create)
|
|
|
|
metadata.create_all(bind)
|
|
metadata.drop_all(bind)
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.after_create(
|
|
metadata,
|
|
self.bind,
|
|
checkfirst=False,
|
|
tables=list(metadata.tables.values()),
|
|
_ddl_runner=mock.ANY,
|
|
)
|
|
],
|
|
)
|
|
|
|
def test_metadata_create_both(self):
|
|
metadata, bind = self.metadata, self.bind
|
|
canary = mock.Mock()
|
|
|
|
event.listen(metadata, "before_create", canary.before_create)
|
|
event.listen(metadata, "after_create", canary.after_create)
|
|
|
|
metadata.create_all(bind)
|
|
metadata.drop_all(bind)
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.before_create(
|
|
metadata,
|
|
self.bind,
|
|
checkfirst=False,
|
|
tables=list(metadata.tables.values()),
|
|
_ddl_runner=mock.ANY,
|
|
),
|
|
mock.call.after_create(
|
|
metadata,
|
|
self.bind,
|
|
checkfirst=False,
|
|
tables=list(metadata.tables.values()),
|
|
_ddl_runner=mock.ANY,
|
|
),
|
|
],
|
|
)
|
|
|
|
def test_metadata_drop_before(self):
|
|
metadata, bind = self.metadata, self.bind
|
|
canary = mock.Mock()
|
|
event.listen(metadata, "before_drop", canary.before_drop)
|
|
|
|
metadata.create_all(bind)
|
|
metadata.drop_all(bind)
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.before_drop(
|
|
metadata,
|
|
self.bind,
|
|
checkfirst=False,
|
|
tables=list(metadata.tables.values()),
|
|
_ddl_runner=mock.ANY,
|
|
)
|
|
],
|
|
)
|
|
|
|
def test_metadata_drop_after(self):
|
|
metadata, bind = self.metadata, self.bind
|
|
canary = mock.Mock()
|
|
event.listen(metadata, "after_drop", canary.after_drop)
|
|
|
|
metadata.create_all(bind)
|
|
metadata.drop_all(bind)
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.after_drop(
|
|
metadata,
|
|
self.bind,
|
|
checkfirst=False,
|
|
tables=list(metadata.tables.values()),
|
|
_ddl_runner=mock.ANY,
|
|
)
|
|
],
|
|
)
|
|
|
|
def test_metadata_drop_both(self):
|
|
metadata, bind = self.metadata, self.bind
|
|
canary = mock.Mock()
|
|
|
|
event.listen(metadata, "before_drop", canary.before_drop)
|
|
event.listen(metadata, "after_drop", canary.after_drop)
|
|
|
|
metadata.create_all(bind)
|
|
metadata.drop_all(bind)
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.before_drop(
|
|
metadata,
|
|
self.bind,
|
|
checkfirst=False,
|
|
tables=list(metadata.tables.values()),
|
|
_ddl_runner=mock.ANY,
|
|
),
|
|
mock.call.after_drop(
|
|
metadata,
|
|
self.bind,
|
|
checkfirst=False,
|
|
tables=list(metadata.tables.values()),
|
|
_ddl_runner=mock.ANY,
|
|
),
|
|
],
|
|
)
|
|
|
|
def test_metadata_table_isolation(self):
|
|
metadata, table = self.metadata, self.table
|
|
table_canary = mock.Mock()
|
|
metadata_canary = mock.Mock()
|
|
|
|
event.listen(table, "before_create", table_canary.before_create)
|
|
|
|
event.listen(metadata, "before_create", metadata_canary.before_create)
|
|
self.table.create(self.bind)
|
|
eq_(
|
|
table_canary.mock_calls,
|
|
[
|
|
mock.call.before_create(
|
|
table,
|
|
self.bind,
|
|
checkfirst=False,
|
|
_ddl_runner=mock.ANY,
|
|
_is_metadata_operation=mock.ANY,
|
|
)
|
|
],
|
|
)
|
|
eq_(metadata_canary.mock_calls, [])
|
|
|
|
|
|
class DDLEventHarness:
|
|
creates_implicitly_with_table = True
|
|
drops_implicitly_with_table = True
|
|
|
|
@testing.fixture
|
|
def produce_subject(self):
|
|
raise NotImplementedError()
|
|
|
|
@testing.fixture
|
|
def produce_event_target(self, produce_subject, connection):
|
|
"""subclasses may want to override this for cases where the target
|
|
sent to the event is not the same object as that which was
|
|
listened on.
|
|
|
|
the example here is for :class:`.SchemaType` objects like
|
|
:class:`.Enum` that produce a dialect-specific implementation
|
|
which is where the actual CREATE/DROP happens.
|
|
|
|
"""
|
|
return produce_subject
|
|
|
|
@testing.fixture
|
|
def produce_table_integrated_subject(self, metadata, produce_subject):
|
|
raise NotImplementedError()
|
|
|
|
def test_table_integrated(
|
|
self,
|
|
metadata,
|
|
connection,
|
|
produce_subject,
|
|
produce_table_integrated_subject,
|
|
produce_event_target,
|
|
):
|
|
subject = produce_subject
|
|
assert_subject = produce_event_target
|
|
|
|
canary = mock.Mock()
|
|
event.listen(subject, "before_create", canary.before_create)
|
|
event.listen(subject, "after_create", canary.after_create)
|
|
event.listen(subject, "before_drop", canary.before_drop)
|
|
event.listen(subject, "after_drop", canary.after_drop)
|
|
|
|
metadata.create_all(connection, checkfirst=False)
|
|
|
|
if self.creates_implicitly_with_table:
|
|
create_calls = []
|
|
else:
|
|
create_calls = [
|
|
mock.call.before_create(
|
|
assert_subject,
|
|
connection,
|
|
_ddl_runner=mock.ANY,
|
|
),
|
|
mock.call.after_create(
|
|
assert_subject,
|
|
connection,
|
|
_ddl_runner=mock.ANY,
|
|
),
|
|
]
|
|
eq_(canary.mock_calls, create_calls)
|
|
metadata.drop_all(connection, checkfirst=False)
|
|
|
|
if self.drops_implicitly_with_table:
|
|
eq_(canary.mock_calls, create_calls + [])
|
|
else:
|
|
eq_(
|
|
canary.mock_calls,
|
|
create_calls
|
|
+ [
|
|
mock.call.before_drop(
|
|
assert_subject,
|
|
connection,
|
|
_ddl_runner=mock.ANY,
|
|
),
|
|
mock.call.after_drop(
|
|
assert_subject,
|
|
connection,
|
|
_ddl_runner=mock.ANY,
|
|
),
|
|
],
|
|
)
|
|
|
|
|
|
class DDLEventWCreateHarness(DDLEventHarness):
|
|
requires_table_to_exist = True
|
|
|
|
def test_straight_create_drop(
|
|
self,
|
|
metadata,
|
|
connection,
|
|
produce_subject,
|
|
produce_table_integrated_subject,
|
|
produce_event_target,
|
|
):
|
|
subject = produce_subject
|
|
assert_subject = produce_event_target
|
|
|
|
if self.requires_table_to_exist:
|
|
metadata.create_all(connection, checkfirst=False)
|
|
subject.drop(connection)
|
|
|
|
canary = mock.Mock()
|
|
event.listen(subject, "before_create", canary.before_create)
|
|
event.listen(subject, "after_create", canary.after_create)
|
|
event.listen(subject, "before_drop", canary.before_drop)
|
|
event.listen(subject, "after_drop", canary.after_drop)
|
|
|
|
subject.create(connection)
|
|
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.before_create(
|
|
assert_subject,
|
|
connection,
|
|
_ddl_runner=mock.ANY,
|
|
),
|
|
mock.call.after_create(
|
|
assert_subject,
|
|
connection,
|
|
_ddl_runner=mock.ANY,
|
|
),
|
|
],
|
|
)
|
|
|
|
subject.drop(connection)
|
|
|
|
eq_(
|
|
canary.mock_calls,
|
|
[
|
|
mock.call.before_create(
|
|
assert_subject,
|
|
connection,
|
|
_ddl_runner=mock.ANY,
|
|
),
|
|
mock.call.after_create(
|
|
assert_subject,
|
|
connection,
|
|
_ddl_runner=mock.ANY,
|
|
),
|
|
mock.call.before_drop(
|
|
assert_subject,
|
|
connection,
|
|
_ddl_runner=mock.ANY,
|
|
),
|
|
mock.call.after_drop(
|
|
assert_subject,
|
|
connection,
|
|
_ddl_runner=mock.ANY,
|
|
),
|
|
],
|
|
)
|
|
|
|
|
|
class SequenceDDLEventTest(DDLEventWCreateHarness, fixtures.TestBase):
|
|
__requires__ = ("sequences",)
|
|
|
|
creates_implicitly_with_table = False
|
|
drops_implicitly_with_table = False
|
|
supports_standalone_create = True
|
|
|
|
@testing.fixture
|
|
def produce_subject(self):
|
|
return normalize_sequence(config, Sequence("my_seq"))
|
|
|
|
@testing.fixture
|
|
def produce_table_integrated_subject(self, metadata, produce_subject):
|
|
return Table(
|
|
"t",
|
|
metadata,
|
|
Column("id", Integer, produce_subject, primary_key=True),
|
|
)
|
|
|
|
|
|
class IndexDDLEventTest(DDLEventWCreateHarness, fixtures.TestBase):
|
|
creates_implicitly_with_table = False
|
|
drops_implicitly_with_table = True
|
|
supports_standalone_create = False
|
|
|
|
@testing.fixture
|
|
def produce_subject(self):
|
|
return Index("my_idx", "key")
|
|
|
|
@testing.fixture
|
|
def produce_table_integrated_subject(self, metadata, produce_subject):
|
|
return Table(
|
|
"t",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True),
|
|
Column("key", String(50)),
|
|
produce_subject,
|
|
)
|
|
|
|
|
|
class ForeignKeyConstraintDDLEventTest(DDLEventHarness, fixtures.TestBase):
|
|
creates_implicitly_with_table = True
|
|
drops_implicitly_with_table = True
|
|
supports_standalone_create = False
|
|
|
|
@testing.fixture
|
|
def produce_subject(self):
|
|
return ForeignKeyConstraint(["related_id"], ["related.id"], name="fkc")
|
|
|
|
@testing.fixture
|
|
def produce_table_integrated_subject(self, metadata, produce_subject):
|
|
Table(
|
|
"t",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True),
|
|
Column("related_id", Integer),
|
|
produce_subject,
|
|
)
|
|
Table("related", metadata, Column("id", Integer, primary_key=True))
|
|
|
|
|
|
class DDLExecutionTest(AssertsCompiledSQL, fixtures.TestBase):
|
|
def setup_test(self):
|
|
self.engine = engines.mock_engine()
|
|
self.metadata = MetaData()
|
|
self.users = Table(
|
|
"users",
|
|
self.metadata,
|
|
Column("user_id", Integer, primary_key=True),
|
|
Column("user_name", String(40)),
|
|
)
|
|
|
|
def test_table_standalone(self):
|
|
users, engine = self.users, self.engine
|
|
event.listen(users, "before_create", DDL("mxyzptlk"))
|
|
event.listen(users, "after_create", DDL("klptzyxm"))
|
|
event.listen(users, "before_drop", DDL("xyzzy"))
|
|
event.listen(users, "after_drop", DDL("fnord"))
|
|
|
|
users.create(self.engine)
|
|
strings = [str(x) for x in engine.mock]
|
|
assert "mxyzptlk" in strings
|
|
assert "klptzyxm" in strings
|
|
assert "xyzzy" not in strings
|
|
assert "fnord" not in strings
|
|
del engine.mock[:]
|
|
users.drop(self.engine)
|
|
strings = [str(x) for x in engine.mock]
|
|
assert "mxyzptlk" not in strings
|
|
assert "klptzyxm" not in strings
|
|
assert "xyzzy" in strings
|
|
assert "fnord" in strings
|
|
|
|
def test_table_by_metadata(self):
|
|
metadata, users, engine = self.metadata, self.users, self.engine
|
|
|
|
event.listen(users, "before_create", DDL("mxyzptlk"))
|
|
event.listen(users, "after_create", DDL("klptzyxm"))
|
|
event.listen(users, "before_drop", DDL("xyzzy"))
|
|
event.listen(users, "after_drop", DDL("fnord"))
|
|
|
|
metadata.create_all(self.engine)
|
|
strings = [str(x) for x in engine.mock]
|
|
assert "mxyzptlk" in strings
|
|
assert "klptzyxm" in strings
|
|
assert "xyzzy" not in strings
|
|
assert "fnord" not in strings
|
|
del engine.mock[:]
|
|
metadata.drop_all(self.engine)
|
|
strings = [str(x) for x in engine.mock]
|
|
assert "mxyzptlk" not in strings
|
|
assert "klptzyxm" not in strings
|
|
assert "xyzzy" in strings
|
|
assert "fnord" in strings
|
|
|
|
def test_metadata(self):
|
|
metadata, engine = self.metadata, self.engine
|
|
|
|
event.listen(metadata, "before_create", DDL("mxyzptlk"))
|
|
event.listen(metadata, "after_create", DDL("klptzyxm"))
|
|
event.listen(metadata, "before_drop", DDL("xyzzy"))
|
|
event.listen(metadata, "after_drop", DDL("fnord"))
|
|
|
|
metadata.create_all(self.engine)
|
|
strings = [str(x) for x in engine.mock]
|
|
assert "mxyzptlk" in strings
|
|
assert "klptzyxm" in strings
|
|
assert "xyzzy" not in strings
|
|
assert "fnord" not in strings
|
|
del engine.mock[:]
|
|
metadata.drop_all(self.engine)
|
|
strings = [str(x) for x in engine.mock]
|
|
assert "mxyzptlk" not in strings
|
|
assert "klptzyxm" not in strings
|
|
assert "xyzzy" in strings
|
|
assert "fnord" in strings
|
|
|
|
def test_conditional_constraint(self):
|
|
metadata, users = self.metadata, self.users
|
|
nonpg_mock = engines.mock_engine(dialect_name="sqlite")
|
|
pg_mock = engines.mock_engine(dialect_name="postgresql")
|
|
constraint = CheckConstraint(
|
|
"a < b", name="my_test_constraint", table=users
|
|
)
|
|
|
|
# by placing the constraint in an Add/Drop construct, the
|
|
# 'inline_ddl' flag is set to False
|
|
|
|
event.listen(
|
|
users,
|
|
"after_create",
|
|
AddConstraint(constraint).execute_if(dialect="postgresql"),
|
|
)
|
|
|
|
event.listen(
|
|
users,
|
|
"before_drop",
|
|
DropConstraint(constraint).execute_if(dialect="postgresql"),
|
|
)
|
|
|
|
metadata.create_all(bind=nonpg_mock)
|
|
strings = " ".join(str(x) for x in nonpg_mock.mock)
|
|
assert "my_test_constraint" not in strings
|
|
metadata.drop_all(bind=nonpg_mock)
|
|
strings = " ".join(str(x) for x in nonpg_mock.mock)
|
|
assert "my_test_constraint" not in strings
|
|
metadata.create_all(bind=pg_mock)
|
|
strings = " ".join(str(x) for x in pg_mock.mock)
|
|
assert "my_test_constraint" in strings
|
|
metadata.drop_all(bind=pg_mock)
|
|
strings = " ".join(str(x) for x in pg_mock.mock)
|
|
assert "my_test_constraint" in strings
|
|
|
|
@testing.combinations(("dialect",), ("callable",), ("callable_w_state",))
|
|
def test_inline_ddl_if_dialect_name(self, ddl_if_type):
|
|
nonpg_mock = engines.mock_engine(dialect_name="sqlite")
|
|
pg_mock = engines.mock_engine(dialect_name="postgresql")
|
|
|
|
metadata = MetaData()
|
|
|
|
capture_mock = Mock()
|
|
state = object()
|
|
|
|
if ddl_if_type == "dialect":
|
|
ddl_kwargs = dict(dialect="postgresql")
|
|
elif ddl_if_type == "callable":
|
|
|
|
def is_pg(ddl, target, bind, **kw):
|
|
capture_mock.is_pg(ddl, target, bind, **kw)
|
|
return kw["dialect"].name == "postgresql"
|
|
|
|
ddl_kwargs = dict(callable_=is_pg)
|
|
elif ddl_if_type == "callable_w_state":
|
|
|
|
def is_pg(ddl, target, bind, **kw):
|
|
capture_mock.is_pg(ddl, target, bind, **kw)
|
|
return kw["dialect"].name == "postgresql"
|
|
|
|
ddl_kwargs = dict(callable_=is_pg, state=state)
|
|
else:
|
|
assert False
|
|
|
|
data_col = Column("data", String)
|
|
t = Table(
|
|
"a",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True),
|
|
Column("num", Integer),
|
|
data_col,
|
|
Index("my_pg_index", data_col).ddl_if(**ddl_kwargs),
|
|
CheckConstraint("num > 5").ddl_if(**ddl_kwargs),
|
|
)
|
|
|
|
metadata.create_all(nonpg_mock)
|
|
eq_(len(nonpg_mock.mock), 1)
|
|
self.assert_compile(
|
|
nonpg_mock.mock[0],
|
|
"CREATE TABLE a (id INTEGER NOT NULL, num INTEGER, "
|
|
"data VARCHAR, PRIMARY KEY (id))",
|
|
dialect=nonpg_mock.dialect,
|
|
)
|
|
|
|
metadata.create_all(pg_mock)
|
|
|
|
eq_(len(pg_mock.mock), 2)
|
|
|
|
self.assert_compile(
|
|
pg_mock.mock[0],
|
|
"CREATE TABLE a (id SERIAL NOT NULL, num INTEGER, "
|
|
"data VARCHAR, PRIMARY KEY (id), CHECK (num > 5))",
|
|
dialect=pg_mock.dialect,
|
|
)
|
|
self.assert_compile(
|
|
pg_mock.mock[1],
|
|
"CREATE INDEX my_pg_index ON a (data)",
|
|
dialect="postgresql",
|
|
)
|
|
|
|
the_index = list(t.indexes)[0]
|
|
the_constraint = list(
|
|
c for c in t.constraints if isinstance(c, CheckConstraint)
|
|
)[0]
|
|
|
|
if ddl_if_type in ("callable", "callable_w_state"):
|
|
if ddl_if_type == "callable":
|
|
check_state = None
|
|
else:
|
|
check_state = state
|
|
|
|
eq_(
|
|
capture_mock.mock_calls,
|
|
[
|
|
mock.call.is_pg(
|
|
mock.ANY,
|
|
the_index,
|
|
mock.ANY,
|
|
state=check_state,
|
|
dialect=nonpg_mock.dialect,
|
|
compiler=None,
|
|
),
|
|
mock.call.is_pg(
|
|
mock.ANY,
|
|
the_constraint,
|
|
None,
|
|
state=check_state,
|
|
dialect=nonpg_mock.dialect,
|
|
compiler=mock.ANY,
|
|
),
|
|
mock.call.is_pg(
|
|
mock.ANY,
|
|
the_index,
|
|
mock.ANY,
|
|
state=check_state,
|
|
dialect=pg_mock.dialect,
|
|
compiler=None,
|
|
),
|
|
mock.call.is_pg(
|
|
mock.ANY,
|
|
the_constraint,
|
|
None,
|
|
state=check_state,
|
|
dialect=pg_mock.dialect,
|
|
compiler=mock.ANY,
|
|
),
|
|
],
|
|
)
|
|
|
|
@testing.requires.sqlite
|
|
def test_ddl_execute(self):
|
|
engine = create_engine("sqlite:///")
|
|
cx = engine.connect()
|
|
cx.begin()
|
|
ddl = DDL("SELECT 1")
|
|
|
|
r = cx.execute(ddl)
|
|
eq_(list(r), [(1,)])
|
|
|
|
def test_platform_escape(self):
|
|
"""test the escaping of % characters in the DDL construct."""
|
|
|
|
default_from = testing.db.dialect.statement_compiler(
|
|
testing.db.dialect, None
|
|
).default_from()
|
|
|
|
# We're abusing the DDL()
|
|
# construct here by pushing a SELECT through it
|
|
# so that we can verify the round trip.
|
|
# the DDL() will trigger autocommit, which prohibits
|
|
# some DBAPIs from returning results (pyodbc), so we
|
|
# run in an explicit transaction.
|
|
with testing.db.begin() as conn:
|
|
eq_(
|
|
conn.execute(
|
|
text("select 'foo%something'" + default_from)
|
|
).scalar(),
|
|
"foo%something",
|
|
)
|
|
|
|
eq_(
|
|
conn.execute(
|
|
DDL("select 'foo%%something'" + default_from)
|
|
).scalar(),
|
|
"foo%something",
|
|
)
|
|
|
|
|
|
class DDLTransactionTest(fixtures.TestBase):
|
|
"""test DDL transactional behavior as of SQLAlchemy 1.4."""
|
|
|
|
@testing.fixture
|
|
def metadata_fixture(self):
|
|
m = MetaData()
|
|
Table("t1", m, Column("q", Integer))
|
|
Table("t2", m, Column("q", Integer))
|
|
|
|
try:
|
|
yield m
|
|
finally:
|
|
m.drop_all(testing.db)
|
|
|
|
@testing.fixture
|
|
def listening_engine_fixture(self):
|
|
eng = engines.testing_engine()
|
|
|
|
m1 = mock.Mock()
|
|
|
|
event.listen(eng, "begin", m1.begin)
|
|
event.listen(eng, "commit", m1.commit)
|
|
event.listen(eng, "rollback", m1.rollback)
|
|
|
|
@event.listens_for(eng, "before_cursor_execute")
|
|
def before_cursor_execute(
|
|
conn, cursor, statement, parameters, context, executemany
|
|
):
|
|
if "CREATE TABLE" in statement:
|
|
m1.cursor_execute("CREATE TABLE ...")
|
|
|
|
eng.connect().close()
|
|
|
|
return eng, m1
|
|
|
|
def test_ddl_engine(self, metadata_fixture, listening_engine_fixture):
|
|
eng, m1 = listening_engine_fixture
|
|
|
|
metadata_fixture.create_all(eng)
|
|
|
|
eq_(
|
|
m1.mock_calls,
|
|
[
|
|
mock.call.begin(mock.ANY),
|
|
mock.call.cursor_execute("CREATE TABLE ..."),
|
|
mock.call.cursor_execute("CREATE TABLE ..."),
|
|
mock.call.commit(mock.ANY),
|
|
],
|
|
)
|
|
|
|
def test_ddl_connection_autobegin_transaction(
|
|
self, metadata_fixture, listening_engine_fixture
|
|
):
|
|
eng, m1 = listening_engine_fixture
|
|
|
|
with eng.connect() as conn:
|
|
metadata_fixture.create_all(conn)
|
|
|
|
conn.commit()
|
|
|
|
eq_(
|
|
m1.mock_calls,
|
|
[
|
|
mock.call.begin(mock.ANY),
|
|
mock.call.cursor_execute("CREATE TABLE ..."),
|
|
mock.call.cursor_execute("CREATE TABLE ..."),
|
|
mock.call.commit(mock.ANY),
|
|
],
|
|
)
|
|
|
|
def test_ddl_connection_explicit_begin_transaction(
|
|
self, metadata_fixture, listening_engine_fixture
|
|
):
|
|
eng, m1 = listening_engine_fixture
|
|
|
|
with eng.connect() as conn:
|
|
with conn.begin():
|
|
metadata_fixture.create_all(conn)
|
|
|
|
eq_(
|
|
m1.mock_calls,
|
|
[
|
|
mock.call.begin(mock.ANY),
|
|
mock.call.cursor_execute("CREATE TABLE ..."),
|
|
mock.call.cursor_execute("CREATE TABLE ..."),
|
|
mock.call.commit(mock.ANY),
|
|
],
|
|
)
|
|
|
|
|
|
class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
|
|
def mock_engine(self):
|
|
def executor(*a, **kw):
|
|
return None
|
|
|
|
engine = create_mock_engine(testing.db.name + "://", executor)
|
|
# fmt: off
|
|
engine.dialect.identifier_preparer = \
|
|
tsa.sql.compiler.IdentifierPreparer(
|
|
engine.dialect
|
|
)
|
|
# fmt: on
|
|
return engine
|
|
|
|
def test_tokens(self):
|
|
m = MetaData()
|
|
sane_alone = Table("t", m, Column("id", Integer))
|
|
sane_schema = Table("t", m, Column("id", Integer), schema="s")
|
|
insane_alone = Table("t t", m, Column("id", Integer))
|
|
insane_schema = Table("t t", m, Column("id", Integer), schema="s s")
|
|
ddl = DDL("%(schema)s-%(table)s-%(fullname)s")
|
|
dialect = self.mock_engine().dialect
|
|
self.assert_compile(ddl.against(sane_alone), "-t-t", dialect=dialect)
|
|
self.assert_compile(
|
|
ddl.against(sane_schema), "s-t-s.t", dialect=dialect
|
|
)
|
|
self.assert_compile(
|
|
ddl.against(insane_alone), '-"t t"-"t t"', dialect=dialect
|
|
)
|
|
self.assert_compile(
|
|
ddl.against(insane_schema),
|
|
'"s s"-"t t"-"s s"."t t"',
|
|
dialect=dialect,
|
|
)
|
|
|
|
# overrides are used piece-meal and verbatim.
|
|
|
|
ddl = DDL(
|
|
"%(schema)s-%(table)s-%(fullname)s-%(bonus)s",
|
|
context={"schema": "S S", "table": "T T", "bonus": "b"},
|
|
)
|
|
self.assert_compile(
|
|
ddl.against(sane_alone), "S S-T T-t-b", dialect=dialect
|
|
)
|
|
self.assert_compile(
|
|
ddl.against(sane_schema), "S S-T T-s.t-b", dialect=dialect
|
|
)
|
|
self.assert_compile(
|
|
ddl.against(insane_alone), 'S S-T T-"t t"-b', dialect=dialect
|
|
)
|
|
self.assert_compile(
|
|
ddl.against(insane_schema),
|
|
'S S-T T-"s s"."t t"-b',
|
|
dialect=dialect,
|
|
)
|
|
|
|
def test_filter(self):
|
|
cx = self.mock_engine()
|
|
|
|
tbl = Table("t", MetaData(), Column("id", Integer))
|
|
target = cx.name
|
|
|
|
assert DDL("")._should_execute(tbl, cx)
|
|
assert DDL("").execute_if(dialect=target)._should_execute(tbl, cx)
|
|
assert not DDL("").execute_if(dialect="bogus")._should_execute(tbl, cx)
|
|
assert (
|
|
DDL("")
|
|
.execute_if(callable_=lambda d, y, z, **kw: True)
|
|
._should_execute(tbl, cx)
|
|
)
|
|
assert (
|
|
DDL("")
|
|
.execute_if(
|
|
callable_=lambda d, y, z, **kw: z.engine.name != "bogus"
|
|
)
|
|
._should_execute(tbl, cx)
|
|
)
|
|
|
|
@testing.variation("include_context", [True, False])
|
|
def test_repr(self, include_context):
|
|
sql = "SELECT :foo"
|
|
|
|
if include_context:
|
|
context = {"foo": 1}
|
|
ddl = DDL(sql, context=context)
|
|
eq_(repr(ddl), f"<DDL@{id(ddl)}; '{sql}', context={context}>")
|
|
else:
|
|
ddl = DDL(sql)
|
|
eq_(repr(ddl), f"<DDL@{id(ddl)}; '{sql}'>")
|