mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-09 02:09:53 -04:00
202 lines
6.6 KiB
Python
202 lines
6.6 KiB
Python
"""tests the "bind" attribute/argument across schema and SQL,
|
|
including the deprecated versions of these arguments"""
|
|
|
|
from sqlalchemy.testing import assert_raises, assert_raises_message
|
|
from sqlalchemy import engine, exc
|
|
from sqlalchemy import MetaData, ThreadLocalMetaData
|
|
from sqlalchemy import Integer, text
|
|
from sqlalchemy.testing.schema import Table
|
|
from sqlalchemy.testing.schema import Column
|
|
import sqlalchemy as sa
|
|
from sqlalchemy import testing
|
|
from sqlalchemy.testing import fixtures
|
|
|
|
|
|
class BindTest(fixtures.TestBase):
|
|
def test_bind_close_engine(self):
|
|
e = testing.db
|
|
with e.connect() as conn:
|
|
assert not conn.closed
|
|
assert conn.closed
|
|
|
|
with e.contextual_connect() as conn:
|
|
assert not conn.closed
|
|
assert conn.closed
|
|
|
|
def test_bind_close_conn(self):
|
|
e = testing.db
|
|
conn = e.connect()
|
|
with conn.connect() as c2:
|
|
assert not c2.closed
|
|
assert not conn.closed
|
|
assert c2.closed
|
|
|
|
with conn.contextual_connect() as c2:
|
|
assert not c2.closed
|
|
assert not conn.closed
|
|
assert c2.closed
|
|
|
|
def test_create_drop_explicit(self):
|
|
metadata = MetaData()
|
|
table = Table('test_table', metadata,
|
|
Column('foo', Integer))
|
|
for bind in (
|
|
testing.db,
|
|
testing.db.connect()
|
|
):
|
|
for args in [
|
|
([], {'bind': bind}),
|
|
([bind], {})
|
|
]:
|
|
metadata.create_all(*args[0], **args[1])
|
|
assert table.exists(*args[0], **args[1])
|
|
metadata.drop_all(*args[0], **args[1])
|
|
table.create(*args[0], **args[1])
|
|
table.drop(*args[0], **args[1])
|
|
assert not table.exists(*args[0], **args[1])
|
|
|
|
def test_create_drop_err_metadata(self):
|
|
metadata = MetaData()
|
|
Table('test_table', metadata, Column('foo', Integer))
|
|
for meth in [metadata.create_all, metadata.drop_all]:
|
|
assert_raises_message(
|
|
exc.UnboundExecutionError,
|
|
"MetaData object is not bound to an Engine or Connection.",
|
|
meth
|
|
)
|
|
|
|
def test_create_drop_err_table(self):
|
|
metadata = MetaData()
|
|
table = Table('test_table', metadata,
|
|
Column('foo', Integer))
|
|
|
|
for meth in [
|
|
table.exists,
|
|
table.create,
|
|
table.drop,
|
|
]:
|
|
assert_raises_message(
|
|
exc.UnboundExecutionError,
|
|
("Table object 'test_table' is not bound to an Engine or "
|
|
"Connection."),
|
|
meth
|
|
)
|
|
|
|
@testing.uses_deprecated()
|
|
def test_create_drop_bound(self):
|
|
|
|
for meta in (MetaData, ThreadLocalMetaData):
|
|
for bind in (
|
|
testing.db,
|
|
testing.db.connect()
|
|
):
|
|
metadata = meta()
|
|
table = Table('test_table', metadata,
|
|
Column('foo', Integer))
|
|
metadata.bind = bind
|
|
assert metadata.bind is table.bind is bind
|
|
metadata.create_all()
|
|
assert table.exists()
|
|
metadata.drop_all()
|
|
table.create()
|
|
table.drop()
|
|
assert not table.exists()
|
|
|
|
metadata = meta()
|
|
table = Table('test_table', metadata,
|
|
Column('foo', Integer))
|
|
|
|
metadata.bind = bind
|
|
|
|
assert metadata.bind is table.bind is bind
|
|
metadata.create_all()
|
|
assert table.exists()
|
|
metadata.drop_all()
|
|
table.create()
|
|
table.drop()
|
|
assert not table.exists()
|
|
if isinstance(bind, engine.Connection):
|
|
bind.close()
|
|
|
|
def test_create_drop_constructor_bound(self):
|
|
for bind in (
|
|
testing.db,
|
|
testing.db.connect()
|
|
):
|
|
try:
|
|
for args in (
|
|
([bind], {}),
|
|
([], {'bind': bind}),
|
|
):
|
|
metadata = MetaData(*args[0], **args[1])
|
|
table = Table('test_table', metadata,
|
|
Column('foo', Integer))
|
|
assert metadata.bind is table.bind is bind
|
|
metadata.create_all()
|
|
assert table.exists()
|
|
metadata.drop_all()
|
|
table.create()
|
|
table.drop()
|
|
assert not table.exists()
|
|
finally:
|
|
if isinstance(bind, engine.Connection):
|
|
bind.close()
|
|
|
|
def test_implicit_execution(self):
|
|
metadata = MetaData()
|
|
table = Table('test_table', metadata,
|
|
Column('foo', Integer),
|
|
test_needs_acid=True,
|
|
)
|
|
conn = testing.db.connect()
|
|
metadata.create_all(bind=conn)
|
|
try:
|
|
trans = conn.begin()
|
|
metadata.bind = conn
|
|
t = table.insert()
|
|
assert t.bind is conn
|
|
table.insert().execute(foo=5)
|
|
table.insert().execute(foo=6)
|
|
table.insert().execute(foo=7)
|
|
trans.rollback()
|
|
metadata.bind = None
|
|
assert conn.execute('select count(*) from test_table'
|
|
).scalar() == 0
|
|
finally:
|
|
metadata.drop_all(bind=conn)
|
|
|
|
def test_clauseelement(self):
|
|
metadata = MetaData()
|
|
table = Table('test_table', metadata,
|
|
Column('foo', Integer))
|
|
metadata.create_all(bind=testing.db)
|
|
try:
|
|
for elem in [
|
|
table.select,
|
|
lambda **kwargs: sa.func.current_timestamp(**kwargs).select(),
|
|
# func.current_timestamp().select,
|
|
lambda **kwargs:text("select * from test_table", **kwargs)
|
|
]:
|
|
for bind in (
|
|
testing.db,
|
|
testing.db.connect()
|
|
):
|
|
try:
|
|
e = elem(bind=bind)
|
|
assert e.bind is bind
|
|
e.execute().close()
|
|
finally:
|
|
if isinstance(bind, engine.Connection):
|
|
bind.close()
|
|
|
|
e = elem()
|
|
assert e.bind is None
|
|
assert_raises(
|
|
exc.UnboundExecutionError,
|
|
e.execute
|
|
)
|
|
finally:
|
|
if isinstance(bind, engine.Connection):
|
|
bind.close()
|
|
metadata.drop_all(bind=testing.db)
|