mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-15 05:07:16 -04:00
93b7767d00
The :meth:`.Connection.connect` method is deprecated as is the concept of "connection branching", which copies a :class:`.Connection` into a new one that has a no-op ".close()" method. This pattern is oriented around the "connectionless execution" concept which is also being removed in 2.0. As part of this change we begin to move the internals away from "connectionless execution" overall. Remove the "connectionless execution" concept from the reflection internals and replace with explicit patterns at the Inspector level. Fixes: #5131 Change-Id: Id23d28a9889212ac5ae7329b85136157815d3e6f
168 lines
6.0 KiB
Python
168 lines
6.0 KiB
Python
"""tests the "bind" attribute/argument across schema and SQL,
|
|
including the deprecated versions of these arguments"""
|
|
|
|
import sqlalchemy as sa
|
|
from sqlalchemy import engine
|
|
from sqlalchemy import exc
|
|
from sqlalchemy import inspect
|
|
from sqlalchemy import Integer
|
|
from sqlalchemy import MetaData
|
|
from sqlalchemy import testing
|
|
from sqlalchemy import text
|
|
from sqlalchemy import ThreadLocalMetaData
|
|
from sqlalchemy.testing import assert_raises
|
|
from sqlalchemy.testing import assert_raises_message
|
|
from sqlalchemy.testing import fixtures
|
|
from sqlalchemy.testing import is_false
|
|
from sqlalchemy.testing import is_true
|
|
from sqlalchemy.testing.schema import Column
|
|
from sqlalchemy.testing.schema import Table
|
|
|
|
|
|
class BindTest(fixtures.TestBase):
    """Tests for the ``bind`` attribute/argument of schema and SQL objects.

    Tests that loop over binds exercise both the ``testing.db`` engine and
    a connection checked out from it; such connections are closed in a
    ``finally`` clause so a failing assertion does not leak them.
    """

    def test_bind_close_engine(self):
        """A connection used as a context manager is closed on exit."""
        e = testing.db
        with e.connect() as conn:
            assert not conn.closed
        assert conn.closed

    def test_create_drop_explicit(self):
        """create/drop accept the bind positionally and as ``bind=``."""
        metadata = MetaData()
        table = Table("test_table", metadata, Column("foo", Integer))
        for bind in (testing.db, testing.db.connect()):
            try:
                # Each entry is (positional args, keyword args).
                for args in [([], {"bind": bind}), ([bind], {})]:
                    metadata.create_all(*args[0], **args[1])
                    is_true(inspect(bind).has_table(table.name))
                    metadata.drop_all(*args[0], **args[1])
                    table.create(*args[0], **args[1])
                    table.drop(*args[0], **args[1])
                    is_false(inspect(bind).has_table(table.name))
            finally:
                # Only the checked-out connection is closed; the engine
                # is shared and left alone.
                if isinstance(bind, engine.Connection):
                    bind.close()

    def test_create_drop_err_metadata(self):
        """create_all/drop_all on an unbound MetaData raise an error."""
        metadata = MetaData()
        Table("test_table", metadata, Column("foo", Integer))
        for meth in [metadata.create_all, metadata.drop_all]:
            assert_raises_message(
                exc.UnboundExecutionError,
                "MetaData object is not bound to an Engine or Connection.",
                meth,
            )

    def test_create_drop_err_table(self):
        """create/drop on a Table with no bind raise an error."""
        metadata = MetaData()
        table = Table("test_table", metadata, Column("foo", Integer))

        for meth in [table.create, table.drop]:
            assert_raises_message(
                exc.UnboundExecutionError,
                (
                    "Table object 'test_table' is not bound to an Engine or "
                    "Connection."
                ),
                meth,
            )

    @testing.uses_deprecated()
    def test_create_drop_bound(self):
        """create/drop work without arguments once ``metadata.bind`` is set."""
        for meta in (MetaData, ThreadLocalMetaData):
            for bind in (testing.db, testing.db.connect()):
                try:
                    # The sequence runs twice per bind, each round against
                    # a freshly built metadata/table pair.
                    for _ in range(2):
                        metadata = meta()
                        table = Table(
                            "test_table", metadata, Column("foo", Integer)
                        )
                        metadata.bind = bind
                        assert metadata.bind is table.bind is bind
                        metadata.create_all()
                        assert table.exists()
                        metadata.drop_all()
                        table.create()
                        table.drop()
                        assert not table.exists()
                finally:
                    # Close the connection even if an assertion failed.
                    if isinstance(bind, engine.Connection):
                        bind.close()

    def test_create_drop_constructor_bound(self):
        """A bind given to the MetaData constructor is used by create/drop."""
        for bind in (testing.db, testing.db.connect()):
            try:
                # Each entry is (positional args, keyword args).
                for args in (([bind], {}), ([], {"bind": bind})):
                    metadata = MetaData(*args[0], **args[1])
                    table = Table(
                        "test_table", metadata, Column("foo", Integer)
                    )
                    assert metadata.bind is table.bind is bind
                    metadata.create_all()
                    is_true(inspect(bind).has_table(table.name))
                    metadata.drop_all()
                    table.create()
                    table.drop()
                    is_false(inspect(bind).has_table(table.name))
            finally:
                if isinstance(bind, engine.Connection):
                    bind.close()

    def test_implicit_execution(self):
        """Statements from a bound table execute on the bound connection.

        Three rows are inserted through ``table.insert().execute()`` inside
        a transaction begun on the bound connection; after rolling that
        transaction back the table must be empty.
        """
        metadata = MetaData()
        table = Table(
            "test_table",
            metadata,
            Column("foo", Integer),
            test_needs_acid=True,
        )
        # The context manager closes the connection on every exit path.
        with testing.db.connect() as conn:
            metadata.create_all(bind=conn)
            try:
                trans = conn.begin()
                metadata.bind = conn
                t = table.insert()
                assert t.bind is conn
                table.insert().execute(foo=5)
                table.insert().execute(foo=6)
                table.insert().execute(foo=7)
                trans.rollback()
                metadata.bind = None
                # text() makes the raw SQL an explicit construct instead of
                # relying on execute() accepting a bare string.
                count = conn.execute(
                    text("select count(*) from test_table")
                ).scalar()
                assert count == 0
            finally:
                metadata.drop_all(bind=conn)

    def test_clauseelement(self):
        """Clause elements built with ``bind=`` execute on that bind.

        The same constructs built without a bind report ``bind is None``
        and raise ``UnboundExecutionError`` when executed.
        """
        metadata = MetaData()
        table = Table("test_table", metadata, Column("foo", Integer))
        metadata.create_all(bind=testing.db)
        try:
            for elem in [
                table.select,
                lambda **kwargs: sa.func.current_timestamp(**kwargs).select(),
                lambda **kwargs: text("select * from test_table", **kwargs),
            ]:
                for bind in (testing.db, testing.db.connect()):
                    try:
                        e = elem(bind=bind)
                        assert e.bind is bind
                        e.execute().close()
                    finally:
                        if isinstance(bind, engine.Connection):
                            bind.close()

                e = elem()
                assert e.bind is None
                assert_raises(exc.UnboundExecutionError, e.execute)
        finally:
            # Connections are already closed by the inner ``finally``; only
            # the table needs cleaning up here. Not touching ``bind`` also
            # avoids a NameError if the loop never assigned it.
            metadata.drop_all(bind=testing.db)
|