raise for asyncio-incompatible pool classes

An error is raised if a :class:`.QueuePool` or other non-asyncio pool class
is passed to :func:`_asyncio.create_async_engine`.  This engine only
accepts asyncio-compatible pool classes including
:class:`.AsyncAdaptedQueuePool`. Other pool classes such as
:class:`.NullPool` are compatible with both synchronous and asynchronous
engines as they do not perform any locking.

Fixes: #8771
Change-Id: I5843ccea7d824488492d1a9d46207b9f05330ae3
(cherry picked from commit c449505f65)
This commit is contained in:
Mike Bayer
2024-02-14 21:10:20 -05:00
parent 386e05bed3
commit d08322c018
9 changed files with 199 additions and 6 deletions
+15
View File
@@ -0,0 +1,15 @@
.. change::
:tags: bug, asyncio
:tickets: 8771
An error is raised if a :class:`.QueuePool` or other non-asyncio pool class
is passed to :func:`_asyncio.create_async_engine`. This engine only
accepts asyncio-compatible pool classes including
:class:`.AsyncAdaptedQueuePool`. Other pool classes such as
:class:`.NullPool` are compatible with both synchronous and asynchronous
engines as they do not perform any locking.
.. seealso::
:ref:`pool_api`
+12
View File
@@ -50,6 +50,13 @@ queued up - the pool would only grow to that size if the application
actually used five connections concurrently, in which case the usage of a
small pool is an entirely appropriate default behavior.
.. note:: The :class:`.QueuePool` class is **not compatible with asyncio**.
When using :class:`_asyncio.create_async_engine` to create an instance of
:class:`.AsyncEngine`, the :class:`_pool.AsyncAdaptedQueuePool` class,
which makes use of an asyncio-compatible queue implementation, is used
instead.
.. _pool_switching:
Switching Pool Implementations
@@ -713,6 +720,8 @@ like in the following example::
my_pool = create_pool_from_url("mysql+mysqldb://", poolclass=NullPool)
.. _pool_api:
API Documentation - Available Pool Implementations
--------------------------------------------------
@@ -722,6 +731,9 @@ API Documentation - Available Pool Implementations
.. autoclass:: sqlalchemy.pool.QueuePool
:members:
.. autoclass:: sqlalchemy.pool.AsyncAdaptedQueuePool
:members:
.. autoclass:: SingletonThreadPool
:members:
+22
View File
@@ -188,6 +188,28 @@ sooner.
:ref:`connections_toplevel`
.. _error_pcls:
Pool class cannot be used with asyncio engine (or vice versa)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The :class:`_pool.QueuePool` pool class uses a ``thread.Lock`` object internally
and is not compatible with asyncio. If using the :func:`_asyncio.create_async_engine`
function to create an :class:`.AsyncEngine`, the appropriate queue pool class
is :class:`_pool.AsyncAdaptedQueuePool`, which is used automatically and does
not need to be specified.
In addition to :class:`_pool.AsyncAdaptedQueuePool`, the :class:`_pool.NullPool`
and :class:`_pool.StaticPool` pool classes do not use locks and are also
suitable for use with async engines.
This error is also raised in reverse in the unlikely case that the
:class:`_pool.AsyncAdaptedQueuePool` pool class is indicated explicitly with
the :func:`_sa.create_engine` function.
.. seealso::
:ref:`pooling_toplevel`
.. _error_8s2b:
+11
View File
@@ -663,6 +663,17 @@ def create_engine(url: Union[str, _url.URL], **kwargs: Any) -> Engine:
else:
pool._dialect = dialect
if (
hasattr(pool, "_is_asyncio")
and pool._is_asyncio is not dialect.is_async
):
raise exc.ArgumentError(
f"Pool class {pool.__class__.__name__} cannot be "
f"used with {'non-' if not dialect.is_async else ''}"
"asyncio engine",
code="pcls",
)
# create engine.
if not pop_kwarg("future", True):
raise exc.ArgumentError(
+36 -2
View File
@@ -47,8 +47,18 @@ class QueuePool(Pool):
that imposes a limit on the number of open connections.
:class:`.QueuePool` is the default pooling implementation used for
all :class:`_engine.Engine` objects, unless the SQLite dialect is
in use with a ``:memory:`` database.
all :class:`_engine.Engine` objects other than SQLite with a ``:memory:``
database.
The :class:`.QueuePool` class **is not compatible** with asyncio and
:func:`_asyncio.create_async_engine`. The
:class:`.AsyncAdaptedQueuePool` class is used automatically when
using :func:`_asyncio.create_async_engine`, if no other kind of pool
is specified.
.. seealso::
:class:`.AsyncAdaptedQueuePool`
"""
@@ -123,6 +133,7 @@ class QueuePool(Pool):
:class:`_pool.Pool` constructor.
"""
Pool.__init__(self, creator, **kw)
self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
self._overflow = 0 - pool_size
@@ -248,6 +259,18 @@ class QueuePool(Pool):
class AsyncAdaptedQueuePool(QueuePool):
"""An asyncio-compatible version of :class:`.QueuePool`.
This pool is used by default when using :class:`.AsyncEngine` engines that
were generated from :func:`_asyncio.create_async_engine`. It uses an
asyncio-compatible queue implementation that does not use
``threading.Lock``.
The arguments and operation of :class:`.AsyncAdaptedQueuePool` are
otherwise identical to that of :class:`.QueuePool`.
"""
_is_asyncio = True # type: ignore[assignment]
_queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
sqla_queue.AsyncAdaptedQueue
@@ -270,6 +293,9 @@ class NullPool(Pool):
invalidation are not supported by this Pool implementation, since
no connections are held persistently.
The :class:`.NullPool` class **is compatible** with asyncio and
:func:`_asyncio.create_async_engine`.
"""
def status(self) -> str:
@@ -317,6 +343,9 @@ class SingletonThreadPool(Pool):
scenarios using a SQLite ``:memory:`` database and is not recommended
for production use.
The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
and :func:`_asyncio.create_async_engine`.
Options are the same as those of :class:`_pool.Pool`, as well as:
@@ -425,6 +454,8 @@ class StaticPool(Pool):
invalidation (which is also used to support auto-reconnect) are only
partially supported right now and may not yield good results.
The :class:`.StaticPool` class **is compatible** with asyncio and
:func:`_asyncio.create_async_engine`.
"""
@@ -489,6 +520,9 @@ class AssertionPool(Pool):
at a time. Useful for debugging code that is using more connections
than desired.
The :class:`.AssertionPool` class **is compatible** with asyncio and
:func:`_asyncio.create_async_engine`.
"""
_conn: Optional[ConnectionPoolEntry]
+6 -1
View File
@@ -368,7 +368,12 @@ def testing_engine(
True # enable event blocks, helps with profiling
)
if isinstance(engine.pool, pool.QueuePool):
if (
isinstance(engine.pool, pool.QueuePool)
and "pool" not in options
and "pool_timeout" not in options
and "max_overflow" not in options
):
engine.pool._timeout = 0
engine.pool._max_overflow = 0
if use_reaper:
+10 -1
View File
@@ -34,6 +34,7 @@ from sqlalchemy.engine import BindTyping
from sqlalchemy.engine import default
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.base import Engine
from sqlalchemy.pool import AsyncAdaptedQueuePool
from sqlalchemy.pool import NullPool
from sqlalchemy.pool import QueuePool
from sqlalchemy.sql import column
@@ -2411,7 +2412,15 @@ class EngineEventsTest(fixtures.TestBase):
@testing.combinations(True, False, argnames="close")
def test_close_parameter(self, testing_engine, close):
eng = testing_engine(
options=dict(pool_size=1, max_overflow=0, poolclass=QueuePool)
options=dict(
pool_size=1,
max_overflow=0,
poolclass=(
QueuePool
if not testing.db.dialect.is_async
else AsyncAdaptedQueuePool
),
)
)
conn = eng.connect()
+11 -2
View File
@@ -12,6 +12,8 @@ from sqlalchemy.engine import base
from sqlalchemy.engine import characteristics
from sqlalchemy.engine import default
from sqlalchemy.engine import url
from sqlalchemy.pool import AsyncAdaptedQueuePool
from sqlalchemy.pool import QueuePool
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_warnings
@@ -1345,10 +1347,17 @@ class IsolationLevelTest(fixtures.TestBase):
eq_(c2.get_isolation_level(), self._default_isolation_level())
def test_per_connection(self):
from sqlalchemy.pool import QueuePool
eng = testing_engine(
options=dict(poolclass=QueuePool, pool_size=2, max_overflow=0)
options=dict(
poolclass=(
QueuePool
if not testing.db.dialect.is_async
else AsyncAdaptedQueuePool
),
pool_size=2,
max_overflow=0,
)
)
c1 = eng.connect()
+76
View File
@@ -3,6 +3,7 @@ import contextlib
import inspect as stdlib_inspect
from unittest.mock import patch
from sqlalchemy import AssertionPool
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import delete
@@ -11,7 +12,11 @@ from sqlalchemy import exc
from sqlalchemy import func
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import NullPool
from sqlalchemy import QueuePool
from sqlalchemy import select
from sqlalchemy import SingletonThreadPool
from sqlalchemy import StaticPool
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
@@ -520,6 +525,77 @@ class AsyncEngineTest(EngineFixture):
eq_(isolation_level, "SERIALIZABLE")
@testing.combinations(
(
AsyncAdaptedQueuePool,
True,
),
(
QueuePool,
False,
),
(NullPool, True),
(SingletonThreadPool, False),
(StaticPool, True),
(AssertionPool, True),
argnames="pool_cls,should_work",
)
@testing.variation("instantiate", [True, False])
@async_test
async def test_pool_classes(
self, async_testing_engine, pool_cls, instantiate, should_work
):
"""test #8771"""
if instantiate:
if pool_cls in (QueuePool, AsyncAdaptedQueuePool):
pool = pool_cls(creator=testing.db.pool._creator, timeout=10)
else:
pool = pool_cls(
creator=testing.db.pool._creator,
)
options = {"pool": pool}
else:
if pool_cls in (QueuePool, AsyncAdaptedQueuePool):
options = {"poolclass": pool_cls, "pool_timeout": 10}
else:
options = {"poolclass": pool_cls}
if not should_work:
with expect_raises_message(
exc.ArgumentError,
f"Pool class {pool_cls.__name__} "
"cannot be used with asyncio engine",
):
async_testing_engine(options=options)
return
e = async_testing_engine(options=options)
if pool_cls is AssertionPool:
async with e.connect() as conn:
result = await conn.scalar(select(1))
eq_(result, 1)
return
async def go():
async with e.connect() as conn:
result = await conn.scalar(select(1))
eq_(result, 1)
return result
eq_(await asyncio.gather(*[go() for i in range(10)]), [1] * 10)
def test_cant_use_async_pool_w_create_engine(self):
"""supplemental test for #8771"""
with expect_raises_message(
exc.ArgumentError,
"Pool class AsyncAdaptedQueuePool "
"cannot be used with non-asyncio engine",
):
create_engine("sqlite://", poolclass=AsyncAdaptedQueuePool)
@testing.requires.queue_pool
@async_test
async def test_dispose(self, async_engine):