Files
sqlalchemy/test/engine/test_logging.py
T
Mike Bayer 4ccf8b7214 more autocommit messaging
Further clarified connection-level logging to indicate the BEGIN, ROLLBACK
and COMMIT log messages do not actually indicate a real transaction when
the AUTOCOMMIT isolation level is in use; messaging has been extended to
include the BEGIN message itself, and the messaging has also been fixed to
accommodate when the :class:`.Engine` level
:paramref:`.create_engine.isolation_level` parameter was used directly.

Fixes: #7853
Change-Id: Iafc78070737ad117f84262e4bde84b81a81e4ea1
(cherry picked from commit 5636692467)
2022-03-24 14:03:08 -04:00

971 lines
32 KiB
Python

import logging.handlers
import re
import sqlalchemy as tsa
from sqlalchemy import bindparam
from sqlalchemy import Column
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy import util
from sqlalchemy.sql import util as sql_util
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import eq_regex
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
from sqlalchemy.testing.util import lazy_gc
def exec_sql(engine, sql, *args, **kwargs):
with engine.begin() as conn:
return conn.exec_driver_sql(sql, *args, **kwargs)
class LogParamsTest(fixtures.TestBase):
__only_on__ = "sqlite"
__requires__ = ("ad_hoc_engines",)
def setup_test(self):
self.eng = engines.testing_engine(options={"echo": True})
self.no_param_engine = engines.testing_engine(
options={"echo": True, "hide_parameters": True}
)
exec_sql(self.eng, "create table if not exists foo (data string)")
exec_sql(
self.no_param_engine,
"create table if not exists foo (data string)",
)
self.buf = logging.handlers.BufferingHandler(100)
for log in [logging.getLogger("sqlalchemy.engine")]:
log.addHandler(self.buf)
def teardown_test(self):
exec_sql(self.eng, "drop table if exists foo")
for log in [logging.getLogger("sqlalchemy.engine")]:
log.removeHandler(self.buf)
def test_log_large_list_of_dict(self):
exec_sql(
self.eng,
"INSERT INTO foo (data) values (:data)",
[{"data": str(i)} for i in range(100)],
)
eq_(
self.buf.buffer[2].message,
"[raw sql] [{'data': '0'}, {'data': '1'}, {'data': '2'}, "
"{'data': '3'}, "
"{'data': '4'}, {'data': '5'}, {'data': '6'}, {'data': '7'}"
" ... displaying 10 of 100 total bound "
"parameter sets ... {'data': '98'}, {'data': '99'}]",
)
def test_repr_params_large_list_of_dict(self):
eq_(
repr(
sql_util._repr_params(
[{"data": str(i)} for i in range(100)],
batches=10,
ismulti=True,
)
),
"[{'data': '0'}, {'data': '1'}, {'data': '2'}, {'data': '3'}, "
"{'data': '4'}, {'data': '5'}, {'data': '6'}, {'data': '7'}"
" ... displaying 10 of 100 total bound "
"parameter sets ... {'data': '98'}, {'data': '99'}]",
)
def test_log_no_parameters(self):
exec_sql(
self.no_param_engine,
"INSERT INTO foo (data) values (:data)",
[{"data": str(i)} for i in range(100)],
)
eq_(
self.buf.buffer[2].message,
"[raw sql] [SQL parameters hidden due to hide_parameters=True]",
)
def test_log_large_list_of_tuple(self):
exec_sql(
self.eng,
"INSERT INTO foo (data) values (?)",
[(str(i),) for i in range(100)],
)
eq_(
self.buf.buffer[2].message,
"[raw sql] [('0',), ('1',), ('2',), ('3',), ('4',), ('5',), "
"('6',), ('7',) ... displaying 10 of 100 total "
"bound parameter sets ... ('98',), ('99',)]",
)
def test_log_positional_array(self):
with self.eng.connect() as conn:
exc_info = assert_raises(
tsa.exc.DBAPIError,
conn.execute,
tsa.text("SELECT * FROM foo WHERE id IN :foo AND bar=:bar"),
{"foo": [1, 2, 3], "bar": "hi"},
)
assert (
"[SQL: SELECT * FROM foo WHERE id IN ? AND bar=?]\n"
"[parameters: ([1, 2, 3], 'hi')]\n" in str(exc_info)
)
eq_regex(
self.buf.buffer[1].message,
r"\[generated .*\] \(\[1, 2, 3\], 'hi'\)",
)
def test_repr_params_positional_array(self):
eq_(
repr(
sql_util._repr_params(
[[1, 2, 3], 5], batches=10, ismulti=False
)
),
"[[1, 2, 3], 5]",
)
def test_repr_params_unknown_list(self):
# not known if given multiparams or not. repr params with
# straight truncation
eq_(
repr(
sql_util._repr_params(
[[i for i in range(300)], 5], batches=10, max_chars=80
)
),
"[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, ... "
"(1315 characters truncated) ... , 293, 294, 295, 296, "
"297, 298, 299], 5]",
)
def test_repr_params_positional_list(self):
# given non-multi-params in a list. repr params with
# per-element truncation, mostly does the exact same thing
eq_(
repr(
sql_util._repr_params(
[[i for i in range(300)], 5],
batches=10,
max_chars=80,
ismulti=False,
)
),
"[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 1 ... "
"(1310 characters truncated) ... "
"292, 293, 294, 295, 296, 297, 298, 299], 5]",
)
def test_repr_params_named_dict(self):
# given non-multi-params in a list. repr params with
# per-element truncation, mostly does the exact same thing
params = {"key_%s" % i: i for i in range(10)}
eq_(
repr(
sql_util._repr_params(
params, batches=10, max_chars=80, ismulti=False
)
),
repr(params),
)
def test_repr_params_ismulti_named_dict(self):
# given non-multi-params in a list. repr params with
# per-element truncation, mostly does the exact same thing
param = {"key_%s" % i: i for i in range(10)}
eq_(
repr(
sql_util._repr_params(
[param for j in range(50)],
batches=5,
max_chars=80,
ismulti=True,
)
),
"[%(param)r, %(param)r, %(param)r ... "
"displaying 5 of 50 total bound parameter sets ... "
"%(param)r, %(param)r]" % {"param": param},
)
def test_repr_params_ismulti_list(self):
# given multi-params in a list. repr params with
# per-element truncation, mostly does the exact same thing
eq_(
repr(
sql_util._repr_params(
[
[[i for i in range(300)], 5],
[[i for i in range(300)], 5],
[[i for i in range(300)], 5],
],
batches=10,
max_chars=80,
ismulti=True,
)
),
"[[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 1 ... "
"(1310 characters truncated) ... 292, 293, 294, 295, 296, 297, "
"298, 299], 5], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 1 ... "
"(1310 characters truncated) ... 292, 293, 294, 295, 296, 297, "
"298, 299], 5], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 1 ... "
"(1310 characters truncated) ... 292, 293, 294, 295, 296, 297, "
"298, 299], 5]]",
)
def test_log_large_parameter_single(self):
import random
largeparam = "".join(chr(random.randint(52, 85)) for i in range(5000))
exec_sql(self.eng, "INSERT INTO foo (data) values (?)", (largeparam,))
eq_(
self.buf.buffer[2].message,
"[raw sql] ('%s ... (4702 characters truncated) ... %s',)"
% (largeparam[0:149], largeparam[-149:]),
)
def test_log_large_multi_parameter(self):
import random
lp1 = "".join(chr(random.randint(52, 85)) for i in range(5))
lp2 = "".join(chr(random.randint(52, 85)) for i in range(8))
lp3 = "".join(chr(random.randint(52, 85)) for i in range(670))
exec_sql(self.eng, "SELECT ?, ?, ?", (lp1, lp2, lp3))
eq_(
self.buf.buffer[2].message,
"[raw sql] ('%s', '%s', '%s ... (372 characters truncated) "
"... %s')" % (lp1, lp2, lp3[0:149], lp3[-149:]),
)
def test_log_large_parameter_multiple(self):
import random
lp1 = "".join(chr(random.randint(52, 85)) for i in range(5000))
lp2 = "".join(chr(random.randint(52, 85)) for i in range(200))
lp3 = "".join(chr(random.randint(52, 85)) for i in range(670))
exec_sql(
self.eng,
"INSERT INTO foo (data) values (?)",
[(lp1,), (lp2,), (lp3,)],
)
eq_(
self.buf.buffer[2].message,
"[raw sql] [('%s ... (4702 characters truncated) ... %s',), "
"('%s',), "
"('%s ... (372 characters truncated) ... %s',)]"
% (lp1[0:149], lp1[-149:], lp2, lp3[0:149], lp3[-149:]),
)
def test_exception_format_dict_param(self):
exception = tsa.exc.IntegrityError("foo", {"x": "y"}, None)
eq_regex(
str(exception),
r"\(.*.NoneType\) None\n\[SQL: foo\]\n\[parameters: {'x': 'y'}\]",
)
def test_exception_format_hide_parameters(self):
exception = tsa.exc.IntegrityError(
"foo", {"x": "y"}, None, hide_parameters=True
)
eq_regex(
str(exception),
r"\(.*.NoneType\) None\n\[SQL: foo\]\n"
r"\[SQL parameters hidden due to hide_parameters=True\]",
)
def test_exception_format_hide_parameters_dbapi_round_trip(self):
assert_raises_message(
tsa.exc.DBAPIError,
r".*INSERT INTO nonexistent \(data\) values \(:data\)\]\n"
r"\[SQL parameters hidden due to hide_parameters=True\]",
lambda: exec_sql(
self.no_param_engine,
"INSERT INTO nonexistent (data) values (:data)",
[{"data": str(i)} for i in range(10)],
),
)
def test_exception_format_hide_parameters_nondbapi_round_trip(self):
foo = Table("foo", MetaData(), Column("data", String))
with self.no_param_engine.connect() as conn:
assert_raises_message(
tsa.exc.StatementError,
r"\(sqlalchemy.exc.InvalidRequestError\) A value is required "
r"for bind parameter 'the_data_2'\n"
r"\[SQL: SELECT foo.data \nFROM foo \nWHERE "
r"foo.data = \? OR foo.data = \?\]\n"
r"\[SQL parameters hidden due to hide_parameters=True\]",
conn.execute,
select(foo).where(
or_(
foo.c.data == bindparam("the_data_1"),
foo.c.data == bindparam("the_data_2"),
)
),
{"the_data_1": "some data"},
)
def test_exception_format_unexpected_parameter(self):
# test that if the parameters aren't any known type, we just
# run through repr()
exception = tsa.exc.IntegrityError("foo", "bar", "bat")
eq_regex(
str(exception),
r"\(.*.str\) bat\n\[SQL: foo\]\n\[parameters: 'bar'\]",
)
def test_exception_format_unexpected_member_parameter(self):
# test that if the parameters aren't any known type, we just
# run through repr()
exception = tsa.exc.IntegrityError("foo", ["bar", "bat"], "hoho")
eq_regex(
str(exception),
r"\(.*.str\) hoho\n\[SQL: foo\]\n\[parameters: \['bar', 'bat'\]\]",
)
def test_result_large_param(self):
import random
largeparam = "".join(chr(random.randint(52, 85)) for i in range(5000))
self.eng.echo = "debug"
result = exec_sql(self.eng, "SELECT ?", (largeparam,))
row = result.first()
eq_(
self.buf.buffer[2].message,
"[raw sql] ('%s ... (4702 characters truncated) ... %s',)"
% (largeparam[0:149], largeparam[-149:]),
)
if util.py3k:
eq_(
self.buf.buffer[5].message,
"Row ('%s ... (4702 characters truncated) ... %s',)"
% (largeparam[0:149], largeparam[-149:]),
)
else:
eq_(
self.buf.buffer[5].message,
"Row (u'%s ... (4703 characters truncated) ... %s',)"
% (largeparam[0:148], largeparam[-149:]),
)
if util.py3k:
eq_(
repr(row),
"('%s ... (4702 characters truncated) ... %s',)"
% (largeparam[0:149], largeparam[-149:]),
)
else:
eq_(
repr(row),
"(u'%s ... (4703 characters truncated) ... %s',)"
% (largeparam[0:148], largeparam[-149:]),
)
def test_error_large_dict(self):
assert_raises_message(
tsa.exc.DBAPIError,
r".*INSERT INTO nonexistent \(data\) values \(:data\)\]\n"
r"\[parameters: "
r"\[{'data': '0'}, {'data': '1'}, {'data': '2'}, "
r"{'data': '3'}, {'data': '4'}, {'data': '5'}, "
r"{'data': '6'}, {'data': '7'} ... displaying 10 of "
r"100 total bound parameter sets ... {'data': '98'}, "
r"{'data': '99'}\]",
lambda: exec_sql(
self.eng,
"INSERT INTO nonexistent (data) values (:data)",
[{"data": str(i)} for i in range(100)],
),
)
def test_error_large_list(self):
assert_raises_message(
tsa.exc.DBAPIError,
r".*INSERT INTO nonexistent \(data\) values "
r"\(\?\)\]\n\[parameters: \[\('0',\), \('1',\), \('2',\), "
r"\('3',\), \('4',\), \('5',\), \('6',\), \('7',\) "
r"... displaying "
r"10 of 100 total bound parameter sets ... "
r"\('98',\), \('99',\)\]",
lambda: exec_sql(
self.eng,
"INSERT INTO nonexistent (data) values (?)",
[(str(i),) for i in range(100)],
),
)
class PoolLoggingTest(fixtures.TestBase):
def setup_test(self):
self.existing_level = logging.getLogger("sqlalchemy.pool").level
self.buf = logging.handlers.BufferingHandler(100)
for log in [logging.getLogger("sqlalchemy.pool")]:
log.addHandler(self.buf)
def teardown_test(self):
for log in [logging.getLogger("sqlalchemy.pool")]:
log.removeHandler(self.buf)
logging.getLogger("sqlalchemy.pool").setLevel(self.existing_level)
def _queuepool_echo_fixture(self):
return tsa.pool.QueuePool(creator=mock.Mock(), echo="debug")
def _queuepool_logging_fixture(self):
logging.getLogger("sqlalchemy.pool").setLevel(logging.DEBUG)
return tsa.pool.QueuePool(creator=mock.Mock())
def _stpool_echo_fixture(self):
return tsa.pool.SingletonThreadPool(creator=mock.Mock(), echo="debug")
def _stpool_logging_fixture(self):
logging.getLogger("sqlalchemy.pool").setLevel(logging.DEBUG)
return tsa.pool.SingletonThreadPool(creator=mock.Mock())
def _test_queuepool(self, q, dispose=True):
conn = q.connect()
conn.close()
conn = None
conn = q.connect()
conn.close()
conn = None
conn = q.connect()
conn = None
del conn
lazy_gc()
q.dispose()
eq_(
[buf.msg for buf in self.buf.buffer],
[
"Created new connection %r",
"Connection %r checked out from pool",
"Connection %r being returned to pool%s",
"Connection %s rollback-on-return",
"Connection %r checked out from pool",
"Connection %r being returned to pool%s",
"Connection %s rollback-on-return",
"Connection %r checked out from pool",
"Connection %r being returned to pool%s",
"Connection %s rollback-on-return",
"Closing connection %r",
]
+ (["Pool disposed. %s"] if dispose else []),
)
def test_stpool_echo(self):
q = self._stpool_echo_fixture()
self._test_queuepool(q, False)
def test_stpool_logging(self):
q = self._stpool_logging_fixture()
self._test_queuepool(q, False)
@testing.requires.predictable_gc
def test_queuepool_echo(self):
q = self._queuepool_echo_fixture()
self._test_queuepool(q)
@testing.requires.predictable_gc
def test_queuepool_logging(self):
q = self._queuepool_logging_fixture()
self._test_queuepool(q)
class LoggingNameTest(fixtures.TestBase):
__requires__ = ("ad_hoc_engines",)
def _assert_names_in_execute(self, eng, eng_name, pool_name):
with eng.connect() as conn:
conn.execute(select(1))
assert self.buf.buffer
for name in [b.name for b in self.buf.buffer]:
assert name in (
"sqlalchemy.engine.Engine.%s" % eng_name,
"sqlalchemy.pool.impl.%s.%s"
% (eng.pool.__class__.__name__, pool_name),
)
def _assert_no_name_in_execute(self, eng):
with eng.connect() as conn:
conn.execute(select(1))
assert self.buf.buffer
for name in [b.name for b in self.buf.buffer]:
assert name in (
"sqlalchemy.engine.Engine",
"sqlalchemy.pool.impl.%s" % eng.pool.__class__.__name__,
)
def _named_engine(self, **kw):
options = {
"logging_name": "myenginename",
"pool_logging_name": "mypoolname",
"echo": True,
}
options.update(kw)
return engines.testing_engine(options=options)
def _unnamed_engine(self, **kw):
kw.update({"echo": True})
return engines.testing_engine(options=kw)
def setup_test(self):
self.buf = logging.handlers.BufferingHandler(100)
for log in [
logging.getLogger("sqlalchemy.engine"),
logging.getLogger("sqlalchemy.pool"),
]:
log.addHandler(self.buf)
def teardown_test(self):
for log in [
logging.getLogger("sqlalchemy.engine"),
logging.getLogger("sqlalchemy.pool"),
]:
log.removeHandler(self.buf)
def test_named_logger_names(self):
eng = self._named_engine()
eq_(eng.logging_name, "myenginename")
eq_(eng.pool.logging_name, "mypoolname")
def test_named_logger_names_after_dispose(self):
eng = self._named_engine()
with eng.connect() as conn:
conn.execute(select(1))
eng.dispose()
eq_(eng.logging_name, "myenginename")
eq_(eng.pool.logging_name, "mypoolname")
def test_unnamed_logger_names(self):
eng = self._unnamed_engine()
eq_(eng.logging_name, None)
eq_(eng.pool.logging_name, None)
def test_named_logger_execute(self):
eng = self._named_engine()
self._assert_names_in_execute(eng, "myenginename", "mypoolname")
def test_named_logger_echoflags_execute(self):
eng = self._named_engine(echo="debug", echo_pool="debug")
self._assert_names_in_execute(eng, "myenginename", "mypoolname")
def test_named_logger_execute_after_dispose(self):
eng = self._named_engine()
with eng.connect() as conn:
conn.execute(select(1))
eng.dispose()
self._assert_names_in_execute(eng, "myenginename", "mypoolname")
def test_unnamed_logger_execute(self):
eng = self._unnamed_engine()
self._assert_no_name_in_execute(eng)
def test_unnamed_logger_echoflags_execute(self):
eng = self._unnamed_engine(echo="debug", echo_pool="debug")
self._assert_no_name_in_execute(eng)
class TransactionContextLoggingTest(fixtures.TestBase):
__only_on__ = "sqlite"
@testing.fixture()
def plain_assert_buf(self, plain_logging_engine):
buf = logging.handlers.BufferingHandler(100)
for log in [
logging.getLogger("sqlalchemy.engine"),
]:
log.addHandler(buf)
def go(expected):
assert buf.buffer
buflines = [rec.msg % rec.args for rec in buf.buffer]
eq_(buflines, expected)
buf.flush()
yield go
for log in [
logging.getLogger("sqlalchemy.engine"),
]:
log.removeHandler(buf)
@testing.fixture()
def assert_buf(self, logging_engine):
buf = logging.handlers.BufferingHandler(100)
for log in [
logging.getLogger("sqlalchemy.engine"),
]:
log.addHandler(buf)
def go(expected):
assert buf.buffer
buflines = [rec.msg % rec.args for rec in buf.buffer]
eq_(buflines, expected)
buf.flush()
yield go
for log in [
logging.getLogger("sqlalchemy.engine"),
]:
log.removeHandler(buf)
@testing.fixture()
def logging_engine(self, testing_engine):
kw = {"echo": True, "future": True}
e = testing_engine(options=kw)
e.connect().close()
return e
@testing.fixture()
def autocommit_iso_logging_engine(self, testing_engine):
kw = {"echo": True, "future": True, "isolation_level": "AUTOCOMMIT"}
e = testing_engine(options=kw)
e.connect().close()
return e
@testing.fixture()
def plain_logging_engine(self, testing_engine):
# deliver an engine with logging using the plain logging API,
# not the echo parameter
log = logging.getLogger("sqlalchemy.engine")
existing_level = log.level
log.setLevel(logging.DEBUG)
try:
e = testing_engine(future=True)
e.connect().close()
yield e
finally:
log.setLevel(existing_level)
def test_begin_once_block(self, logging_engine, assert_buf):
with logging_engine.begin():
pass
assert_buf(["BEGIN (implicit)", "COMMIT"])
def test_commit_as_you_go_block_commit(self, logging_engine, assert_buf):
with logging_engine.connect() as conn:
conn.begin()
conn.commit()
assert_buf(["BEGIN (implicit)", "COMMIT"])
def test_commit_as_you_go_block_rollback(self, logging_engine, assert_buf):
with logging_engine.connect() as conn:
conn.begin()
conn.rollback()
assert_buf(["BEGIN (implicit)", "ROLLBACK"])
def test_commit_as_you_go_block_commit_engine_level_autocommit(
self, autocommit_iso_logging_engine, assert_buf
):
with autocommit_iso_logging_engine.connect() as conn:
conn.begin()
conn.commit()
assert_buf(
[
"BEGIN (implicit; DBAPI should not "
"BEGIN due to autocommit mode)",
"COMMIT using DBAPI connection.commit(), DBAPI "
"should ignore due to autocommit mode",
]
)
def test_commit_engine_level_autocommit_exec_opt_nonauto(
self, autocommit_iso_logging_engine, assert_buf
):
with autocommit_iso_logging_engine.execution_options(
isolation_level=testing.db.dialect.default_isolation_level
).connect() as conn:
conn.begin()
conn.commit()
assert_buf(
[
"BEGIN (implicit)",
"COMMIT",
]
)
def test_commit_as_you_go_block_commit_autocommit(
self, logging_engine, assert_buf
):
with logging_engine.connect().execution_options(
isolation_level="AUTOCOMMIT"
) as conn:
conn.begin()
conn.commit()
assert_buf(
[
"BEGIN (implicit; DBAPI should not "
"BEGIN due to autocommit mode)",
"COMMIT using DBAPI connection.commit(), DBAPI "
"should ignore due to autocommit mode",
]
)
def test_commit_as_you_go_block_rollback_autocommit(
self, logging_engine, assert_buf
):
with logging_engine.connect().execution_options(
isolation_level="AUTOCOMMIT"
) as conn:
conn.begin()
conn.rollback()
assert_buf(
[
"BEGIN (implicit; DBAPI should not "
"BEGIN due to autocommit mode)",
"ROLLBACK using DBAPI connection.rollback(), DBAPI "
"should ignore due to autocommit mode",
]
)
def test_logging_compatibility(
self, plain_assert_buf, plain_logging_engine
):
"""ensure plain logging doesn't produce API errors.
Added as part of #7612
"""
e = plain_logging_engine
with e.connect() as conn:
result = conn.exec_driver_sql("select 1")
result.all()
plain_assert_buf(
[
"BEGIN (implicit)",
"select 1",
"[raw sql] ()",
"Col ('1',)",
"Row (1,)",
"ROLLBACK",
]
)
@testing.requires.python38
def test_log_messages_have_correct_metadata_plain(
self, plain_logging_engine
):
"""test #7612"""
self._test_log_messages_have_correct_metadata(plain_logging_engine)
@testing.requires.python38
def test_log_messages_have_correct_metadata_echo(self, logging_engine):
"""test #7612"""
self._test_log_messages_have_correct_metadata(logging_engine)
def _test_log_messages_have_correct_metadata(self, logging_engine):
buf = logging.handlers.BufferingHandler(100)
log = logging.getLogger("sqlalchemy.engine")
try:
log.addHandler(buf)
with logging_engine.connect().execution_options(
isolation_level="AUTOCOMMIT"
) as conn:
conn.begin()
conn.rollback()
finally:
log.removeHandler(buf)
assert len(buf.buffer) >= 2
# log messages must originate from functions called 'begin'/'rollback'
logging_functions = {rec.funcName for rec in buf.buffer}
assert any(
"begin" in fn for fn in logging_functions
), logging_functions
assert any(
"rollback" in fn for fn in logging_functions
), logging_functions
# log messages must originate from different lines
log_lines = {rec.lineno for rec in buf.buffer}
assert len(log_lines) > 1, log_lines
buf.flush()
class LoggingTokenTest(fixtures.TestBase):
def setup_test(self):
self.buf = logging.handlers.BufferingHandler(100)
for log in [
logging.getLogger("sqlalchemy.engine"),
]:
log.addHandler(self.buf)
def teardown_test(self):
for log in [
logging.getLogger("sqlalchemy.engine"),
]:
log.removeHandler(self.buf)
def _assert_token_in_execute(self, conn, token):
self.buf.flush()
r = conn.execute(select(1))
r.all()
assert self.buf.buffer
for rec in self.buf.buffer:
line = rec.msg % rec.args
assert re.match(r"\[%s\]" % token, line)
self.buf.flush()
def _assert_no_tokens_in_execute(self, conn):
self.buf.flush()
r = conn.execute(select(1))
r.all()
assert self.buf.buffer
for rec in self.buf.buffer:
line = rec.msg % rec.args
assert not re.match(r"\[my_.*?\]", line)
self.buf.flush()
@testing.fixture()
def token_engine(self, testing_engine):
kw = {"echo": "debug"}
return testing_engine(options=kw)
def test_logging_token_option_connection(self, token_engine):
eng = token_engine
c1 = eng.connect().execution_options(logging_token="my_name_1")
c2 = eng.connect().execution_options(logging_token="my_name_2")
c3 = eng.connect()
self._assert_token_in_execute(c1, "my_name_1")
self._assert_token_in_execute(c2, "my_name_2")
self._assert_no_tokens_in_execute(c3)
c1.close()
c2.close()
c3.close()
def test_logging_token_option_engine(self, token_engine):
eng = token_engine
e1 = eng.execution_options(logging_token="my_name_1")
e2 = eng.execution_options(logging_token="my_name_2")
with e1.connect() as c1:
self._assert_token_in_execute(c1, "my_name_1")
with e2.connect() as c2:
self._assert_token_in_execute(c2, "my_name_2")
with eng.connect() as c3:
self._assert_no_tokens_in_execute(c3)
class EchoTest(fixtures.TestBase):
__requires__ = ("ad_hoc_engines",)
def setup_test(self):
self.level = logging.getLogger("sqlalchemy.engine").level
logging.getLogger("sqlalchemy.engine").setLevel(logging.WARN)
self.buf = logging.handlers.BufferingHandler(100)
logging.getLogger("sqlalchemy.engine").addHandler(self.buf)
def teardown_test(self):
logging.getLogger("sqlalchemy.engine").removeHandler(self.buf)
logging.getLogger("sqlalchemy.engine").setLevel(self.level)
def _testing_engine(self):
e = engines.testing_engine()
# do an initial execute to clear out 'first connect'
# messages
with e.connect() as conn:
conn.execute(select(10)).close()
self.buf.flush()
return e
def test_levels(self):
e1 = engines.testing_engine()
eq_(e1._should_log_info(), False)
eq_(e1._should_log_debug(), False)
eq_(e1.logger.isEnabledFor(logging.INFO), False)
eq_(e1.logger.getEffectiveLevel(), logging.WARN)
e1.echo = True
eq_(e1._should_log_info(), True)
eq_(e1._should_log_debug(), False)
eq_(e1.logger.isEnabledFor(logging.INFO), True)
eq_(e1.logger.getEffectiveLevel(), logging.INFO)
e1.echo = "debug"
eq_(e1._should_log_info(), True)
eq_(e1._should_log_debug(), True)
eq_(e1.logger.isEnabledFor(logging.DEBUG), True)
eq_(e1.logger.getEffectiveLevel(), logging.DEBUG)
e1.echo = False
eq_(e1._should_log_info(), False)
eq_(e1._should_log_debug(), False)
eq_(e1.logger.isEnabledFor(logging.INFO), False)
eq_(e1.logger.getEffectiveLevel(), logging.WARN)
def test_echo_flag_independence(self):
"""test the echo flag's independence to a specific engine."""
e1 = self._testing_engine()
e2 = self._testing_engine()
e1.echo = True
with e1.connect() as conn:
conn.execute(select(1)).close()
with e2.connect() as conn:
conn.execute(select(2)).close()
e1.echo = False
with e1.connect() as conn:
conn.execute(select(3)).close()
with e2.connect() as conn:
conn.execute(select(4)).close()
e2.echo = True
with e1.connect() as conn:
conn.execute(select(5)).close()
with e2.connect() as conn:
conn.execute(select(6)).close()
assert self.buf.buffer[0].getMessage().startswith("SELECT 1")
assert self.buf.buffer[2].getMessage().startswith("SELECT 6")
assert len(self.buf.buffer) == 4