Merge "do not return asyncio connections to the pool under gc" into main

This commit is contained in:
mike bayer
2023-02-06 23:52:23 +00:00
committed by Gerrit Code Review
5 changed files with 289 additions and 122 deletions
+40
View File
@@ -0,0 +1,40 @@
.. change::
:tags: bug, asyncio
:tickets: 9237
Repaired a regression caused by the fix for :ticket:`8419` which caused
asyncpg connections to be reset (i.e. transaction ``rollback()`` called)
and returned to the pool normally in the case that the connection was not
explicitly returned to the connection pool and was instead being
intercepted by Python garbage collection, which would fail if the garbage
collection operation were being called outside of the asyncio event loop,
leading to a large amount of stack trace activity dumped into logging
and standard output.
The correct behavior is restored, which is that all asyncio connections
that are garbage collected due to not being explicitly returned to the
connection pool are detached from the pool and discarded, along with a
warning, rather than being returned to the pool, as they cannot be reliably
reset. In the case of asyncpg connections, the asyncpg-specific
``terminate()`` method will be used to end the connection more gracefully
within this process as opposed to just dropping it.
This change includes a small behavioral change that is hoped to be useful
for debugging asyncio applications, where the warning that's emitted in the
case of asyncio connections being unexpectedly garbage collected has been
made slightly more aggressive by moving it outside of a ``try/except``
block and into a ``finally:`` block, where it will emit unconditionally
regardless of whether the detach/termination operation succeeded or not. It
will also have the effect that applications or test suites which promote
Python warnings to exceptions will see this as a full exception raise,
whereas previously it was not possible for this warning to actually
propagate as an exception. Applications and test suites which need to
tolerate this warning in the interim should adjust the Python warnings
filter to allow these warnings to not raise.
The behavior for traditional sync connections remains unchanged, that
garbage collected connections continue to be returned to the pool normally
without emitting a warning. This will likely be changed in a future major
release to at least emit a similar warning as is emitted for asyncio
drivers, as it is a usage error for pooled connections to be intercepted by
garbage collection without being properly returned to the pool.
+40 -25
View File
@@ -382,7 +382,10 @@ class Pool(log.Identified, event.EventTarget):
self._dialect.do_close(connection)
except BaseException as e:
self.logger.error(
"Exception closing connection %r", connection, exc_info=True
f"Exception {'terminating' if terminate else 'closing'} "
f"connection %r",
connection,
exc_info=True,
)
if not isinstance(e, Exception):
raise
@@ -941,27 +944,32 @@ def _finalize_fairy(
if is_gc_cleanup:
assert ref is not None
_strong_ref_connection_records.pop(ref, None)
elif fairy:
_strong_ref_connection_records.pop(weakref.ref(fairy), None)
if is_gc_cleanup:
assert connection_record is not None
if connection_record.fairy_ref is not ref:
return
assert dbapi_connection is None
dbapi_connection = connection_record.dbapi_connection
elif fairy:
_strong_ref_connection_records.pop(weakref.ref(fairy), None)
# null pool is not _is_asyncio but can be used also with async dialects
dont_restore_gced = (
pool._dialect.is_async and not pool._dialect.has_terminate
)
dont_restore_gced = pool._dialect.is_async
if dont_restore_gced:
detach = connection_record is None or is_gc_cleanup
can_manipulate_connection = ref is None
can_manipulate_connection = not is_gc_cleanup
can_close_or_terminate_connection = (
not pool._dialect.is_async or pool._dialect.has_terminate
)
requires_terminate_for_close = (
pool._dialect.is_async and pool._dialect.has_terminate
)
else:
detach = connection_record is None
can_manipulate_connection = True
can_manipulate_connection = can_close_or_terminate_connection = True
requires_terminate_for_close = False
if dbapi_connection is not None:
if connection_record and echo:
@@ -992,25 +1000,14 @@ def _finalize_fairy(
fairy._pool = pool
fairy.detach()
if can_manipulate_connection:
if can_close_or_terminate_connection:
if pool.dispatch.close_detached:
pool.dispatch.close_detached(dbapi_connection)
pool._close_connection(dbapi_connection)
else:
message = (
"The garbage collector is trying to clean up "
f"connection {dbapi_connection!r}. This feature is "
"unsupported on asyncio "
'dbapis that lack a "terminate" feature, since no '
"IO can be performed at this stage to "
"reset the connection. Please close out all "
"connections when they are no longer used, calling "
"``close()`` or using a context manager to "
"manage their lifetime."
pool._close_connection(
dbapi_connection,
terminate=requires_terminate_for_close,
)
pool.logger.error(message)
util.warn(message)
except BaseException as e:
pool.logger.error(
@@ -1020,6 +1017,24 @@ def _finalize_fairy(
connection_record.invalidate(e=e)
if not isinstance(e, Exception):
raise
finally:
if detach and is_gc_cleanup and dont_restore_gced:
message = (
"The garbage collector is trying to clean up "
f"non-checked-in connection {dbapi_connection!r}, "
f"""which will be {
'dropped, as it cannot be safely terminated'
if not can_close_or_terminate_connection
else 'terminated'
}. """
"Please ensure that SQLAlchemy pooled connections are "
"returned to "
"the pool explicitly, either by calling ``close()`` "
"or by using appropriate context managers to manage "
"their lifecycle."
)
pool.logger.error(message)
util.warn(message)
if connection_record and connection_record.fairy_ref is not None:
connection_record.checkin()
+60 -23
View File
@@ -23,6 +23,7 @@ from sqlalchemy.testing import assert_raises_context_ok
from sqlalchemy.testing import assert_warns_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_none
@@ -456,6 +457,13 @@ class PoolEventsTest(PoolTestBase):
)
canary = []
@event.listens_for(p, "reset")
def reset(conn, rec, state):
canary.append(
f"""reset_{'rollback_ok'
if state.asyncio_safe else 'no_rollback'}"""
)
@event.listens_for(p, "checkin")
def checkin(*arg, **kw):
canary.append("checkin")
@@ -668,7 +676,7 @@ class PoolEventsTest(PoolTestBase):
c1 = p.connect()
eq_(canary, [])
c1.close()
eq_(canary, ["checkin"])
eq_(canary, ["reset_rollback_ok", "checkin"])
def test_reset_event(self):
p, canary = self._reset_event_fixture()
@@ -728,11 +736,13 @@ class PoolEventsTest(PoolTestBase):
assert canary.call_args_list[0][0][0] is dbapi_con
assert canary.call_args_list[0][0][2] is exc
@testing.combinations((True,), (False,), argnames="is_asyncio")
@testing.combinations((True,), (False,), argnames="has_terminate")
@testing.variation("is_asyncio", [True, False])
@testing.variation("has_terminate", [True, False])
def test_checkin_event_gc(self, is_asyncio, has_terminate):
"""tests for #8419, which have been modified for 2.0 in #9237"""
p, canary = self._checkin_event_fixture(
_is_asyncio=is_asyncio, _has_terminate=has_terminate
_is_asyncio=bool(is_asyncio), _has_terminate=bool(has_terminate)
)
c1 = p.connect()
@@ -740,18 +750,38 @@ class PoolEventsTest(PoolTestBase):
dbapi_connection = weakref.ref(c1.dbapi_connection)
eq_(canary, [])
del c1
lazy_gc()
detach_gced = is_asyncio and not has_terminate
if is_asyncio:
if has_terminate:
with expect_warnings(
"The garbage collector is trying to clean up.*which will "
"be terminated."
):
del c1
lazy_gc()
else:
with expect_warnings(
"The garbage collector is trying to clean up.*which will "
"be dropped, as it cannot be safely terminated."
):
del c1
lazy_gc()
else:
del c1
lazy_gc()
detach_gced = is_asyncio
if detach_gced:
# "close_detached" is not called because for asyncio the
# connection is just lost.
eq_(canary, ["detach"])
if has_terminate:
eq_(canary, ["reset_no_rollback", "detach", "close_detached"])
else:
# "close_detached" is not called because for asyncio without
# terminate the connection is just lost.
eq_(canary, ["reset_no_rollback", "detach"])
else:
eq_(canary, ["checkin"])
eq_(canary, ["reset_rollback_ok", "checkin"])
gc_collect()
if detach_gced:
@@ -769,10 +799,13 @@ class PoolEventsTest(PoolTestBase):
eq_(canary, [])
c1.close()
eq_(canary, ["checkin"])
eq_(canary, ["reset_rollback_ok", "checkin"])
c2.close()
eq_(canary, ["checkin", "checkin"])
eq_(
canary,
["reset_rollback_ok", "checkin", "reset_rollback_ok", "checkin"],
)
def test_listen_targets_scope(self):
canary = []
@@ -1686,28 +1719,32 @@ class QueuePoolTest(PoolTestBase):
raise tsa.exc.DisconnectionError()
conn = pool.connect()
old_dbapi_conn = conn.dbapi_connection
normally_closed_dbapi_conn = conn.dbapi_connection
conn.close()
eq_(old_dbapi_conn.mock_calls, [call.rollback()])
eq_(normally_closed_dbapi_conn.mock_calls, [call.rollback()])
old_dbapi_conn.boom = "yes"
normally_closed_dbapi_conn.boom = "yes"
conn = pool.connect()
dbapi_conn = conn.dbapi_connection
# normally closed conn was checked out again but had a problem,
# so was replaced
eq_(
normally_closed_dbapi_conn.mock_calls,
[call.rollback(), call.close()],
)
not_closed_dbapi_conn = conn.dbapi_connection
del conn
gc_collect()
if detach_gced:
# new connection was detached + abandoned on return
eq_(dbapi_conn.mock_calls, [])
eq_(not_closed_dbapi_conn.mock_calls, [])
else:
# new connection reset and returned to pool
eq_(dbapi_conn.mock_calls, [call.rollback()])
# old connection was just closed - did not get an
# erroneous reset on return
eq_(old_dbapi_conn.mock_calls, [call.rollback(), call.close()])
eq_(not_closed_dbapi_conn.mock_calls, [call.rollback()])
@testing.requires.timing_intensive
def test_recycle_pool_no_race(self):
+86 -74
View File
@@ -1069,25 +1069,25 @@ class TransactionTest(fixtures.TablesTest):
def test_savepoint_seven(self):
users = self.tables.users
conn = testing.db.connect()
trans = conn.begin()
conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
with testing.db.connect() as conn:
trans = conn.begin()
conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
sp1 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
sp1 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
sp2 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
sp2 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
assert conn.in_transaction()
assert conn.in_transaction()
trans.close()
trans.close()
assert not sp1.is_active
assert not sp2.is_active
assert not trans.is_active
assert conn._transaction is None
assert conn._nested_transaction is None
assert not sp1.is_active
assert not sp2.is_active
assert not trans.is_active
assert conn._transaction is None
assert conn._nested_transaction is None
with testing.db.connect() as conn:
eq_(
@@ -1163,41 +1163,47 @@ class IsolationLevelTest(fixtures.TestBase):
def test_engine_param_stays(self):
eng = testing_engine()
isolation_level = eng.dialect.get_isolation_level(
eng.connect().connection.dbapi_connection
)
with eng.connect() as conn:
isolation_level = eng.dialect.get_isolation_level(
conn.connection.dbapi_connection
)
level = self._non_default_isolation_level()
ne_(isolation_level, level)
eng = testing_engine(options=dict(isolation_level=level))
eq_(
eng.dialect.get_isolation_level(
eng.connect().connection.dbapi_connection
),
level,
)
with eng.connect() as conn:
eq_(
eng.dialect.get_isolation_level(
conn.connection.dbapi_connection
),
level,
)
# check that it stays
conn = eng.connect()
eq_(
eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
level,
)
conn.close()
with eng.connect() as conn:
eq_(
eng.dialect.get_isolation_level(
conn.connection.dbapi_connection
),
level,
)
conn = eng.connect()
eq_(
eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
level,
)
conn.close()
with eng.connect() as conn:
eq_(
eng.dialect.get_isolation_level(
conn.connection.dbapi_connection
),
level,
)
def test_default_level(self):
eng = testing_engine(options=dict())
isolation_level = eng.dialect.get_isolation_level(
eng.connect().connection.dbapi_connection
)
with eng.connect() as conn:
isolation_level = eng.dialect.get_isolation_level(
conn.connection.dbapi_connection
)
eq_(isolation_level, self._default_isolation_level())
def test_reset_level(self):
@@ -1335,16 +1341,16 @@ class IsolationLevelTest(fixtures.TestBase):
def test_connection_invalidated(self):
eng = testing_engine()
conn = eng.connect()
c2 = conn.execution_options(
isolation_level=self._non_default_isolation_level()
)
c2.invalidate()
c2.connection
with eng.connect() as conn:
c2 = conn.execution_options(
isolation_level=self._non_default_isolation_level()
)
c2.invalidate()
c2.connection
# TODO: do we want to rebuild the previous isolation?
# for now, this is current behavior so we will leave it.
eq_(c2.get_isolation_level(), self._default_isolation_level())
# TODO: do we want to rebuild the previous isolation?
# for now, this is current behavior so we will leave it.
eq_(c2.get_isolation_level(), self._default_isolation_level())
def test_per_connection(self):
from sqlalchemy.pool import QueuePool
@@ -1384,24 +1390,26 @@ class IsolationLevelTest(fixtures.TestBase):
def test_exception_in_transaction(self):
eng = testing_engine()
c1 = eng.connect()
with expect_raises_message(
exc.InvalidRequestError,
r"This connection has already initialized a SQLAlchemy "
r"Transaction\(\) object via begin\(\) or autobegin; "
r"isolation_level may not be altered unless rollback\(\) or "
r"commit\(\) is called first.",
):
with c1.begin():
c1 = c1.execution_options(
isolation_level=self._non_default_isolation_level()
)
with eng.connect() as c1:
with expect_raises_message(
exc.InvalidRequestError,
r"This connection has already initialized a SQLAlchemy "
r"Transaction\(\) object via begin\(\) or autobegin; "
r"isolation_level may not be altered unless rollback\(\) or "
r"commit\(\) is called first.",
):
with c1.begin():
c1 = c1.execution_options(
isolation_level=self._non_default_isolation_level()
)
# was never set, so we are on original value
eq_(
eng.dialect.get_isolation_level(c1.connection.dbapi_connection),
self._default_isolation_level(),
)
# was never set, so we are on original value
eq_(
eng.dialect.get_isolation_level(
c1.connection.dbapi_connection
),
self._default_isolation_level(),
)
def test_per_statement_bzzt(self):
assert_raises_message(
@@ -1424,22 +1432,26 @@ class IsolationLevelTest(fixtures.TestBase):
}
),
)
conn = eng.connect()
eq_(
eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
self._non_default_isolation_level(),
)
with eng.connect() as conn:
eq_(
eng.dialect.get_isolation_level(
conn.connection.dbapi_connection
),
self._non_default_isolation_level(),
)
def test_per_option_engine(self):
eng = testing_engine(testing.db.url).execution_options(
isolation_level=self._non_default_isolation_level()
)
conn = eng.connect()
eq_(
eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
self._non_default_isolation_level(),
)
with eng.connect() as conn:
eq_(
eng.dialect.get_isolation_level(
conn.connection.dbapi_connection
),
self._non_default_isolation_level(),
)
def test_isolation_level_accessors_connection_default(self):
eng = testing_engine(testing.db.url)
+63
View File
@@ -31,6 +31,7 @@ from sqlalchemy.testing import combinations
from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import eq_regex
from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
@@ -339,6 +340,68 @@ class AsyncEngineTest(EngineFixture):
is_false(t1 == None)
@testing.variation("simulate_gc", [True, False])
def test_appropriate_warning_for_gced_connection(
self, async_engine, simulate_gc
):
"""test #9237 which builds upon a not really complete solution
added for #8419."""
async def go():
conn = await async_engine.connect()
await conn.begin()
await conn.execute(select(1))
pool_connection = await conn.get_raw_connection()
return pool_connection
from sqlalchemy.util.concurrency import await_only
pool_connection = await_only(go())
rec = pool_connection._connection_record
ref = rec.fairy_ref
pool = pool_connection._pool
echo = False
if simulate_gc:
# not using expect_warnings() here because we also want to do a
# negative test for warnings, and we want to absolutely make sure
# the thing here that emits the warning is the correct path
from sqlalchemy.pool.base import _finalize_fairy
with mock.patch.object(
pool._dialect,
"do_rollback",
mock.Mock(side_effect=Exception("can't run rollback")),
), mock.patch("sqlalchemy.util.warn") as m:
_finalize_fairy(
None, rec, pool, ref, echo, transaction_was_reset=False
)
if async_engine.dialect.has_terminate:
expected_msg = (
"The garbage collector is trying to clean up.*which will "
"be terminated."
)
else:
expected_msg = (
"The garbage collector is trying to clean up.*which will "
"be dropped, as it cannot be safely terminated."
)
# [1] == .args, not in 3.7
eq_regex(m.mock_calls[0][1][0], expected_msg)
else:
# the warning emitted by the pool is inside of a try/except:
# so it's impossible right now to have this warning "raise".
# for now, test by using mock.patch
with mock.patch("sqlalchemy.util.warn") as m:
pool_connection.close()
eq_(m.mock_calls, [])
def test_clear_compiled_cache(self, async_engine):
async_engine.sync_engine._compiled_cache["foo"] = "bar"
eq_(async_engine.sync_engine._compiled_cache["foo"], "bar")