mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-06-22 03:20:45 -04:00
160 lines
6.1 KiB
Python
160 lines
6.1 KiB
Python
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
import time
|
|
from collections.abc import Mapping
|
|
from itertools import tee
|
|
import re
|
|
|
|
from django.db.backends.sqlite3.base import DatabaseWrapper as DjangoSQLiteDatabaseWrapper
|
|
from django.db.backends.sqlite3.base import SQLiteCursorWrapper as DjangoSQLiteCursorWrapper
|
|
|
|
|
|
def _sqlite_lock_retry_timeout() -> float:
    """Return ``SQLITE_LOCK_RETRY_TIMEOUT`` as read from ``settings.CONFIG``.

    Django's settings object is imported when the function is called rather
    than when this module is imported.
    """
    from django.conf import settings

    config = settings.CONFIG
    return config.SQLITE_LOCK_RETRY_TIMEOUT
|
|
|
|
|
|
def _sqlite_lock_retry_interval() -> float:
    """Return ``SQLITE_LOCK_RETRY_INTERVAL`` as read from ``settings.CONFIG``.

    Django's settings object is imported when the function is called rather
    than when this module is imported.
    """
    from django.conf import settings

    config = settings.CONFIG
    return config.SQLITE_LOCK_RETRY_INTERVAL
|
|
|
|
|
|
def _format_sql(query: str, params=None) -> str:
|
|
compact = " ".join(str(query).split())
|
|
match = re.match(r'^(INSERT INTO|UPDATE|DELETE FROM|SELECT) "?([A-Za-z0-9_]+)"?', compact, flags=re.IGNORECASE)
|
|
if match:
|
|
compact = f"{match.group(1).upper()} {match.group(2)}"
|
|
if params is not None:
|
|
if isinstance(params, str):
|
|
params_summary = params
|
|
elif isinstance(params, (tuple, list)):
|
|
preview = ", ".join(repr(param)[:60] for param in params[:4])
|
|
params_summary = f"{len(params)} params: {preview}"
|
|
elif isinstance(params, Mapping):
|
|
preview = ", ".join(f"{key}={repr(value)[:60]}" for key, value in list(params.items())[:4])
|
|
params_summary = f"{len(params)} params: {preview}"
|
|
else:
|
|
params_summary = repr(params)[:120]
|
|
compact = f"{compact} ({params_summary})"
|
|
return compact[:260]
|
|
|
|
|
|
def _log_locked_database(query: str, params=None, *, attempt: int, elapsed: float, retry_interval: float) -> None:
    """Print a warning to stderr about a locked SQLite database.

    Two yellow lines are written through a ``rich`` console bound to stderr:
    one with the elapsed time, retry interval and attempt number, and one with
    the ``_format_sql`` summary of the statement. The same console is then
    handed to ``log_sqlite_lock_holders``.

    Args:
        query: SQL text of the statement that hit the lock.
        params: Parameters passed along to ``_format_sql``.
        attempt: Attempt counter shown in the message.
        elapsed: Value shown as the whole-second lock duration.
        retry_interval: Value shown as the delay before the next attempt.
    """
    from rich.console import Console

    from archivebox.misc.db import log_sqlite_lock_holders

    stderr_console = Console(stderr=True)
    stderr_console.print(
        f"[yellow][*] SQLite database is locked for {elapsed:.0f}s; retrying in {retry_interval:g}s... attempt={attempt}[/yellow]",
    )
    query_summary = _format_sql(query, params)
    stderr_console.print(f"[yellow] Query: {query_summary}[/yellow]")
    # NOTE(review): presumably reports which processes hold the lock -- confirm
    # against archivebox.misc.db.
    log_sqlite_lock_holders(stderr_console)
|
|
|
|
|
|
def _connection_in_transaction(connection) -> bool:
|
|
try:
|
|
return bool(connection and connection.in_transaction)
|
|
except (AttributeError, sqlite3.Error):
|
|
return False
|
|
|
|
|
|
def _recover_non_atomic_connection(db_wrapper, query: str) -> None:
|
|
if db_wrapper is None or getattr(db_wrapper, "in_atomic_block", False):
|
|
return
|
|
connection = getattr(db_wrapper, "connection", None)
|
|
if not _connection_in_transaction(connection):
|
|
return
|
|
try:
|
|
connection.rollback()
|
|
except sqlite3.Error:
|
|
return
|
|
|
|
|
|
def _is_inside_atomic(db_wrapper) -> bool:
|
|
return bool(db_wrapper is not None and getattr(db_wrapper, "in_atomic_block", False))
|
|
|
|
|
|
def _abort_locked_database(query: str, params=None, *, elapsed: float, db_wrapper=None) -> None:
    """Give up on a locked database by raising ``sqlite3.OperationalError``.

    ``_recover_non_atomic_connection`` is run on ``db_wrapper`` first, then the
    error is raised with a message containing the whole-second ``elapsed``
    value and the ``_format_sql`` summary of the statement.

    Raises:
        sqlite3.OperationalError: Always.
    """
    _recover_non_atomic_connection(db_wrapper, query)
    message = (
        f"SQLite database remained locked for {elapsed:.0f}s while running {_format_sql(query, params)}; "
        "aborting instead of retrying indefinitely"
    )
    raise sqlite3.OperationalError(message)
|
|
|
|
|
|
def _retry_locked_database(action, query: str, params=None, *, db_wrapper=None):
|
|
attempt = 0
|
|
started_at = time.monotonic()
|
|
while True:
|
|
try:
|
|
return action()
|
|
except (sqlite3.OperationalError, Exception) as err:
|
|
from archivebox.misc.db import sqlite_lock_error
|
|
|
|
if not sqlite_lock_error(err):
|
|
raise
|
|
attempt += 1
|
|
elapsed = time.monotonic() - started_at
|
|
retry_timeout = _sqlite_lock_retry_timeout()
|
|
retry_interval = _sqlite_lock_retry_interval()
|
|
_log_locked_database(query, params, attempt=attempt, elapsed=elapsed, retry_interval=retry_interval)
|
|
# If SQLite raised while Django is in autocommit mode, do not keep a
|
|
# partially-open sqlite transaction around while waiting. Explicit
|
|
# transaction.atomic() callers keep their normal transaction boundary.
|
|
_recover_non_atomic_connection(db_wrapper, query)
|
|
if _is_inside_atomic(db_wrapper):
|
|
raise
|
|
if retry_timeout and elapsed >= retry_timeout:
|
|
_abort_locked_database(query, params, elapsed=elapsed, db_wrapper=db_wrapper)
|
|
time.sleep(retry_interval)
|
|
|
|
|
|
class SQLiteCursorWrapper(DjangoSQLiteCursorWrapper):
    """Cursor whose ``execute``/``executemany`` run through ``_retry_locked_database``."""

    def execute(self, query, params=None):
        """Execute ``query`` with lock retries.

        Without ``params`` the call is delegated to the parent class's
        ``execute``. With ``params`` the query is passed through
        ``self.convert_query`` once, and the converted SQL is executed by the
        class that follows ``DjangoSQLiteCursorWrapper`` in the MRO.
        """
        owner = getattr(self, "db_wrapper", None)

        if params is None:

            def run_without_params():
                return super(SQLiteCursorWrapper, self).execute(query)

            return _retry_locked_database(run_without_params, query, db_wrapper=owner)

        if isinstance(params, Mapping):
            names = list(params)
        else:
            names = None
        sql = self.convert_query(query, param_names=names)

        def run_with_params():
            # Lookup starts after DjangoSQLiteCursorWrapper, presumably so the
            # already-converted SQL is not converted a second time.
            return super(DjangoSQLiteCursorWrapper, self).execute(sql, params)

        return _retry_locked_database(run_with_params, sql, params, db_wrapper=owner)

    def executemany(self, query, param_list):
        """Execute ``query`` once per parameter set, with lock retries.

        The first parameter set is peeked at to decide whether named
        parameters are in use. All parameter sets are then materialised into
        a tuple so that every retry passes the same data.
        """
        first_peek, remaining = tee(iter(param_list))
        first = next(first_peek, None)
        if first and isinstance(first, Mapping):
            names = list(first)
        else:
            names = None
        sql = self.convert_query(query, param_names=names)
        batches = tuple(remaining)

        def run_batches():
            return super(DjangoSQLiteCursorWrapper, self).executemany(sql, batches)

        return _retry_locked_database(
            run_batches,
            sql,
            # Only the number of parameter sets is shown in log messages.
            f"{len(batches)} parameter sets",
            db_wrapper=getattr(self, "db_wrapper", None),
        )
|
|
|
|
|
|
class DatabaseWrapper(DjangoSQLiteDatabaseWrapper):
    """SQLite database wrapper that hands out retrying cursors and retries commit/rollback."""

    def create_cursor(self, name=None):
        """Create a ``SQLiteCursorWrapper`` cursor linked back to this wrapper.

        ``name`` is accepted but not used.
        """
        retrying_cursor = self.connection.cursor(factory=SQLiteCursorWrapper)
        # The cursor reads this attribute to find the wrapper when it retries.
        retrying_cursor.db_wrapper = self
        return retrying_cursor

    def _commit(self):
        """Run the parent ``_commit`` through ``_retry_locked_database``."""
        return _retry_locked_database(super()._commit, "COMMIT", db_wrapper=self)

    def _rollback(self):
        """Run the parent ``_rollback`` through ``_retry_locked_database``."""
        return _retry_locked_database(super()._rollback, "ROLLBACK", db_wrapper=self)
|