Files
2026-06-01 00:08:27 -07:00

160 lines
6.1 KiB
Python

from __future__ import annotations
import sqlite3
import time
from collections.abc import Mapping
from itertools import tee
import re
from django.db.backends.sqlite3.base import DatabaseWrapper as DjangoSQLiteDatabaseWrapper
from django.db.backends.sqlite3.base import SQLiteCursorWrapper as DjangoSQLiteCursorWrapper
def _sqlite_lock_retry_timeout() -> float:
    """Return the configured ``SQLITE_LOCK_RETRY_TIMEOUT`` value.

    The value is looked up on ``settings.CONFIG`` every time the function is
    called; the Django settings import is deferred until call time.
    """
    from django.conf import settings

    config = settings.CONFIG
    return config.SQLITE_LOCK_RETRY_TIMEOUT
def _sqlite_lock_retry_interval() -> float:
    """Return the configured ``SQLITE_LOCK_RETRY_INTERVAL`` value.

    The value is looked up on ``settings.CONFIG`` every time the function is
    called; the Django settings import is deferred until call time.
    """
    from django.conf import settings

    config = settings.CONFIG
    return config.SQLITE_LOCK_RETRY_INTERVAL
def _format_sql(query: str, params=None) -> str:
    """Build a short one-line description of a SQL statement for log output.

    Whitespace runs in ``query`` are collapsed to single spaces. When the
    statement starts with INSERT INTO, UPDATE, DELETE FROM or SELECT followed
    by an identifier, the description is reduced to ``"<VERB> <identifier>"``.
    A preview of ``params`` is appended in parentheses when params are given.

    Args:
        query: SQL text (anything accepted by ``str()``).
        params: ``None``, a ready-made description string, a tuple/list, a
            mapping, or any other object (summarised via ``repr``).

    Returns:
        The description, cut to at most 260 characters.
    """
    summary = " ".join(str(query).split())

    statement = re.match(
        r'^(INSERT INTO|UPDATE|DELETE FROM|SELECT) "?([A-Za-z0-9_]+)"?',
        summary,
        flags=re.IGNORECASE,
    )
    if statement is not None:
        verb, identifier = statement.groups()
        summary = f"{verb.upper()} {identifier}"

    # Nothing to preview: only the statement summary is reported.
    if params is None:
        return summary[:260]

    if isinstance(params, str):
        # A string is treated as an already-formatted description.
        described = params
    elif isinstance(params, (tuple, list)):
        # Show at most the first four values, each repr capped at 60 chars.
        shown = [repr(value)[:60] for value in params[:4]]
        described = f"{len(params)} params: {', '.join(shown)}"
    elif isinstance(params, Mapping):
        # Same preview rule as for sequences, rendered as name=value pairs.
        shown = [f"{name}={repr(value)[:60]}" for name, value in list(params.items())[:4]]
        described = f"{len(params)} params: {', '.join(shown)}"
    else:
        described = repr(params)[:120]

    return f"{summary} ({described})"[:260]
def _log_locked_database(query: str, params=None, *, attempt: int, elapsed: float, retry_interval: float) -> None:
    """Print a "database is locked" warning to stderr.

    Two yellow lines are written through a ``rich`` console bound to stderr:
    one with the elapsed time, retry interval and attempt number, and one with
    the ``_format_sql`` summary of the statement. The same console is then
    handed to ``log_sqlite_lock_holders``.

    Args:
        query: SQL text of the statement that hit the lock.
        params: Parameters of the statement, used only for the summary.
        attempt: Attempt counter shown in the warning.
        elapsed: Time spent waiting so far, printed with an ``s`` suffix.
        retry_interval: Delay before the next attempt, printed with an ``s`` suffix.
    """
    from rich.console import Console
    from archivebox.misc.db import log_sqlite_lock_holders

    stderr_console = Console(stderr=True)

    warning = f"[yellow][*] SQLite database is locked for {elapsed:.0f}s; retrying in {retry_interval:g}s... attempt={attempt}[/yellow]"
    stderr_console.print(warning)

    query_line = f"[yellow] Query: {_format_sql(query, params)}[/yellow]"
    stderr_console.print(query_line)

    log_sqlite_lock_holders(stderr_console)
def _connection_in_transaction(connection) -> bool:
    """Report whether ``connection`` currently has an open transaction.

    Returns ``False`` for a falsy connection, for an object without an
    ``in_transaction`` attribute, and when reading the attribute raises a
    ``sqlite3.Error``.
    """
    try:
        if not connection:
            return False
        return bool(connection.in_transaction)
    except (AttributeError, sqlite3.Error):
        # Treat anything that cannot answer the question as "not in a transaction".
        return False
def _recover_non_atomic_connection(db_wrapper, query: str) -> None:
    """Roll back a dangling sqlite transaction outside of an atomic block.

    Nothing happens when ``db_wrapper`` is ``None``, when it reports
    ``in_atomic_block``, or when its raw connection has no open transaction.
    Otherwise the raw connection is rolled back; a ``sqlite3.Error`` raised by
    that rollback is ignored.

    Args:
        db_wrapper: Database wrapper exposing ``in_atomic_block`` and ``connection``.
        query: Accepted for call-site symmetry; not used by this function.
    """
    if db_wrapper is None:
        return
    if getattr(db_wrapper, "in_atomic_block", False):
        return

    raw_connection = getattr(db_wrapper, "connection", None)
    if _connection_in_transaction(raw_connection):
        try:
            raw_connection.rollback()
        except sqlite3.Error:
            # Best-effort cleanup: a failed rollback is deliberately not reported.
            pass
def _is_inside_atomic(db_wrapper) -> bool:
    """Return ``True`` when ``db_wrapper`` exists and reports ``in_atomic_block``.

    A missing ``in_atomic_block`` attribute counts as ``False``.
    """
    if db_wrapper is None:
        return False
    return bool(getattr(db_wrapper, "in_atomic_block", False))
def _abort_locked_database(query: str, params=None, *, elapsed: float, db_wrapper=None) -> None:
    """Give up on a locked database by raising ``sqlite3.OperationalError``.

    Before raising, ``_recover_non_atomic_connection`` is given the chance to
    roll back a dangling transaction on ``db_wrapper``.

    Args:
        query: SQL text of the statement that could not be run.
        params: Parameters of the statement, used only in the error message.
        elapsed: Time spent waiting, reported in the message with an ``s`` suffix.
        db_wrapper: Optional database wrapper to clean up before raising.

    Raises:
        sqlite3.OperationalError: Always.
    """
    _recover_non_atomic_connection(db_wrapper, query)

    message = (
        f"SQLite database remained locked for {elapsed:.0f}s while running {_format_sql(query, params)}; "
        "aborting instead of retrying indefinitely"
    )
    raise sqlite3.OperationalError(message)
def _retry_locked_database(action, query: str, params=None, *, db_wrapper=None):
    """Call ``action`` until it succeeds, retrying on SQLite lock errors.

    Any exception for which ``sqlite_lock_error`` returns false is re-raised
    unchanged. For lock errors the failure is logged and a dangling
    non-atomic transaction is rolled back, then one of three things happens:

    * inside an atomic block the original exception is re-raised;
    * once the configured retry timeout (if truthy) has elapsed,
      ``_abort_locked_database`` raises ``sqlite3.OperationalError``;
    * otherwise the function sleeps for the retry interval and tries again.

    Args:
        action: Zero-argument callable performing the database operation.
        query: SQL text, used for logging and error messages.
        params: Statement parameters, used for logging and error messages.
        db_wrapper: Optional database wrapper used for transaction checks.

    Returns:
        Whatever ``action`` returns on its first successful call.
    """
    failures = 0
    first_try_at = time.monotonic()

    while True:
        try:
            outcome = action()
        except Exception as exc:
            # Catch broadly and let sqlite_lock_error decide whether this
            # particular failure is a lock error worth retrying.
            from archivebox.misc.db import sqlite_lock_error

            if not sqlite_lock_error(exc):
                raise

            failures += 1
            waited = time.monotonic() - first_try_at
            give_up_after = _sqlite_lock_retry_timeout()
            pause = _sqlite_lock_retry_interval()
            _log_locked_database(query, params, attempt=failures, elapsed=waited, retry_interval=pause)

            # In autocommit mode, do not leave a half-open sqlite transaction
            # sitting around during the wait. Callers inside an explicit
            # transaction.atomic() keep their own transaction boundary.
            _recover_non_atomic_connection(db_wrapper, query)

            if _is_inside_atomic(db_wrapper):
                raise
            if give_up_after and waited >= give_up_after:
                _abort_locked_database(query, params, elapsed=waited, db_wrapper=db_wrapper)

            time.sleep(pause)
        else:
            return outcome
class SQLiteCursorWrapper(DjangoSQLiteCursorWrapper):
    """SQLite cursor that runs every statement through ``_retry_locked_database``.

    The owning database wrapper, when present, is read from the cursor's
    ``db_wrapper`` attribute and forwarded to the retry helper.
    """

    def execute(self, query, params=None):
        """Execute one statement, retrying while the database is locked.

        Without params the call is delegated to the parent class's
        ``execute``. With params the query is converted here and the call is
        dispatched to the class that follows ``DjangoSQLiteCursorWrapper`` in
        the MRO (presumably so the parent does not convert the query again).
        """
        if params is None:

            def run_without_params():
                return super(SQLiteCursorWrapper, self).execute(query)

            return _retry_locked_database(
                run_without_params,
                query,
                db_wrapper=getattr(self, "db_wrapper", None),
            )

        # Named ("pyformat") placeholders need the mapping's keys for conversion.
        if isinstance(params, Mapping):
            names = list(params)
        else:
            names = None
        sql = self.convert_query(query, param_names=names)

        def run_with_params():
            return super(DjangoSQLiteCursorWrapper, self).execute(sql, params)

        return _retry_locked_database(
            run_with_params,
            sql,
            params,
            db_wrapper=getattr(self, "db_wrapper", None),
        )

    def executemany(self, query, param_list):
        """Execute a statement for many parameter sets, retrying on lock errors.

        The first parameter set is peeked at to decide whether named
        placeholders are in use. All parameter sets are materialised into a
        tuple before the first attempt, so every retry sees the same data even
        if ``param_list`` was a one-shot iterator.
        """
        first_look, remaining = tee(iter(param_list))
        head = next(first_look, None)
        names = list(head) if head and isinstance(head, Mapping) else None
        sql = self.convert_query(query, param_names=names)
        param_sets = tuple(remaining)

        def run_batch():
            return super(DjangoSQLiteCursorWrapper, self).executemany(sql, param_sets)

        return _retry_locked_database(
            run_batch,
            sql,
            f"{len(param_sets)} parameter sets",
            db_wrapper=getattr(self, "db_wrapper", None),
        )
class DatabaseWrapper(DjangoSQLiteDatabaseWrapper):
    """SQLite database wrapper whose cursors, commits and rollbacks retry on lock errors."""

    def create_cursor(self, name=None):
        """Create a ``SQLiteCursorWrapper`` cursor that knows its owning wrapper.

        ``name`` is accepted but not used.
        """
        retrying_cursor = self.connection.cursor(factory=SQLiteCursorWrapper)
        # The cursor reads this attribute to pass the wrapper to the retry helper.
        retrying_cursor.db_wrapper = self
        return retrying_cursor

    def _commit(self):
        """Run the parent class's ``_commit`` under the lock-retry helper."""
        parent_commit = super()._commit
        return _retry_locked_database(parent_commit, "COMMIT", db_wrapper=self)

    def _rollback(self):
        """Run the parent class's ``_rollback`` under the lock-retry helper."""
        parent_rollback = super()._rollback
        return _retry_locked_database(parent_rollback, "ROLLBACK", db_wrapper=self)