mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-06-01 21:38:55 -04:00
Merge "Add LIFO for connection pooling"
This commit is contained in:
Vendored
+28
@@ -313,6 +313,34 @@ well as for casting decimal bind values for MySQL.
|
||||
|
||||
:ticket:`3981`
|
||||
|
||||
.. _change_pr467:
|
||||
|
||||
New last-in-first-out strategy for QueuePool
|
||||
---------------------------------------------
|
||||
|
||||
The connection pool usually used by :func:`.create_engine` is known
|
||||
as :class:`.QueuePool`. This pool uses an object equivalent to Python's
|
||||
built-in ``Queue`` class in order to store database connections waiting
|
||||
to be used. The ``Queue`` features first-in-first-out behavior, which is
|
||||
intended to provide a round-robin use of the database connections that are
|
||||
persistently in the pool. However, a potential downside of this is that
|
||||
when the utilization of the pool is low, the re-use of each connection in series
|
||||
means that a server-side timeout strategy that attempts to reduce unused
|
||||
connections is prevented from shutting down these connections. To suit
|
||||
this use case, a new flag :paramref:`.create_engine.pool_use_lifo` is added
|
||||
which reverses the ``.get()`` method of the ``Queue`` to pull the connection
|
||||
from the beginning of the queue instead of the end, essentially turning the
|
||||
"queue" into a "stack" (adding a whole new pool called ``StackPool`` was
|
||||
considered, however this was too much verbosity).
|
||||
|
||||
.. seealso::
|
||||
|
||||
:ref:`pool_use_lifo`
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Key Behavioral Changes - Core
|
||||
=============================
|
||||
|
||||
|
||||
+13
@@ -0,0 +1,13 @@
|
||||
.. change::
|
||||
:tags: feature, engine
|
||||
|
||||
Added new "lifo" mode to :class:`.QueuePool`, typically enabled by setting
|
||||
the flag :paramref:`.create_engine.pool_use_lifo` to True. "lifo" mode
|
||||
means the same connection just checked in will be the first to be checked
|
||||
out again, allowing excess connections to be cleaned up from the server
|
||||
side during periods of the pool being only partially utilized. Pull request
|
||||
courtesy Taem Park.
|
||||
|
||||
.. seealso::
|
||||
|
||||
:ref:`change_pr467`
|
||||
Vendored
+35
@@ -396,6 +396,41 @@ a DBAPI connection might be invalidated include:
|
||||
All invalidations which occur will invoke the :meth:`.PoolEvents.invalidate`
|
||||
event.
|
||||
|
||||
.. _pool_use_lifo:
|
||||
|
||||
Using FIFO vs. LIFO
|
||||
-------------------
|
||||
|
||||
The :class:`.QueuePool` class features a flag called
|
||||
:paramref:`.QueuePool.use_lifo`, which can also be accessed from
|
||||
:func:`.create_engine` via the flag :paramref:`.create_engine.pool_use_lifo`.
|
||||
Setting this flag to ``True`` causes the pool's "queue" behavior to instead be
|
||||
that of a "stack", e.g. the last connection to be returned to the pool is the
|
||||
first one to be used on the next request. In contrast to the pool's long-
|
||||
standing behavior of first-in-first-out, which produces a round-robin effect of
|
||||
using each connection in the pool in series, lifo mode allows excess
|
||||
connections to remain idle in the pool, allowing server-side timeout schemes to
|
||||
close these connections out. The difference between FIFO and LIFO is
|
||||
basically whether or not its desirable for the pool to keep a full set of
|
||||
connections ready to go even during idle periods::
|
||||
|
||||
engine = create_engine(
|
||||
"postgreql://", pool_use_lifo=True, pool_pre_ping=True)
|
||||
|
||||
Above, we also make use of the :paramref:`.create_engine.pool_pre_ping` flag
|
||||
so that connections which are closed from the server side are gracefully
|
||||
handled by the connection pool and replaced with a new connection.
|
||||
|
||||
Note that the flag only applies to :class:`.QueuePool` use.
|
||||
|
||||
.. versionadded:: 1.3
|
||||
|
||||
.. seealso::
|
||||
|
||||
:ref:`pool_disconnects`
|
||||
|
||||
|
||||
|
||||
Using Connection Pools with Multiprocessing
|
||||
-------------------------------------------
|
||||
|
||||
|
||||
@@ -399,6 +399,21 @@ def create_engine(*args, **kwargs):
|
||||
up on getting a connection from the pool. This is only used
|
||||
with :class:`~sqlalchemy.pool.QueuePool`.
|
||||
|
||||
:param pool_use_lifo=False: use LIFO (last-in-first-out) when retrieving
|
||||
connections from :class:`.QueuePool` instead of FIFO
|
||||
(first-in-first-out). Using LIFO, a server-side timeout scheme can
|
||||
reduce the number of connections used during non- peak periods of
|
||||
use. When planning for server-side timeouts, ensure that a recycle or
|
||||
pre-ping strategy is in use to gracefully handle stale connections.
|
||||
|
||||
.. versionadded:: 1.3
|
||||
|
||||
.. seealso::
|
||||
|
||||
:ref:`pool_use_lifo`
|
||||
|
||||
:ref:`pool_disconnects`
|
||||
|
||||
:param plugins: string list of plugin names to load. See
|
||||
:class:`.CreateEnginePlugin` for background.
|
||||
|
||||
|
||||
@@ -123,7 +123,8 @@ class DefaultEngineStrategy(EngineStrategy):
|
||||
'events': 'pool_events',
|
||||
'use_threadlocal': 'pool_threadlocal',
|
||||
'reset_on_return': 'pool_reset_on_return',
|
||||
'pre_ping': 'pool_pre_ping'}
|
||||
'pre_ping': 'pool_pre_ping',
|
||||
'use_lifo': 'pool_use_lifo'}
|
||||
for k in util.get_cls_kwargs(poolclass):
|
||||
tk = translate.get(k, k)
|
||||
if tk in kwargs:
|
||||
|
||||
@@ -30,7 +30,7 @@ class QueuePool(Pool):
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30,
|
||||
def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30, use_lifo=False,
|
||||
**kw):
|
||||
r"""
|
||||
Construct a QueuePool.
|
||||
@@ -63,6 +63,21 @@ class QueuePool(Pool):
|
||||
:param timeout: The number of seconds to wait before giving up
|
||||
on returning a connection. Defaults to 30.
|
||||
|
||||
:param use_lifo: use LIFO (last-in-first-out) when retrieving
|
||||
connections instead of FIFO (first-in-first-out). Using LIFO, a
|
||||
server-side timeout scheme can reduce the number of connections used
|
||||
during non-peak periods of use. When planning for server-side
|
||||
timeouts, ensure that a recycle or pre-ping strategy is in use to
|
||||
gracefully handle stale connections.
|
||||
|
||||
.. versionadded:: 1.3
|
||||
|
||||
.. seealso::
|
||||
|
||||
:ref:`pool_use_lifo`
|
||||
|
||||
:ref:`pool_disconnects`
|
||||
|
||||
:param \**kw: Other keyword arguments including
|
||||
:paramref:`.Pool.recycle`, :paramref:`.Pool.echo`,
|
||||
:paramref:`.Pool.reset_on_return` and others are passed to the
|
||||
@@ -70,7 +85,7 @@ class QueuePool(Pool):
|
||||
|
||||
"""
|
||||
Pool.__init__(self, creator, **kw)
|
||||
self._pool = sqla_queue.Queue(pool_size)
|
||||
self._pool = sqla_queue.Queue(pool_size, use_lifo=use_lifo)
|
||||
self._overflow = 0 - pool_size
|
||||
self._max_overflow = max_overflow
|
||||
self._timeout = timeout
|
||||
|
||||
@@ -39,10 +39,12 @@ class Full(Exception):
|
||||
|
||||
|
||||
class Queue:
|
||||
def __init__(self, maxsize=0):
|
||||
def __init__(self, maxsize=0, use_lifo=False):
|
||||
"""Initialize a queue object with a given maximum size.
|
||||
|
||||
If `maxsize` is <= 0, the queue size is infinite.
|
||||
|
||||
If `use_lifo` is True, this Queue acts like a Stack (LIFO).
|
||||
"""
|
||||
|
||||
self._init(maxsize)
|
||||
@@ -57,6 +59,8 @@ class Queue:
|
||||
# Notify not_full whenever an item is removed from the queue;
|
||||
# a thread waiting to put is notified then.
|
||||
self.not_full = threading.Condition(self.mutex)
|
||||
# If this queue uses LIFO or FIFO
|
||||
self.use_lifo = use_lifo
|
||||
|
||||
def qsize(self):
|
||||
"""Return the approximate size of the queue (not reliable!)."""
|
||||
@@ -196,4 +200,9 @@ class Queue:
|
||||
|
||||
# Get an item from the queue
|
||||
def _get(self):
|
||||
return self.queue.popleft()
|
||||
if self.use_lifo:
|
||||
# LIFO
|
||||
return self.queue.pop()
|
||||
else:
|
||||
# FIFO
|
||||
return self.queue.popleft()
|
||||
|
||||
@@ -1983,6 +1983,83 @@ class QueuePoolTest(PoolTestBase):
|
||||
rec.checkin
|
||||
)
|
||||
|
||||
def test_lifo(self):
|
||||
c1, c2, c3 = Mock(), Mock(), Mock()
|
||||
connections = [c1, c2, c3]
|
||||
|
||||
def creator():
|
||||
return connections.pop(0)
|
||||
|
||||
p = pool.QueuePool(creator, use_lifo=True)
|
||||
|
||||
pc1 = p.connect()
|
||||
pc2 = p.connect()
|
||||
pc3 = p.connect()
|
||||
|
||||
pc1.close()
|
||||
pc2.close()
|
||||
pc3.close()
|
||||
|
||||
for i in range(5):
|
||||
pc1 = p.connect()
|
||||
is_(pc1.connection, c3)
|
||||
pc1.close()
|
||||
|
||||
pc1 = p.connect()
|
||||
is_(pc1.connection, c3)
|
||||
|
||||
pc2 = p.connect()
|
||||
is_(pc2.connection, c2)
|
||||
pc2.close()
|
||||
|
||||
pc3 = p.connect()
|
||||
is_(pc3.connection, c2)
|
||||
|
||||
pc2 = p.connect()
|
||||
is_(pc2.connection, c1)
|
||||
|
||||
pc2.close()
|
||||
pc3.close()
|
||||
pc1.close()
|
||||
|
||||
def test_fifo(self):
|
||||
c1, c2, c3 = Mock(), Mock(), Mock()
|
||||
connections = [c1, c2, c3]
|
||||
|
||||
def creator():
|
||||
return connections.pop(0)
|
||||
|
||||
p = pool.QueuePool(creator)
|
||||
|
||||
pc1 = p.connect()
|
||||
pc2 = p.connect()
|
||||
pc3 = p.connect()
|
||||
|
||||
pc1.close()
|
||||
pc2.close()
|
||||
pc3.close()
|
||||
|
||||
pc1 = p.connect()
|
||||
is_(pc1.connection, c1)
|
||||
pc1.close()
|
||||
|
||||
pc1 = p.connect()
|
||||
is_(pc1.connection, c2)
|
||||
|
||||
pc2 = p.connect()
|
||||
is_(pc2.connection, c3)
|
||||
pc2.close()
|
||||
|
||||
pc3 = p.connect()
|
||||
is_(pc3.connection, c1)
|
||||
|
||||
pc2 = p.connect()
|
||||
is_(pc2.connection, c3)
|
||||
|
||||
pc2.close()
|
||||
pc3.close()
|
||||
pc1.close()
|
||||
|
||||
|
||||
class ResetOnReturnTest(PoolTestBase):
|
||||
def _fixture(self, **kw):
|
||||
|
||||
Reference in New Issue
Block a user