Add connection pool for aiosqlite and document usage/behavior.

Also fix up a couple lifecycle tests
This commit is contained in:
Charles Leifer
2026-03-11 12:51:01 -05:00
parent 66990b5779
commit 52bb6c92f4
3 changed files with 151 additions and 69 deletions
+35 -14
View File
@@ -226,17 +226,6 @@ To shut down completely (e.g. during application teardown):
await db.close_pool()
SQLite
^^^^^^
SQLite uses a single shared connection, as the underlying database does not
support concurrent writers.
Generally SQLite is a poor fit for asynchronous workflows where writes may be
coming in at any time. Furthermore, SQLite does not do any network I/O.
The SQLite implementation is provided mostly for testing and local development.
MySQL and Postgresql
^^^^^^^^^^^^^^^^^^^^
@@ -244,9 +233,9 @@ MySQL and Postgresql use the driver's native connection pool.
Pool configuration options include:
* ``pool_size`` - Maximum number of connections
* ``pool_min_size`` - Minimum pool size
* ``acquire_timeout`` - Timeout when acquiring a connection
* ``pool_size``: Maximum number of connections
* ``pool_min_size``: Minimum pool size
* ``acquire_timeout``: Timeout when acquiring a connection
.. code-block:: python
@@ -258,6 +247,38 @@ Pool configuration options include:
pool_min_size=1,
acquire_timeout=10)
SQLite
^^^^^^
Peewee provides a simple connection-pooling implementation for SQLite
connections.
Pool configuration options include:
* ``pool_size``: Maximum number of connections
* ``acquire_timeout``: Timeout when acquiring a connection
SQLite operates on local disk storage, so queries typically execute extremely
quickly (microseconds / few milliseconds). The cost of dispatching to a
background thread and wrapping in coroutines increases the latency per query.
For every query executed, a closure must be created, a future allocated, a
queue written-to, a loop ``call_soon_threadsafe()`` issued, and two context
switches made. This is the case with `aiosqlite <https://github.com/omnilib/aiosqlite/blob/main/aiosqlite/core.py>`__.
If your SQLite workload is heavy enough that avoiding blocking the event-loop
is an issue, SQLite may not be a good fit. SQLite only allows one writer at a
time, so while using an async wrapper may keep things responsive while waiting
to obtain the write lock, writes will not occur "faster"; the bottleneck has
merely been moved. Conversely, if you don't have that much load, the async
wrapper adds complexity and overhead for no measurable benefit.
To use SQLite in an async environment anyway, it is strongly recommended to
use WAL-mode, which allows multiple readers to co-exist with a single writer:
.. code-block:: python
db = AsyncSqliteDatabase('app.db', pragmas={'journal_mode': 'wal'})
Sharp Corners
-------------
+71 -13
View File
@@ -461,7 +461,71 @@ class AsyncConnectionWrapper(object):
self.conn = None
class AsyncSQLiteConnection(AsyncConnectionWrapper):
class AsyncSqlitePool(object):
    """Fixed-size pool of aiosqlite connections.

    All ``pool_size`` connections are opened eagerly by :meth:`initialize`
    and cycled through an ``asyncio.Queue``. Broken connections detected on
    :meth:`release` are discarded and replaced so the pool keeps its fixed
    capacity.

    :param database: path/name of the sqlite database file.
    :param pool_size: number of connections to maintain.
    :param on_connect: optional coroutine invoked with each new raw
        aiosqlite connection (e.g. to apply pragmas).
    :param connect_params: extra keyword arguments for ``aiosqlite.connect``.
    """
    def __init__(self, database, pool_size=5, on_connect=None,
                 **connect_params):
        self._database = database
        self._pool_size = pool_size
        self._on_connect = on_connect
        self._connect_params = connect_params
        # Bounded queue of idle connections; maxsize guards against
        # over-filling via spurious releases.
        self._queue = asyncio.Queue(maxsize=pool_size)
        self._all_connections = []
        self._closed = False

    async def initialize(self):
        """Open all connections up-front; returns the pool for chaining."""
        for _ in range(self._pool_size):
            conn = await self._create_connection()
            self._queue.put_nowait(conn)
        return self

    async def _create_connection(self):
        # isolation_level=None selects autocommit mode; transaction
        # management is handled explicitly elsewhere.
        conn = await aiosqlite.connect(
            self._database,
            isolation_level=None,
            **self._connect_params)
        if self._on_connect is not None:
            await self._on_connect(conn)
        wrapped = AsyncSqliteConnection(conn)
        self._all_connections.append(wrapped)
        return wrapped

    async def acquire(self, timeout=None):
        """Check out a connection, waiting up to ``timeout`` seconds.

        :raises InterfaceError: if the pool has been closed.
        :raises asyncio.TimeoutError: if no connection becomes available
            within ``timeout`` (when a timeout is given).
        """
        if self._closed:
            raise InterfaceError('Pool is closed.')
        return await asyncio.wait_for(self._queue.get(), timeout=timeout)

    def _conn_is_valid(self, conn):
        # A wrapper whose underlying driver connection is gone, or whose
        # aiosqlite worker thread is no longer running, must not be reused.
        driver_conn = conn.conn
        if driver_conn is None:
            return False
        if not driver_conn._running or not driver_conn._connection:
            return False
        return True

    async def release(self, conn):
        """Return ``conn`` to the pool, replacing it if it has gone bad."""
        if self._closed:
            return
        elif self._conn_is_valid(conn):
            await self._queue.put(conn)
        else:
            # Drop the broken connection and backfill with a fresh one so
            # the pool retains its fixed capacity.
            try:
                self._all_connections.remove(conn)
            except ValueError:
                pass
            await self._queue.put(await self._create_connection())

    async def close(self):
        """Close every connection this pool has created; pool is unusable
        afterwards (acquire() will raise InterfaceError)."""
        self._closed = True
        conns, self._all_connections = list(self._all_connections), []
        for conn in conns:
            try:
                await conn.close()
            except Exception:
                # Best-effort shutdown: log and continue closing the rest.
                logger.warning('Error closing pooled connection',
                               exc_info=True)
class AsyncSqliteConnection(AsyncConnectionWrapper):
async def _execute(self, sql, params=None):
params = params or ()
cursor = await self.conn.execute(sql, params)
@@ -502,9 +566,9 @@ class AsyncSqliteDatabase(AsyncDatabaseMixin, SqliteDatabase):
async def _create_pool_async(self):
    """Create and eagerly initialize the sqlite connection pool.

    :raises ImproperlyConfigured: if aiosqlite is not installed.
    """
    # The span previously contained both the old single-connection
    # implementation and the pool-based one; only the pool version remains.
    if aiosqlite is None:
        raise ImproperlyConfigured('aiosqlite is not installed')
    pool = AsyncSqlitePool(self.database, pool_size=self._pool_size,
                           on_connect=self._add_conn_hooks)
    return await pool.initialize()
async def _add_conn_hooks(self, conn):
if self._pragmas:
@@ -522,17 +586,11 @@ class AsyncSqliteDatabase(AsyncDatabaseMixin, SqliteDatabase):
await conn.create_function(name, n_params, fn, **kwargs)
async def _pool_acquire(self):
    """Check a connection out of the pool, honoring the acquire timeout.

    The old single-shared-connection logic (lock + lazy re-create) is gone;
    acquisition is delegated entirely to AsyncSqlitePool.
    """
    return await self._pool.acquire(timeout=self._acquire_timeout)
async def _pool_release(self, conn):
    """Return ``conn`` to the pool; a ``None`` conn is a no-op."""
    if conn is not None:
        await self._pool.release(conn)
async def _pool_close(self):
if self._pool:
+45 -42
View File
@@ -1,6 +1,8 @@
import asyncio
import collections
import contextvars
import gc
import glob
import itertools
import tempfile
import os
@@ -311,7 +313,7 @@ class TestConnectionWrappers(unittest.IsolatedAsyncioTestCase):
mock_conn = AsyncMock()
mock_conn.execute.return_value = mock_cursor
result = await AsyncSQLiteConnection(mock_conn).execute(
result = await AsyncSqliteConnection(mock_conn).execute(
'SELECT * FROM test')
self.assertIsInstance(result, CursorAdapter)
self.assertEqual(result.fetchall(), [(1, 'test')])
@@ -324,7 +326,7 @@ class TestConnectionWrappers(unittest.IsolatedAsyncioTestCase):
mock_conn = AsyncMock()
mock_conn.execute.return_value = mock_cursor
conn = AsyncSQLiteConnection(mock_conn)
conn = AsyncSqliteConnection(mock_conn)
cursor = await conn.execute_iter('SELECT a, b FROM t')
self.assertIsInstance(cursor, CursorAdapter)
self.assertIsNotNone(cursor._fetch_many)
@@ -337,7 +339,7 @@ class TestConnectionWrappers(unittest.IsolatedAsyncioTestCase):
mock_conn = AsyncMock()
mock_conn.execute.return_value = mock_cursor
conn = AsyncSQLiteConnection(mock_conn)
conn = AsyncSqliteConnection(mock_conn)
cursor = await conn.execute_iter('SELECT 1')
self.assertTrue(conn._lock.locked())
await cursor.aclose()
@@ -347,7 +349,7 @@ class TestConnectionWrappers(unittest.IsolatedAsyncioTestCase):
async def test_sqlite_execute_iter_lock_on_failure(self):
mock_conn = AsyncMock()
mock_conn.execute.side_effect = RuntimeError('fail')
conn = AsyncSQLiteConnection(mock_conn)
conn = AsyncSqliteConnection(mock_conn)
with self.assertRaises(RuntimeError):
await conn.execute_iter('invalid')
self.assertFalse(conn._lock.locked())
@@ -551,48 +553,49 @@ class TestConnectionWrappers(unittest.IsolatedAsyncioTestCase):
class TestTaskLifecycle(unittest.IsolatedAsyncioTestCase):
async def test_concurrent_task_state_isolation(self):
db = AsyncSqliteDatabase(':memory:')
TestModel._meta.set_database(db)
async with db:
await db.acreate_tables([TestModel])
async def capture(tid):
async with db:
before = id(db._state.get())
await db.run(TestModel.create, name=f't{tid}', value=tid)
after = id(db._state.get())
return before == after
results = await asyncio.gather(*[capture(i) for i in range(5)])
self.assertTrue(all(results))
await db.close_pool()
async def asyncSetUp(self):
    """Create a fresh on-disk database for each test.

    NOTE(review): a real file is used instead of ':memory:' — presumably so
    that the multiple pooled connections all see the same database; confirm
    against the pool implementation.
    """
    with tempfile.NamedTemporaryFile(delete=False) as f:
        self.db_path = f.name
    self.db = AsyncSqliteDatabase(self.db_path)
    TestModel._meta.set_database(self.db)
    await self.db.aconnect()
    await self.db.acreate_tables([TestModel])
async def asyncTearDown(self):
    # Close the active connection and shut the pool down before removing
    # the database files.
    await self.db.aclose()
    await self.db.close_pool()
    # Glob so any sidecar files sharing the prefix (e.g. WAL/SHM journals
    # like '<db>-wal') are removed along with the main database file.
    if self.db_path and os.path.exists(self.db_path):
        for fname in glob.glob(self.db_path + '*'):
            os.unlink(fname)
async def test_task_state_cleanup_after_completion(self):
    """Per-task connection state should be reclaimed once the task dies."""
    async def task_with_state():
        async with self.db:
            await self.db.run(TestModel.create, name='test', value=1)
        return self.db._state._get_storage_key()

    # Run in a real Task so the storage key belongs to a task object that
    # becomes garbage once the task completes.
    task_key = await asyncio.create_task(task_with_state())
    await asyncio.sleep(0)
    gc.collect()  # ensure the dead task is actually collected first
    self.db._state.cleanup_dead_tasks()
    # assertNotIn gives a clearer failure message than assertFalse(x in y).
    self.assertNotIn(task_key, self.db._state._state_storage)
async def test_concurrent_task_state_isolation(self):
    """Concurrent tasks keep stable, distinct connection-state objects."""
    async def capture(tid):
        async with self.db:
            before = id(self.db._state.get())
            await self.db.run(TestModel.create, name=f't{tid}', value=tid)
            after = id(self.db._state.get())
            # State object must not change mid-task.
            self.assertEqual(before, after)
            return before

    results = await asyncio.gather(*[capture(i) for i in range(5)])
    self.assertTrue(all(results))
    # BUG FIX: assertTrue(len(set(results)), 5) treated 5 as the failure
    # message and always passed; assertEqual performs the intended check
    # that each task saw its own distinct state object.
    self.assertEqual(len(set(results)), 5)
class IntegrationTests(object):