mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-08 01:40:48 -04:00
abd9f52ade
working on some linux platforms. [ticket:2669]
1427 lines
51 KiB
Python
1427 lines
51 KiB
Python
from test.lib.testing import eq_, assert_raises, assert_raises_message, config
|
|
import re
|
|
from test.lib.util import picklers
|
|
from sqlalchemy.interfaces import ConnectionProxy
|
|
from sqlalchemy import MetaData, Integer, String, INT, VARCHAR, func, \
|
|
bindparam, select, event, TypeDecorator, create_engine, Sequence
|
|
from sqlalchemy.sql import column, literal
|
|
from test.lib.schema import Table, Column
|
|
import sqlalchemy as tsa
|
|
from test.lib import testing, engines
|
|
from test.lib.engines import testing_engine
|
|
import logging.handlers
|
|
from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam
|
|
from sqlalchemy.engine import base, default
|
|
from sqlalchemy.engine.base import Connection, Engine
|
|
from test.lib import fixtures
|
|
import StringIO
|
|
|
|
# Fixture tables and their MetaData.  The setup_class() methods below
# rebind these through ``global``; users_autoinc is initialised here as
# well so that every name declared global there exists at import time.
users, users_autoinc, metadata = None, None, None
|
|
class ExecuteTest(fixtures.TestBase):
    """Statement execution tests run against ``testing.db``.

    Covers raw SQL strings in each DBAPI paramstyle, wrapping and pickling
    of exceptions raised while executing, and engine-level execution
    options.  The ``users`` and ``users_autoinc`` tables are created once
    per class and published as module globals; ``users`` is emptied after
    every test.
    """

    @classmethod
    def setup_class(cls):
        """Create the fixture tables on a MetaData bound to ``testing.db``."""
        global users, users_autoinc, metadata
        metadata = MetaData(testing.db)
        users = Table('users', metadata,
            Column('user_id', INT, primary_key=True, autoincrement=False),
            Column('user_name', VARCHAR(20)),
        )
        users_autoinc = Table('users_autoinc', metadata,
            Column('user_id', INT, primary_key=True,
                   test_needs_autoincrement=True),
            Column('user_name', VARCHAR(20)),
        )
        metadata.create_all()

    @engines.close_first
    def teardown(self):
        """Remove all rows from ``users`` after each test."""
        testing.db.execute(users.delete())

    @classmethod
    def teardown_class(cls):
        """Drop the fixture tables."""
        metadata.drop_all()

    def _run_on_engine_and_connection(self, fn):
        """Call ``fn(testing.db)``, then ``fn(conn)`` on a fresh connection.

        The connection is closed even when ``fn`` raises.
        """
        fn(testing.db)
        conn = testing.db.connect()
        try:
            fn(conn)
        finally:
            conn.close()

    @testing.fails_on("postgresql+pg8000",
            "pg8000 still doesn't allow single % without params")
    def test_no_params_option(self):
        stmt = "SELECT '%'"
        if testing.against('oracle'):
            stmt += " FROM DUAL"
        conn = testing.db.connect()
        # close in finally, like the other tests in this class, so the
        # connection is released even when the assertion fails
        try:
            result = conn.\
                        execution_options(no_parameters=True).\
                        scalar(stmt)
            eq_(result, '%')
        finally:
            conn.close()

    @testing.fails_on_everything_except('firebird', 'maxdb',
                                        'sqlite', '+pyodbc',
                                        '+mxodbc', '+zxjdbc', 'mysql+oursql',
                                        'informix+informixdb')
    def test_raw_qmark(self):
        def go(conn):
            # each accepted way of passing positional parameters
            conn.execute('insert into users (user_id, user_name) '
                         'values (?, ?)', (1, 'jack'))
            conn.execute('insert into users (user_id, user_name) '
                         'values (?, ?)', [2, 'fred'])
            conn.execute('insert into users (user_id, user_name) '
                         'values (?, ?)', [3, 'ed'], [4, 'horse'])
            conn.execute('insert into users (user_id, user_name) '
                         'values (?, ?)', (5, 'barney'), (6, 'donkey'))
            conn.execute('insert into users (user_id, user_name) '
                         'values (?, ?)', 7, 'sally')
            res = conn.execute('select * from users order by user_id')
            assert res.fetchall() == [
                (1, 'jack'),
                (2, 'fred'),
                (3, 'ed'),
                (4, 'horse'),
                (5, 'barney'),
                (6, 'donkey'),
                (7, 'sally'),
            ]
            for multiparam, param in [
                (("jack", "fred"), {}),
                ((["jack", "fred"],), {})
            ]:
                res = conn.execute(
                    "select * from users where user_name=? or "
                    "user_name=? order by user_id",
                    *multiparam, **param)
                assert res.fetchall() == [
                    (1, 'jack'),
                    (2, 'fred')
                ]
            res = conn.execute("select * from users where user_name=?",
                "jack"
            )
            assert res.fetchall() == [(1, 'jack')]
            conn.execute('delete from users')

        self._run_on_engine_and_connection(go)

    # some psycopg2 versions bomb this.
    @testing.fails_on_everything_except('mysql+mysqldb', 'mysql+pymysql',
                'mysql+mysqlconnector', 'postgresql')
    @testing.fails_on('postgresql+zxjdbc', 'sprintf not supported')
    def test_raw_sprintf(self):
        def go(conn):
            conn.execute('insert into users (user_id, user_name) '
                         'values (%s, %s)', [1, 'jack'])
            conn.execute('insert into users (user_id, user_name) '
                         'values (%s, %s)', [2, 'ed'], [3, 'horse'])
            conn.execute('insert into users (user_id, user_name) '
                         'values (%s, %s)', 4, 'sally')
            conn.execute('insert into users (user_id) values (%s)', 5)
            res = conn.execute('select * from users order by user_id')
            assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3,
                    'horse'), (4, 'sally'), (5, None)]
            for multiparam, param in [
                (("jack", "ed"), {}),
                ((["jack", "ed"],), {})
            ]:
                res = conn.execute(
                    "select * from users where user_name=%s or "
                    "user_name=%s order by user_id",
                    *multiparam, **param)
                assert res.fetchall() == [
                    (1, 'jack'),
                    (2, 'ed')
                ]
            res = conn.execute("select * from users where user_name=%s",
                "jack"
            )
            assert res.fetchall() == [(1, 'jack')]

            conn.execute('delete from users')

        self._run_on_engine_and_connection(go)

    # pyformat is supported for mysql, but skipping because a few driver
    # versions have a bug that bombs out on this test. (1.2.2b3,
    # 1.2.2c1, 1.2.2)

    @testing.skip_if(lambda : testing.against('mysql+mysqldb'),
                     'db-api flaky')
    @testing.fails_on_everything_except('postgresql+psycopg2',
            'postgresql+pypostgresql', 'mysql+mysqlconnector',
            'mysql+pymysql')
    def test_raw_python(self):
        def go(conn):
            conn.execute('insert into users (user_id, user_name) '
                         'values (%(id)s, %(name)s)',
                         {'id': 1, 'name': 'jack'})
            conn.execute('insert into users (user_id, user_name) '
                         'values (%(id)s, %(name)s)',
                         {'id': 2, 'name': 'ed'},
                         {'id': 3, 'name': 'horse'})
            conn.execute('insert into users (user_id, user_name) '
                         'values (%(id)s, %(name)s)', id=4, name='sally'
                         )
            res = conn.execute('select * from users order by user_id')
            assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3,
                    'horse'), (4, 'sally')]
            conn.execute('delete from users')

        self._run_on_engine_and_connection(go)

    @testing.fails_on_everything_except('sqlite', 'oracle+cx_oracle', 'informix+informixdb')
    def test_raw_named(self):
        def go(conn):
            conn.execute('insert into users (user_id, user_name) '
                         'values (:id, :name)', {'id': 1, 'name': 'jack'})
            conn.execute('insert into users (user_id, user_name) '
                         'values (:id, :name)', {'id': 2, 'name': 'ed'},
                         {'id': 3, 'name': 'horse'})
            conn.execute('insert into users (user_id, user_name) '
                         'values (:id, :name)', id=4, name='sally')
            res = conn.execute('select * from users order by user_id')
            assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3,
                    'horse'), (4, 'sally')]
            conn.execute('delete from users')

        self._run_on_engine_and_connection(go)

    def test_exception_wrapping_dbapi(self):
        def go(conn):
            assert_raises_message(
                tsa.exc.DBAPIError,
                r"not_a_valid_statement",
                conn.execute, 'not_a_valid_statement'
            )

        self._run_on_engine_and_connection(go)

    def test_exception_wrapping_non_dbapi_statement(self):
        class MyType(TypeDecorator):
            impl = Integer
            def process_bind_param(self, value, dialect):
                raise Exception("nope")

        def _go(conn):
            assert_raises_message(
                tsa.exc.StatementError,
                r"nope \(original cause: Exception: nope\) 'SELECT 1 ",
                conn.execute,
                    select([1]).\
                        where(
                            column('foo') == literal('bar', MyType())
                        )
            )

        self._run_on_engine_and_connection(_go)

    def test_stmt_exception_pickleable_no_dbapi(self):
        self._test_stmt_exception_pickleable(Exception("hello world"))

    @testing.crashes("postgresql+psycopg2",
                "Older versions dont support cursor pickling, newer ones do")
    @testing.fails_on("mysql+oursql",
                "Exception doesn't come back exactly the same from pickle")
    @testing.fails_on("oracle+cx_oracle",
                        "cx_oracle exception seems to be having "
                        "some issue with pickling")
    def test_stmt_exception_pickleable_plus_dbapi(self):
        import sys

        raw = testing.db.raw_connection()
        the_orig = None
        try:
            try:
                cursor = raw.cursor()
                cursor.execute("SELECTINCORRECT")
            except testing.db.dialect.dbapi.DatabaseError:
                # sys.exc_info() instead of binding the exception in the
                # except clause: parses on every interpreter version and
                # the value stays available after the handler exits.
                the_orig = sys.exc_info()[1]
        finally:
            raw.close()
        self._test_stmt_exception_pickleable(the_orig)

    def _test_stmt_exception_pickleable(self, orig):
        """Round-trip several SQLAlchemy exceptions through each pickler.

        ``orig`` is the exception passed as the wrapped original to the
        StatementError / InterfaceError instances; for those, the
        statement, params and ``orig.args[0]`` are compared after
        unpickling.
        """
        for sa_exc in (
            tsa.exc.StatementError("some error",
                            "select * from table",
                           {"foo":"bar"},
                            orig),
            tsa.exc.InterfaceError("select * from table",
                            {"foo":"bar"},
                            orig),
            tsa.exc.NoReferencedTableError("message", "tname"),
            tsa.exc.NoReferencedColumnError("message", "tname", "cname"),
            tsa.exc.CircularDependencyError("some message", [1, 2, 3], [(1, 2), (3, 4)]),
        ):
            for loads, dumps in picklers():
                repickled = loads(dumps(sa_exc))
                eq_(repickled.args[0], sa_exc.args[0])
                if isinstance(sa_exc, tsa.exc.StatementError):
                    eq_(repickled.params, {"foo":"bar"})
                    eq_(repickled.statement, sa_exc.statement)
                    if hasattr(sa_exc, "connection_invalidated"):
                        eq_(repickled.connection_invalidated,
                            sa_exc.connection_invalidated)
                    eq_(repickled.orig.args[0], orig.args[0])

    def test_dont_wrap_mixin(self):
        class MyException(Exception, tsa.exc.DontWrapMixin):
            pass

        class MyType(TypeDecorator):
            impl = Integer
            def process_bind_param(self, value, dialect):
                raise MyException("nope")

        def _go(conn):
            assert_raises_message(
                MyException,
                "nope",
                conn.execute,
                    select([1]).\
                        where(
                            column('foo') == literal('bar', MyType())
                        )
            )

        self._run_on_engine_and_connection(_go)

    def test_empty_insert(self):
        """test that execute() interprets [] as a list with no params"""

        testing.db.execute(users_autoinc.insert().
                    values(user_name=bindparam('name')), [])
        eq_(testing.db.execute(users_autoinc.select()).fetchall(), [(1,
            None)])

    @testing.requires.ad_hoc_engines
    def test_engine_level_options(self):
        eng = engines.testing_engine(options={'execution_options':
                                              {'foo': 'bar'}})
        conn = eng.contextual_connect()
        eq_(conn._execution_options['foo'], 'bar')
        eq_(conn.execution_options(bat='hoho')._execution_options['foo'],
            'bar')
        eq_(conn.execution_options(bat='hoho')._execution_options['bat'],
            'hoho')
        eq_(conn.execution_options(foo='hoho')._execution_options['foo'],
            'hoho')
        eng.update_execution_options(foo='hoho')
        conn = eng.contextual_connect()
        eq_(conn._execution_options['foo'], 'hoho')

    def test_unicode_test_fails_warning(self):
        class MockCursor(engines.DBAPIProxyCursor):
            def execute(self, stmt, params=None, **kw):
                # fail only the statement whose text marks it as the
                # unicode-returns probe
                if "test unicode returns" in stmt:
                    raise self.engine.dialect.dbapi.DatabaseError("boom")
                else:
                    return super(MockCursor, self).execute(stmt, params, **kw)
        eng = engines.proxying_engine(cursor_cls=MockCursor)
        assert_raises_message(
            tsa.exc.SAWarning,
            "Exception attempting to detect unicode returns",
            eng.connect
        )
        assert eng.dialect.returns_unicode_strings in (True, False)
        eng.dispose()
|
|
|
|
class ConvenienceExecuteTest(fixtures.TablesTest):
    """Tests for ``begin()`` / ``transaction()`` on Engine and Connection.

    Each test inserts one row into ``exec_test`` through one of those
    entry points and then checks, via ``testing.db``, whether the row is
    present (commit) or absent (rollback).
    """

    @classmethod
    def define_tables(cls, metadata):
        cls.table = Table('exec_test', metadata,
            Column('a', Integer),
            Column('b', Integer),
            test_needs_acid=True
        )

    def _trans_fn(self, is_transaction=False):
        """Return a callable that inserts ``(x, value)`` into the table.

        With ``is_transaction`` true the callable's first argument is
        treated as an object exposing ``.connection`` and the insert is
        executed on that attribute.
        """
        def go(conn, x, value=None):
            if is_transaction:
                conn = conn.connection
            conn.execute(self.table.insert().values(a=x, b=value))
        return go

    def _trans_rollback_fn(self, is_transaction=False):
        """Like ``_trans_fn``, but raise ``Exception("breakage")`` afterwards."""
        insert = self._trans_fn(is_transaction)
        def go(conn, x, value=None):
            insert(conn, x, value=value)
            raise Exception("breakage")
        return go

    def _assert_no_data(self):
        """Assert the table's row count is zero."""
        eq_(
            testing.db.scalar(self.table.count()), 0
        )

    def _assert_fn(self, x, value=None):
        """Assert the table holds exactly the row ``(x, value)``."""
        eq_(
            testing.db.execute(self.table.select()).fetchall(),
            [(x, value)]
        )

    def test_transaction_engine_ctx_commit(self):
        fn = self._trans_fn()
        ctx = testing.db.begin()
        testing.run_as_contextmanager(ctx, fn, 5, value=8)
        self._assert_fn(5, value=8)

    def test_transaction_engine_ctx_begin_fails(self):
        engine = engines.testing_engine()
        class MockConnection(Connection):
            closed = False
            def begin(self):
                raise Exception("boom")

            def close(self):
                # record on the class so the test can see it without
                # holding a reference to the instance
                MockConnection.closed = True
        engine._connection_cls = MockConnection
        assert_raises(
            Exception,
            engine.begin
        )
        assert MockConnection.closed

    def test_transaction_engine_ctx_rollback(self):
        fn = self._trans_rollback_fn()
        ctx = testing.db.begin()
        assert_raises_message(
            Exception,
            "breakage",
            testing.run_as_contextmanager, ctx, fn, 5, value=8
        )
        self._assert_no_data()

    def test_transaction_tlocal_engine_ctx_commit(self):
        fn = self._trans_fn()
        engine = engines.testing_engine(options=dict(
                                strategy='threadlocal',
                                pool=testing.db.pool))
        ctx = engine.begin()
        testing.run_as_contextmanager(ctx, fn, 5, value=8)
        self._assert_fn(5, value=8)

    def test_transaction_tlocal_engine_ctx_rollback(self):
        fn = self._trans_rollback_fn()
        engine = engines.testing_engine(options=dict(
                                strategy='threadlocal',
                                pool=testing.db.pool))
        ctx = engine.begin()
        assert_raises_message(
            Exception,
            "breakage",
            testing.run_as_contextmanager, ctx, fn, 5, value=8
        )
        self._assert_no_data()

    def test_transaction_connection_ctx_commit(self):
        fn = self._trans_fn(True)
        conn = testing.db.connect()
        try:
            ctx = conn.begin()
            testing.run_as_contextmanager(ctx, fn, 5, value=8)
            self._assert_fn(5, value=8)
        finally:
            conn.close()

    def test_transaction_connection_ctx_rollback(self):
        fn = self._trans_rollback_fn(True)
        conn = testing.db.connect()
        try:
            ctx = conn.begin()
            assert_raises_message(
                Exception,
                "breakage",
                testing.run_as_contextmanager, ctx, fn, 5, value=8
            )
            self._assert_no_data()
        finally:
            conn.close()

    def test_connection_as_ctx(self):
        fn = self._trans_fn()
        ctx = testing.db.connect()
        testing.run_as_contextmanager(ctx, fn, 5, value=8)
        # autocommit is on
        self._assert_fn(5, value=8)

    @testing.fails_on('mysql+oursql', "oursql bug ? getting wrong rowcount")
    def test_connect_as_ctx_noautocommit(self):
        fn = self._trans_fn()
        self._assert_no_data()
        ctx = testing.db.connect().execution_options(autocommit=False)
        testing.run_as_contextmanager(ctx, fn, 5, value=8)
        # autocommit is off
        self._assert_no_data()

    def test_transaction_engine_fn_commit(self):
        fn = self._trans_fn()
        testing.db.transaction(fn, 5, value=8)
        self._assert_fn(5, value=8)

    def test_transaction_engine_fn_rollback(self):
        fn = self._trans_rollback_fn()
        assert_raises_message(
            Exception,
            "breakage",
            testing.db.transaction, fn, 5, value=8
        )
        self._assert_no_data()

    def test_transaction_connection_fn_commit(self):
        fn = self._trans_fn()
        conn = testing.db.connect()
        try:
            conn.transaction(fn, 5, value=8)
            self._assert_fn(5, value=8)
        finally:
            conn.close()

    def test_transaction_connection_fn_rollback(self):
        fn = self._trans_rollback_fn()
        conn = testing.db.connect()
        try:
            assert_raises(
                Exception,
                conn.transaction, fn, 5, value=8
            )
            self._assert_no_data()
        finally:
            conn.close()
|
|
|
|
class CompiledCacheTest(fixtures.TestBase):
    """Check the ``compiled_cache`` execution option on a connection."""

    @classmethod
    def setup_class(cls):
        """Create the ``users`` table and publish it as a module global."""
        global users, metadata
        metadata = MetaData(testing.db)
        id_column = Column('user_id', INT, primary_key=True,
                           test_needs_autoincrement=True)
        name_column = Column('user_name', VARCHAR(20))
        users = Table('users', metadata, id_column, name_column)
        metadata.create_all()

    @engines.close_first
    def teardown(self):
        """Empty ``users`` after each test."""
        purge = users.delete()
        testing.db.execute(purge)

    @classmethod
    def teardown_class(cls):
        """Drop everything created in setup_class."""
        metadata.drop_all()

    def test_cache(self):
        compiled = {}
        plain_conn = testing.db.connect()
        caching_conn = plain_conn.execution_options(compiled_cache=compiled)

        stmt = users.insert()
        # the same insert construct executed three times should leave a
        # single entry in the cache dictionary
        for name in ('u1', 'u2', 'u3'):
            caching_conn.execute(stmt, {'user_name': name})
        assert len(compiled) == 1

        row_count = plain_conn.execute("select count(*) from users").scalar()
        eq_(row_count, 3)
|
|
|
|
class LogParamsTest(fixtures.TestBase):
    """Check how large bound-parameter lists appear in logs and errors.

    A BufferingHandler is attached to the ``sqlalchemy.engine`` and
    ``sqlalchemy.pool`` loggers for the duration of each test; the tests
    execute 100 parameter sets and compare the resulting log message or
    DBAPIError text with the expected abbreviated form.
    """

    __only_on__ = 'sqlite'
    __requires__ = 'ad_hoc_engines',

    # loggers that receive the buffering handler in setup()/teardown()
    _logger_names = ('sqlalchemy.engine', 'sqlalchemy.pool')

    def setup(self):
        self.eng = engines.testing_engine(options={'echo':True})
        self.eng.execute("create table foo (data string)")
        self.buf = logging.handlers.BufferingHandler(100)
        for name in self._logger_names:
            logging.getLogger(name).addHandler(self.buf)

    def teardown(self):
        self.eng.execute("drop table foo")
        for name in self._logger_names:
            logging.getLogger(name).removeHandler(self.buf)

    def test_log_large_dict(self):
        self.eng.execute(
            "INSERT INTO foo (data) values (:data)",
            [{"data":str(i)} for i in xrange(100)]
        )
        eq_(
            self.buf.buffer[1].message,
            "[{'data': '0'}, {'data': '1'}, {'data': '2'}, {'data': '3'}, "
            "{'data': '4'}, {'data': '5'}, {'data': '6'}, {'data': '7'}"
            "  ... displaying 10 of 100 total bound "
            "parameter sets ...  {'data': '98'}, {'data': '99'}]"
        )

    def test_log_large_list(self):
        self.eng.execute(
            "INSERT INTO foo (data) values (?)",
            [(str(i), ) for i in xrange(100)]
        )
        eq_(
            self.buf.buffer[1].message,
            "[('0',), ('1',), ('2',), ('3',), ('4',), ('5',), "
            "('6',), ('7',)  ... displaying 10 of 100 total "
            "bound parameter sets ...  ('98',), ('99',)]"
        )

    def test_error_large_dict(self):
        # every fragment of the pattern is a raw string: the backslashes
        # are regex escapes, not string escapes
        assert_raises_message(
            tsa.exc.DBAPIError,
            r".*'INSERT INTO nonexistent \(data\) values \(:data\)' "
            r"\[{'data': '0'}, {'data': '1'}, {'data': '2'}, "
            r"{'data': '3'}, {'data': '4'}, {'data': '5'}, "
            r"{'data': '6'}, {'data': '7'}  ... displaying 10 of "
            r"100 total bound parameter sets ...  {'data': '98'}, {'data': '99'}\]",
            lambda: self.eng.execute(
                "INSERT INTO nonexistent (data) values (:data)",
                [{"data":str(i)} for i in xrange(100)]
            )
        )

    def test_error_large_list(self):
        assert_raises_message(
            tsa.exc.DBAPIError,
            r".*INSERT INTO nonexistent \(data\) values "
            r"\(\?\)' \[\('0',\), \('1',\), \('2',\), \('3',\), "
            r"\('4',\), \('5',\), \('6',\), \('7',\)  ... displaying "
            r"10 of 100 total bound parameter sets ...  "
            r"\('98',\), \('99',\)\]",
            lambda: self.eng.execute(
                "INSERT INTO nonexistent (data) values (?)",
                [(str(i), ) for i in xrange(100)]
            )
        )
|
|
|
|
class LoggingNameTest(fixtures.TestBase):
    """Check the logger names used by engines and pools.

    Engines are built with and without ``logging_name`` /
    ``pool_logging_name``; after executing a statement every buffered log
    record must carry one of the expected logger names.
    """

    __requires__ = 'ad_hoc_engines',

    def _assert_names_in_execute(self, eng, eng_name, pool_name):
        """Execute on ``eng`` and assert all records use the given names."""
        eng.execute(select([1]))
        assert self.buf.buffer
        for name in [b.name for b in self.buf.buffer]:
            assert name in (
                'sqlalchemy.engine.base.Engine.%s' % eng_name,
                'sqlalchemy.pool.%s.%s' %
                    (eng.pool.__class__.__name__, pool_name)
            )

    def _assert_no_name_in_execute(self, eng):
        """Execute on ``eng`` and assert all records use unsuffixed names."""
        eng.execute(select([1]))
        assert self.buf.buffer
        for name in [b.name for b in self.buf.buffer]:
            assert name in (
                'sqlalchemy.engine.base.Engine',
                'sqlalchemy.pool.%s' % eng.pool.__class__.__name__
            )

    def _named_engine(self, **kw):
        """Return a testing engine with explicit engine and pool log names.

        ``kw`` overrides the default options, including ``echo``.
        """
        options = {
            'logging_name':'myenginename',
            'pool_logging_name':'mypoolname',
            'echo':True
        }
        options.update(kw)
        return engines.testing_engine(options=options)

    def _unnamed_engine(self, **kw):
        """Return a testing engine without explicit log names.

        ``echo`` defaults to True but, as in ``_named_engine``, a value
        passed by the caller (e.g. ``'debug'``) takes precedence.
        """
        kw.setdefault('echo', True)
        return engines.testing_engine(options=kw)

    def setup(self):
        self.buf = logging.handlers.BufferingHandler(100)
        for log in [
            logging.getLogger('sqlalchemy.engine'),
            logging.getLogger('sqlalchemy.pool')
        ]:
            log.addHandler(self.buf)

    def teardown(self):
        for log in [
            logging.getLogger('sqlalchemy.engine'),
            logging.getLogger('sqlalchemy.pool')
        ]:
            log.removeHandler(self.buf)

    def test_named_logger_names(self):
        eng = self._named_engine()
        eq_(eng.logging_name, "myenginename")
        eq_(eng.pool.logging_name, "mypoolname")

    def test_named_logger_names_after_dispose(self):
        eng = self._named_engine()
        eng.execute(select([1]))
        eng.dispose()
        eq_(eng.logging_name, "myenginename")
        eq_(eng.pool.logging_name, "mypoolname")

    def test_unnamed_logger_names(self):
        eng = self._unnamed_engine()
        eq_(eng.logging_name, None)
        eq_(eng.pool.logging_name, None)

    def test_named_logger_execute(self):
        eng = self._named_engine()
        self._assert_names_in_execute(eng, "myenginename", "mypoolname")

    def test_named_logger_echoflags_execute(self):
        eng = self._named_engine(echo='debug', echo_pool='debug')
        self._assert_names_in_execute(eng, "myenginename", "mypoolname")

    def test_named_logger_execute_after_dispose(self):
        eng = self._named_engine()
        eng.execute(select([1]))
        eng.dispose()
        self._assert_names_in_execute(eng, "myenginename", "mypoolname")

    def test_unnamed_logger_execute(self):
        eng = self._unnamed_engine()
        self._assert_no_name_in_execute(eng)

    def test_unnamed_logger_echoflags_execute(self):
        eng = self._unnamed_engine(echo='debug', echo_pool='debug')
        self._assert_no_name_in_execute(eng)
|
|
|
|
class EchoTest(fixtures.TestBase):
    """Check the per-engine ``echo`` flag and the log levels it produces."""

    __requires__ = 'ad_hoc_engines',

    def setup(self):
        engine_log = logging.getLogger('sqlalchemy.engine')
        # remember the level so teardown() can put it back
        self.level = engine_log.level
        engine_log.setLevel(logging.WARN)
        self.buf = logging.handlers.BufferingHandler(100)
        engine_log.addHandler(self.buf)

    def teardown(self):
        engine_log = logging.getLogger('sqlalchemy.engine')
        engine_log.removeHandler(self.buf)
        engine_log.setLevel(self.level)

    def testing_engine(self):
        """Return a testing engine that has already executed one statement.

        The buffer is flushed afterwards so messages from that first
        execution are not seen by the calling test.
        """
        eng = engines.testing_engine()

        # do an initial execute to clear out 'first connect'
        # messages
        warmup = eng.execute(select([10]))
        warmup.close()
        self.buf.flush()

        return eng

    def test_levels(self):
        e1 = engines.testing_engine()

        def check(info, debug, probe_level, enabled, effective):
            eq_(e1._should_log_info(), info)
            eq_(e1._should_log_debug(), debug)
            eq_(e1.logger.isEnabledFor(probe_level), enabled)
            eq_(e1.logger.getEffectiveLevel(), effective)

        # state of a fresh engine, before echo is assigned at all
        check(False, False, logging.INFO, False, logging.WARN)

        transitions = [
            (True, (True, False, logging.INFO, True, logging.INFO)),
            ('debug', (True, True, logging.DEBUG, True, logging.DEBUG)),
            (False, (False, False, logging.INFO, False, logging.WARN)),
        ]
        for echo_value, expectation in transitions:
            e1.echo = echo_value
            check(*expectation)

    def test_echo_flag_independence(self):
        """test the echo flag's independence to a specific engine."""

        e1 = self.testing_engine()
        e2 = self.testing_engine()

        def run_pair(first, second):
            # execute SELECT <first> on e1, then SELECT <second> on e2
            e1.execute(select([first])).close()
            e2.execute(select([second])).close()

        e1.echo = True
        run_pair(1, 2)

        e1.echo = False
        run_pair(3, 4)

        e2.echo = True
        run_pair(5, 6)

        records = self.buf.buffer
        assert records[0].getMessage().startswith("SELECT 1")
        assert records[2].getMessage().startswith("SELECT 6")
        assert len(records) == 4
|
|
|
|
class MockStrategyTest(fixtures.TestBase):
    """Check DDL emitted through an engine created with ``strategy='mock'``."""

    def _engine_fixture(self):
        """Return ``(engine, buf)``; the engine writes compiled SQL to ``buf``."""
        captured = StringIO.StringIO()

        def dump(sql, *multiparams, **params):
            # ``mock_engine`` is bound below, before dump() is ever called
            compiled = sql.compile(dialect=mock_engine.dialect)
            captured.write(unicode(compiled))

        mock_engine = create_engine('postgresql://', strategy='mock',
                                    executor=dump)
        return mock_engine, captured

    def test_sequence_not_duped(self):
        engine, buf = self._engine_fixture()
        meta = MetaData()
        pk = Column('pk', Integer, Sequence('testtable_pk_seq'),
                    primary_key=True)
        table = Table('testtable', meta, pk)

        table.create(engine)
        table.drop(engine)

        created = re.findall(r'CREATE (\w+)', buf.getvalue())
        eq_(created, ["SEQUENCE", "TABLE"])

        dropped = re.findall(r'DROP (\w+)', buf.getvalue())
        eq_(dropped, ["SEQUENCE", "TABLE"])
|
|
|
|
class ResultProxyTest(fixtures.TestBase):
    """Tests for RowProxy construction and ResultProxy rowcount access."""

    def test_nontuple_row(self):
        """ensure the C version of BaseRowProxy handles
        duck-type-dependent rows."""

        from sqlalchemy.engine import RowProxy

        class MyList(object):
            # sequence-like wrapper that is deliberately not a list/tuple
            def __init__(self, l):
                self.l = l

            def __len__(self):
                return len(self.l)

            def __getitem__(self, i):
                return list.__getitem__(self.l, i)

        proxy = RowProxy(object(), MyList(['value']), [None],
                         {'key': (None, None, 0), 0: (None, None, 0)})
        eq_(list(proxy), ['value'])
        eq_(proxy[0], 'value')
        eq_(proxy['key'], 'value')

    @testing.provide_metadata
    def test_no_rowcount_on_selects_inserts(self):
        """assert that rowcount is only called on deletes and updates.

        This because cursor.rowcount can be expensive on some dialects
        such as Firebird.

        """

        metadata = self.metadata

        engine = engines.testing_engine()
        metadata.bind = engine

        t = Table('t1', metadata,
            Column('data', String(10))
        )
        metadata.create_all()

        class BreakRowcountMixin(object):
            @property
            def rowcount(self):
                # any access to rowcount fails the test
                assert False

        # swap in an execution context whose rowcount always asserts;
        # the original class is put back in the finally block
        execution_ctx_cls = engine.dialect.execution_ctx_cls
        engine.dialect.execution_ctx_cls = type("FakeCtx",
                                            (BreakRowcountMixin,
                                            execution_ctx_cls),
                                            {})

        try:
            t.insert().execute({'data': 'd1'}, {'data': 'd2'},
                               {'data': 'd3'})
            eq_(t.select().execute().fetchall(), [('d1', ), ('d2', ),
                ('d3', )])
            assert_raises(AssertionError, t.update().execute,
                          {'data': 'd4'})
            assert_raises(AssertionError, t.delete().execute)
        finally:
            engine.dialect.execution_ctx_cls = execution_ctx_cls

    @testing.requires.python26
    def test_rowproxy_is_sequence(self):
        import collections
        from sqlalchemy.engine import RowProxy

        row = RowProxy(object(), ['value'], [None],
                       {'key': (None, None, 0), 0: (None, None, 0)})
        assert isinstance(row, collections.Sequence)

    @testing.requires.cextensions
    def test_row_c_sequence_check(self):
        import csv
        from StringIO import StringIO

        metadata = MetaData()
        metadata.bind = 'sqlite://'
        # named users_table so it does not read like the module-level
        # ``users`` fixture
        users_table = Table('users', metadata,
            Column('id', Integer, primary_key=True),
            Column('name', String(40)),
        )
        users_table.create()

        users_table.insert().execute(name='Test')
        row = users_table.select().execute().fetchone()

        s = StringIO()
        writer = csv.writer(s)
        # csv performs PySequenceCheck call
        writer.writerow(row)
        assert s.getvalue().strip() == '1,Test'
|
|
|
|
class AlternateResultProxyTest(fixtures.TestBase):
    """Run one fetch scenario against each ResultProxy implementation.

    A private SQLite engine holds a ``test`` table with rows
    ``(1, "t_1")`` through ``(11, "t_11")``.
    """

    __requires__ = ('sqlite', )

    @classmethod
    def setup_class(cls):
        """Create the private engine and populate the ``test`` table."""
        cls.engine = engine = testing_engine('sqlite://')
        m = MetaData()
        cls.table = t = Table('test', m,
            Column('x', Integer, primary_key=True),
            Column('y', String(50, convert_unicode='force'))
        )
        m.create_all(engine)
        engine.execute(t.insert(), [
            {'x':i, 'y':"t_%d" % i} for i in xrange(1, 12)
        ])

    def _test_proxy(self, cls):
        """Exercise fetchone/fetchmany/fetchall on results of type ``cls``.

        The dialect's execution context class is replaced with one whose
        ``get_result_proxy()`` returns ``cls(self)``; the previous class
        is restored when the method exits.
        """
        class ExcCtx(default.DefaultExecutionContext):
            def get_result_proxy(self):
                return cls(self)

        dialect = self.engine.dialect
        previous_ctx_cls = dialect.execution_ctx_cls
        dialect.execution_ctx_cls = ExcCtx
        try:
            r = self.engine.execute(select([self.table]))
            assert isinstance(r, cls)
            rows = [r.fetchone() for _ in range(5)]
            eq_(rows, [(i, "t_%d" % i) for i in xrange(1, 6)])

            rows = r.fetchmany(3)
            eq_(rows, [(i, "t_%d" % i) for i in xrange(6, 9)])

            rows = r.fetchall()
            eq_(rows, [(i, "t_%d" % i) for i in xrange(9, 12)])

            r = self.engine.execute(select([self.table]))
            rows = r.fetchmany(None)
            eq_(rows[0], (1, "t_1"))
            # number of rows here could be one, or the whole thing
            assert len(rows) == 1 or len(rows) == 11

            r = self.engine.execute(select([self.table]).limit(1))
            r.fetchone()
            eq_(r.fetchone(), None)

            r = self.engine.execute(select([self.table]).limit(5))
            rows = r.fetchmany(6)
            eq_(rows, [(i, "t_%d" % i) for i in xrange(1, 6)])
        finally:
            dialect.execution_ctx_cls = previous_ctx_cls

    def test_plain(self):
        self._test_proxy(base.ResultProxy)

    def test_buffered_row_result_proxy(self):
        self._test_proxy(base.BufferedRowResultProxy)

    def test_fully_buffered_result_proxy(self):
        self._test_proxy(base.FullyBufferedResultProxy)

    def test_buffered_column_result_proxy(self):
        self._test_proxy(base.BufferedColumnResultProxy)
|
|
|
|
class EngineEventsTest(fixtures.TestBase):
|
|
__requires__ = 'ad_hoc_engines',
|
|
|
|
def tearDown(self):
    """Clear the listeners held on the Engine class-level dispatcher."""
    class_dispatch = Engine.dispatch
    class_dispatch._clear()
|
|
|
|
def _assert_stmts(self, expected, received):
|
|
for stmt, params, posn in expected:
|
|
if not received:
|
|
assert False
|
|
while received:
|
|
teststmt, testparams, testmultiparams = \
|
|
received.pop(0)
|
|
teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ',
|
|
teststmt).strip()
|
|
if teststmt.startswith(stmt) and (testparams
|
|
== params or testparams == posn):
|
|
break
|
|
|
|
def test_per_engine_independence(self):
    first = testing_engine(config.db_url)
    second = testing_engine(config.db_url)

    seen = []

    def record(conn, stmt, *arg):
        seen.append(stmt)

    def run_both():
        first.execute(stmt_one)
        second.execute(stmt_two)

    # listener on the first engine only: the second engine's statement
    # must not be recorded
    event.listen(first, "before_execute", record)
    stmt_one = select([1])
    stmt_two = select([2])
    run_both()
    eq_(seen, [stmt_one])

    # once the second engine listens too, both statements are recorded
    event.listen(second, "before_execute", record)
    run_both()
    eq_(seen, [stmt_one, stmt_one, stmt_two])
|
|
|
|
def test_per_engine_plus_global(self):
    calls = []

    def tagger(tag):
        # build a before_execute listener that appends ``tag``
        def listener(conn, stmt, *arg):
            calls.append(tag)
        return listener

    be1, be2, be3 = tagger('be1'), tagger('be2'), tagger('be3')

    event.listen(Engine, "before_execute", be1)
    eng_a = testing_engine(config.db_url)
    eng_b = testing_engine(config.db_url)

    event.listen(eng_a, "before_execute", be2)

    event.listen(Engine, "before_execute", be3)
    eng_a.connect()
    eng_b.connect()
    # discard anything recorded while connecting
    del calls[:]
    eng_a.execute(select([1]))
    eng_b.execute(select([1]))

    eq_(calls, ['be1', 'be3', 'be2', 'be1', 'be3'])
|
|
|
|
def test_argument_format_execute(self):
    def check_args(multiparams, params):
        assert isinstance(multiparams, (list, tuple))
        assert isinstance(params, dict)

    def before_execute(conn, clauseelement, multiparams, params):
        check_args(multiparams, params)

    def after_execute(conn, clauseelement, multiparams, params, result):
        check_args(multiparams, params)

    eng = testing_engine(config.db_url)
    event.listen(eng, 'before_execute', before_execute)
    event.listen(eng, 'after_execute', after_execute)

    # the listeners must see the same argument types whichever way the
    # statement is handed to the engine
    eng.execute(select([1]))

    via_statement = select([1]).compile(dialect=eng.dialect).statement
    eng.execute(via_statement)

    compiled = select([1]).compile(dialect=eng.dialect)
    eng.execute(compiled)

    compiled_again = select([1]).compile(dialect=eng.dialect)
    eng._execute_compiled(compiled_again, [], {})
|
|
|
|
def test_exception_event(self):
    import sys

    engine = engines.testing_engine()
    canary = []

    @event.listens_for(engine, 'dbapi_error')
    def err(conn, cursor, stmt, parameters, context, exception):
        canary.append((stmt, parameters, exception))

    conn = engine.connect()
    # release the connection whether or not the assertions pass
    try:
        try:
            conn.execute("SELECT FOO FROM I_DONT_EXIST")
            assert False, "statement was expected to raise DBAPIError"
        except tsa.exc.DBAPIError:
            # sys.exc_info() keeps the clause parseable on every
            # interpreter version
            e = sys.exc_info()[1]
            assert canary[0][2] is e.orig
            assert canary[0][0] == "SELECT FOO FROM I_DONT_EXIST"
    finally:
        conn.close()
|
|
|
|
|
|
@testing.fails_on('firebird', 'Data type unknown')
def test_execute_events(self):
    """before_execute / before_cursor_execute see the expected statements.

    Runs a create / insert / select / drop sequence against a default
    engine and a 'threadlocal' engine, recording what each hook receives,
    then compares the recordings with the expected statement prefixes
    and parameters.
    """
    stmts = []
    cursor_stmts = []

    def execute(conn, clauseelement, multiparams, params):
        stmts.append((str(clauseelement), params, multiparams))

    def cursor_execute(conn, cursor, statement, parameters,
                       context, executemany):
        cursor_stmts.append((str(statement), parameters, None))

    default_engine = engines.testing_engine(
        options=dict(implicit_returning=False))
    threadlocal_engine = engines.testing_engine(
        options=dict(implicit_returning=False, strategy='threadlocal'))

    for engine in [default_engine, threadlocal_engine]:
        event.listen(engine, 'before_execute', execute)
        event.listen(engine, 'before_cursor_execute', cursor_execute)

        m = MetaData(engine)
        t1 = Table(
            't1', m,
            Column('c1', Integer, primary_key=True),
            Column('c2', String(50), default=func.lower('Foo'),
                   primary_key=True)
        )
        m.create_all()
        try:
            t1.insert().execute(c1=5, c2='some data')
            t1.insert().execute(c1=6)
            rows = engine.execute('select * from t1').fetchall()
            eq_(rows, [(5, 'some data'), (6, 'foo')])
        finally:
            m.drop_all()
        engine.dispose()

        # Expected (statement prefix, named params, positional params)
        # as seen by the before_execute hook.
        compiled = [
            ('CREATE TABLE t1', {}, None),
            ('INSERT INTO t1 (c1, c2)',
             {'c2': 'some data', 'c1': 5}, None),
            ('INSERT INTO t1 (c1, c2)', {'c1': 6}, None),
            ('select * from t1', {}, None),
            ('DROP TABLE t1', {}, None),
        ]

        # or engine.dialect.preexecute_pk_sequences:
        if not testing.against('oracle+zxjdbc'):
            cursor = [
                ('CREATE TABLE t1', {}, ()),
                ('INSERT INTO t1 (c1, c2)',
                 {'c2': 'some data', 'c1': 5}, (5, 'some data')),
                ('SELECT lower', {'lower_2': 'Foo'}, ('Foo', )),
                ('INSERT INTO t1 (c1, c2)',
                 {'c2': 'foo', 'c1': 6}, (6, 'foo')),
                ('select * from t1', {}, ()),
                ('DROP TABLE t1', {}, ()),
            ]
        else:
            insert2_params = 6, 'Foo'
            if testing.against('oracle+zxjdbc'):
                insert2_params += (ReturningParam(12), )
            # bind param name 'lower_2' might be incorrect
            cursor = [
                ('CREATE TABLE t1', {}, ()),
                ('INSERT INTO t1 (c1, c2)',
                 {'c2': 'some data', 'c1': 5}, (5, 'some data')),
                ('INSERT INTO t1 (c1, c2)',
                 {'c1': 6, 'lower_2': 'Foo'}, insert2_params),
                ('select * from t1', {}, ()),
                ('DROP TABLE t1', {}, ()),
            ]

        self._assert_stmts(compiled, stmts)
        self._assert_stmts(cursor, cursor_stmts)
|
|
|
|
def test_options(self):
    """Connections derived via execution_options() still fire engine events."""
    fired = []

    def execute(conn, *args, **kw):
        fired.append('execute')

    def cursor_execute(conn, *args, **kw):
        fired.append('cursor_execute')

    engine = engines.testing_engine()
    event.listen(engine, 'before_execute', execute)
    event.listen(engine, 'before_cursor_execute', cursor_execute)

    base_conn = engine.connect()

    with_foo = base_conn.execution_options(foo='bar')
    eq_(with_foo._execution_options, dict(foo='bar'))
    with_foo.execute(select([1]))

    # Options given on a derived connection are added to the inherited ones.
    with_both = with_foo.execution_options(bar='bat')
    eq_(with_both._execution_options, dict(foo='bar', bar='bat'))

    # Exactly one execution happened, through the first derived connection.
    eq_(fired, ['execute', 'cursor_execute'])
|
|
|
|
def test_retval_flag(self):
    """retval=True is rejected for 'begin' and accepted for execute hooks."""
    fired = []

    def tracker(name):
        def go(conn, *args, **kw):
            fired.append(name)
        return go

    def execute(conn, clauseelement, multiparams, params):
        fired.append('execute')
        # with retval=True the hook hands its arguments back
        return clauseelement, multiparams, params

    def cursor_execute(conn, cursor, statement,
                       parameters, context, executemany):
        fired.append('cursor_execute')
        return statement, parameters

    engine = engines.testing_engine()

    # Registering a 'begin' listener with retval=True raises ArgumentError.
    assert_raises(
        tsa.exc.ArgumentError,
        event.listen, engine, "begin", tracker("begin"), retval=True
    )

    event.listen(engine, "before_execute", execute, retval=True)
    event.listen(engine, "before_cursor_execute", cursor_execute,
                 retval=True)
    engine.execute(select([1]))

    eq_(fired, ['execute', 'cursor_execute'])
|
|
|
|
def test_transactional(self):
    """begin / commit / rollback events fire around the execute events."""
    fired = []

    def tracker(name):
        def go(conn, *args, **kw):
            fired.append(name)
        return go

    engine = engines.testing_engine()
    # (event name, label recorded when that event fires)
    hooks = [
        ('before_execute', 'execute'),
        ('before_cursor_execute', 'cursor_execute'),
        ('begin', 'begin'),
        ('commit', 'commit'),
        ('rollback', 'rollback'),
    ]
    for event_name, label in hooks:
        event.listen(engine, event_name, tracker(label))

    conn = engine.connect()

    # first transaction is rolled back
    trans = conn.begin()
    conn.execute(select([1]))
    trans.rollback()

    # second transaction is committed
    trans = conn.begin()
    conn.execute(select([1]))
    trans.commit()

    eq_(fired, [
        'begin', 'execute', 'cursor_execute', 'rollback',
        'begin', 'execute', 'cursor_execute', 'commit',
    ])
|
|
|
|
@testing.requires.savepoints
@testing.requires.two_phase_transactions
def test_transactional_advanced(self):
    """Savepoint and two-phase events fire in the expected order."""
    fired = []

    def tracker(name):
        def go(*args, **kw):
            fired.append(name)
        return go

    engine = engines.testing_engine()
    tracked_events = [
        'begin', 'savepoint',
        'rollback_savepoint', 'release_savepoint',
        'rollback', 'begin_twophase',
        'prepare_twophase', 'commit_twophase',
    ]
    for name in tracked_events:
        event.listen(engine, name, tracker(name))

    conn = engine.connect()

    outer = conn.begin()

    # nested transaction that is rolled back
    nested = conn.begin_nested()
    conn.execute(select([1]))
    nested.rollback()

    # nested transaction that is committed
    nested = conn.begin_nested()
    conn.execute(select([1]))
    nested.commit()

    outer.rollback()

    twophase = conn.begin_twophase()
    conn.execute(select([1]))
    twophase.prepare()
    twophase.commit()

    eq_(fired, [
        'begin', 'savepoint',
        'rollback_savepoint', 'savepoint', 'release_savepoint',
        'rollback', 'begin_twophase',
        'prepare_twophase', 'commit_twophase',
    ])
|
|
|
|
|
|
class ProxyConnectionTest(fixtures.TestBase):
    """These are the same tests as EngineEventsTest, except using
    the deprecated ConnectionProxy interface.

    """
    __requires__ = 'ad_hoc_engines',

    def _tracking_proxy(self, canary):
        """Return a ConnectionProxy that logs each attribute it hands out.

        Every attribute fetched from the proxy is wrapped in a callable
        that appends the attribute's ``__name__`` to ``canary`` before
        delegating to the real attribute.

        :param canary: list that receives the recorded names.
        """
        class TrackProxy(ConnectionProxy):
            def __getattribute__(self, key):
                fn = object.__getattribute__(self, key)

                def go(*arg, **kw):
                    canary.append(fn.__name__)
                    return fn(*arg, **kw)
                return go

        return TrackProxy()

    @testing.uses_deprecated(r'.*Use event.listen')
    @testing.fails_on('firebird', 'Data type unknown')
    def test_proxy(self):
        """execute() / cursor_execute() proxy hooks see the expected statements.

        Runs a create / insert / select / drop sequence against a default
        engine and a 'threadlocal' engine, each configured with a
        ConnectionProxy, and compares what the proxy recorded with the
        expected statement prefixes and parameters.
        """
        stmts = []
        cursor_stmts = []

        class MyProxy(ConnectionProxy):
            def execute(
                self,
                conn,
                execute,
                clauseelement,
                *multiparams,
                **params
            ):
                stmts.append((str(clauseelement), params, multiparams))
                return execute(clauseelement, *multiparams, **params)

            def cursor_execute(
                self,
                execute,
                cursor,
                statement,
                parameters,
                context,
                executemany,
            ):
                cursor_stmts.append((str(statement), parameters, None))
                return execute(cursor, statement, parameters, context)

        # Compiled once rather than for every received statement.
        whitespace = re.compile(r'[\n\t ]+', re.M)

        def assert_stmts(expected, received):
            """Assert each expected entry appears, in order, in ``received``.

            Entries of ``received`` are consumed from the front until one
            starts with the expected statement text and carries either the
            expected named or the expected positional parameters.
            """
            for stmt, params, posn in expected:
                while received:
                    teststmt, testparams, testmultiparams = \
                        received.pop(0)
                    teststmt = whitespace.sub(' ', teststmt).strip()
                    if teststmt.startswith(stmt) and \
                            (testparams == params or testparams == posn):
                        break
                else:
                    # ``received`` ran out without a match.  Previously
                    # this was only detected when a further expected
                    # entry followed, so a missing final statement
                    # passed silently.
                    assert False, \
                        "expected statement %r was not received" % (stmt, )

        for engine in \
            engines.testing_engine(options=dict(implicit_returning=False,
                                   proxy=MyProxy())), \
            engines.testing_engine(options=dict(implicit_returning=False,
                                   proxy=MyProxy(),
                                   strategy='threadlocal')):
            m = MetaData(engine)
            t1 = Table('t1', m,
                Column('c1', Integer, primary_key=True),
                Column('c2', String(50), default=func.lower('Foo'),
                       primary_key=True)
            )
            m.create_all()
            try:
                t1.insert().execute(c1=5, c2='some data')
                t1.insert().execute(c1=6)
                eq_(engine.execute('select * from t1').fetchall(),
                    [(5, 'some data'), (6, 'foo')])
            finally:
                m.drop_all()
            engine.dispose()

            # Expected (statement prefix, named params, positional params)
            # as seen by MyProxy.execute.
            compiled = [
                ('CREATE TABLE t1', {}, None),
                ('INSERT INTO t1 (c1, c2)',
                 {'c2': 'some data', 'c1': 5}, None),
                ('INSERT INTO t1 (c1, c2)', {'c1': 6}, None),
                ('select * from t1', {}, None),
                ('DROP TABLE t1', {}, None),
            ]

            # or engine.dialect.preexecute_pk_sequences:
            if not testing.against('oracle+zxjdbc'):
                cursor = [
                    ('CREATE TABLE t1', {}, ()),
                    ('INSERT INTO t1 (c1, c2)',
                     {'c2': 'some data', 'c1': 5}, (5, 'some data')),
                    ('SELECT lower', {'lower_2': 'Foo'}, ('Foo', )),
                    ('INSERT INTO t1 (c1, c2)',
                     {'c2': 'foo', 'c1': 6}, (6, 'foo')),
                    ('select * from t1', {}, ()),
                    ('DROP TABLE t1', {}, ()),
                ]
            else:
                insert2_params = 6, 'Foo'
                if testing.against('oracle+zxjdbc'):
                    insert2_params += (ReturningParam(12), )
                # bind param name 'lower_2' might be incorrect
                cursor = [
                    ('CREATE TABLE t1', {}, ()),
                    ('INSERT INTO t1 (c1, c2)',
                     {'c2': 'some data', 'c1': 5}, (5, 'some data')),
                    ('INSERT INTO t1 (c1, c2)',
                     {'c1': 6, 'lower_2': 'Foo'}, insert2_params),
                    ('select * from t1', {}, ()),
                    ('DROP TABLE t1', {}, ()),
                ]

            assert_stmts(compiled, stmts)
            assert_stmts(cursor, cursor_stmts)

    @testing.uses_deprecated(r'.*Use event.listen')
    def test_options(self):
        """Connections derived via execution_options() still hit the proxy."""
        canary = []
        engine = engines.testing_engine(
            options={'proxy': self._tracking_proxy(canary)})
        conn = engine.connect()
        c2 = conn.execution_options(foo='bar')
        eq_(c2._execution_options, {'foo': 'bar'})
        c2.execute(select([1]))
        c3 = c2.execution_options(bar='bat')
        eq_(c3._execution_options, {'foo': 'bar', 'bar': 'bat'})
        eq_(canary, ['execute', 'cursor_execute'])

    @testing.uses_deprecated(r'.*Use event.listen')
    def test_transactional(self):
        """begin / commit / rollback proxy hooks fire around execution."""
        canary = []
        engine = engines.testing_engine(
            options={'proxy': self._tracking_proxy(canary)})
        conn = engine.connect()
        trans = conn.begin()
        conn.execute(select([1]))
        trans.rollback()
        trans = conn.begin()
        conn.execute(select([1]))
        trans.commit()

        eq_(canary, [
            'begin', 'execute', 'cursor_execute', 'rollback',
            'begin', 'execute', 'cursor_execute', 'commit',
        ])

    @testing.uses_deprecated(r'.*Use event.listen')
    @testing.requires.savepoints
    @testing.requires.two_phase_transactions
    def test_transactional_advanced(self):
        """Savepoint and two-phase proxy hooks fire in the expected order."""
        canary = []
        engine = engines.testing_engine(
            options={'proxy': self._tracking_proxy(canary)})
        conn = engine.connect()

        trans = conn.begin()
        trans2 = conn.begin_nested()
        conn.execute(select([1]))
        trans2.rollback()
        trans2 = conn.begin_nested()
        conn.execute(select([1]))
        trans2.commit()
        trans.rollback()

        trans = conn.begin_twophase()
        conn.execute(select([1]))
        trans.prepare()
        trans.commit()

        # Only the transactional hooks are compared; drop the execute ones.
        canary = [t for t in canary
                  if t not in ('cursor_execute', 'execute')]
        eq_(canary, ['begin', 'savepoint',
                     'rollback_savepoint', 'savepoint', 'release_savepoint',
                     'rollback', 'begin_twophase',
                     'prepare_twophase', 'commit_twophase']
        )
|
|
|