mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-06-22 19:40:39 -04:00
348 lines
14 KiB
Python
348 lines
14 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
from django.db import IntegrityError
|
|
from django.utils import timezone
|
|
from archivebox.config import CONSTANTS
|
|
from archivebox.config.common import rprint
|
|
|
|
# worker_type values written to Process rows by the single-runner gate:
# a runner carries the "waiting" value while it contends for the gate and the
# "active" value once it has been promoted.
RUNNER_ACTIVE_WORKER_TYPE = "worker_runner"
RUNNER_WAITING_WORKER_TYPE = "runner_waiting"

# Every worker_type the gate treats as a runner when listing live runner rows.
# NOTE(review): the empty string presumably matches rows that never had a
# worker_type recorded -- confirm against the Process model.
RUNNER_GATE_WORKER_TYPES = (
    RUNNER_ACTIVE_WORKER_TYPE,
    RUNNER_WAITING_WORKER_TYPE,
    "",
)
|
|
|
|
|
|
def runtime_stack_owner_types():
    """Return the Process type choices that may own the runtime stack.

    The result is the tuple ``(SERVER, ORCHESTRATOR)`` taken from
    ``Process.TypeChoices``. The model is imported at call time.
    """
    from archivebox.machine.models import Process

    kinds = Process.TypeChoices
    return (kinds.SERVER, kinds.ORCHESTRATOR)
|
|
|
|
|
|
def foreground_runner_owner_types():
    """Return the Process type choices that may own the foreground runner.

    The result is the tuple ``(SERVER, ADD, UPDATE)`` taken from
    ``Process.TypeChoices``. The model is imported at call time.
    """
    from archivebox.machine.models import Process

    kinds = Process.TypeChoices
    return (kinds.SERVER, kinds.ADD, kinds.UPDATE)
|
|
|
|
|
|
def current_command(process_type: str, *, data_dir: str | Path, url: str | None = None):
    """Mark the current Process row as running and return it.

    Args:
        process_type: Process type to record on the row.
        data_dir: Working directory recorded (as a string) in ``pwd``.
        url: Optional URL recorded on the row.

    Returns:
        The object returned by ``Process.current()`` after ``mark_running``
        has been called on it with ``CONSTANTS.MAX_HOOK_RUNTIME_SECONDS`` as
        the timeout.
    """
    from archivebox.machine.models import Process

    command = Process.current()
    command.mark_running(
        process_type=process_type,
        pwd=str(data_dir),
        url=url,
        timeout=CONSTANTS.MAX_HOOK_RUNTIME_SECONDS,
    )
    return command
|
|
|
|
|
|
def live_processes(*, process_type: str, data_dir: str | Path, url: str | None = None):
    """List RUNNING Process rows of one type for this machine and data dir.

    Rows are filtered by the current machine, ``process_type``, RUNNING
    status and ``pwd == str(data_dir)`` (plus ``url`` when it is not None),
    ordered newest first by ``created_at`` then ``modified_at``. Only rows
    whose ``is_running`` attribute is truthy are returned.
    """
    from archivebox.machine.models import Machine, Process

    candidates = Process.objects.filter(
        machine=Machine.current(),
        process_type=process_type,
        status=Process.StatusChoices.RUNNING,
        pwd=str(data_dir),
    )
    if url is not None:
        candidates = candidates.filter(url=url)

    newest_first = candidates.order_by("-created_at", "-modified_at")
    alive = []
    for candidate in newest_first.iterator(chunk_size=50):
        if candidate.is_running:
            alive.append(candidate)
    return alive
|
|
|
|
|
|
def newest_live_process(*, process_type: str, data_dir: str | Path, url: str | None = None):
    """Return the first entry of ``live_processes(...)``, or None if it is empty.

    ``live_processes`` orders its results newest first, so the first entry is
    the most recently created live process matching the arguments.
    """
    alive = live_processes(process_type=process_type, data_dir=data_dir, url=url)
    if not alive:
        return None
    return alive[0]
|
|
|
|
|
|
def command_is_newest(command, *, process_type: str, data_dir: str | Path, url: str | None = None) -> bool:
    """Tell whether ``command`` is the newest live process of its kind.

    Returns True only when ``newest_live_process(...)`` yields a process whose
    ``id`` equals ``command.id``; False when there is no live process at all.
    """
    leader = newest_live_process(process_type=process_type, data_dir=data_dir, url=url)
    if not leader:
        return False
    return bool(leader.id == command.id)
|
|
|
|
|
|
def runtime_stack_owner(*, data_dir: str | Path, exclude_id=None):
    """Find the live process that currently owns the runtime stack.

    Candidates are RUNNING Process rows on the current machine whose ``pwd``
    equals ``str(data_dir)`` and whose type is one of
    ``runtime_stack_owner_types()``; ``exclude_id`` (when given) is left out.
    Server rows are examined first, then orchestrator rows whose parent is not
    a supervisord process. Within each tier rows are visited newest first.

    Rows whose ``is_running`` is falsy are passed to ``mark_exited`` with
    their recorded exit code (0 when none is recorded) as a side effect of
    the scan.

    Returns:
        The first candidate whose ``is_running`` is truthy, or None.
    """
    from archivebox.machine.models import Machine, Process

    running = Process.objects.filter(
        machine=Machine.current(),
        status=Process.StatusChoices.RUNNING,
        pwd=str(data_dir),
        process_type__in=runtime_stack_owner_types(),
    )
    if exclude_id is not None:
        running = running.exclude(id=exclude_id)

    # Only server parents own HTTP runtime leadership. Foreground add/update
    # commands can own runner/sonic components, but server startup must never
    # wait behind them before binding Daphne.
    server_rows = running.filter(process_type=Process.TypeChoices.SERVER)

    # A foreground `archivebox run` process is allowed to own the runtime
    # stack when no server/add parent is alive. A runner launched by
    # supervisord is only a child worker; after its parent is killed it must
    # not keep stealing leadership from the next foreground command.
    foreground_orchestrator_rows = running.filter(
        process_type=Process.TypeChoices.ORCHESTRATOR,
    ).exclude(parent__process_type=Process.TypeChoices.SUPERVISORD)

    for tier in (server_rows, foreground_orchestrator_rows):
        newest_first = tier.order_by("-created_at", "-modified_at")
        for candidate in newest_first.iterator(chunk_size=50):
            if not candidate.is_running:
                recorded = candidate.exit_code
                candidate.mark_exited(exit_code=0 if recorded is None else recorded)
                continue
            return candidate
    return None
|
|
|
|
|
|
def command_owns_runtime_stack(command, *, data_dir: str | Path) -> bool:
    """Tell whether ``command`` is the current runtime-stack owner.

    True only when ``runtime_stack_owner(data_dir=...)`` returns a process
    whose ``id`` equals ``command.id``.
    """
    owner = runtime_stack_owner(data_dir=data_dir)
    if not owner:
        return False
    return bool(owner.id == command.id)
|
|
|
|
|
|
def foreground_runner_owner(*, data_dir: str | Path, exclude_id=None):
    """Find the live process that currently owns the foreground runner.

    Candidates are RUNNING Process rows on the current machine whose ``pwd``
    equals ``str(data_dir)`` and whose type is one of
    ``foreground_runner_owner_types()``; ``exclude_id`` (when given) is left
    out. Rows are visited newest first by ``created_at`` then ``modified_at``.

    Rows whose ``is_running`` is falsy are passed to ``mark_exited`` with
    their recorded exit code (0 when none is recorded) as a side effect of
    the scan.

    Returns:
        The first candidate whose ``is_running`` is truthy, or None.
    """
    from archivebox.machine.models import Machine, Process

    running = Process.objects.filter(
        machine=Machine.current(),
        status=Process.StatusChoices.RUNNING,
        pwd=str(data_dir),
        process_type__in=foreground_runner_owner_types(),
    )
    if exclude_id is not None:
        running = running.exclude(id=exclude_id)

    newest_first = running.order_by("-created_at", "-modified_at")
    for candidate in newest_first.iterator(chunk_size=50):
        if not candidate.is_running:
            recorded = candidate.exit_code
            candidate.mark_exited(exit_code=0 if recorded is None else recorded)
            continue
        return candidate
    return None
|
|
|
|
|
|
def command_owns_foreground_runner(command, *, data_dir: str | Path) -> bool:
    """Tell whether ``command`` is the current foreground-runner owner.

    True only when ``foreground_runner_owner(data_dir=...)`` returns a process
    whose ``id`` equals ``command.id``.
    """
    owner = foreground_runner_owner(data_dir=data_dir)
    if not owner:
        return False
    return bool(owner.id == command.id)
|
|
|
|
|
|
def runtime_stack_component_label(*, owner=None, data_dir: str | Path) -> str:
    """Build a human-readable, comma-separated label for the runtime stack.

    The label is assembled on a best-effort basis:

    1. Ask ``active_supervisord_runtime_components`` for the active component
       names. Any failure while importing, calling, or iterating the result
       (including a ``None`` result) is treated as "no components known".
    2. If that yields nothing and ``owner`` is given, fall back to a fixed
       list based on ``owner.process_type`` (server or orchestrator).
    3. If there are still no names, return the generic ``"runtime stack"``.

    Duplicate names are dropped while keeping first-seen order.

    Args:
        owner: Optional process whose ``process_type`` selects the fallback.
        data_dir: Accepted for signature compatibility; not used here.

    Returns:
        The joined component names, or ``"runtime stack"`` when none are known.
    """
    try:
        from archivebox.config.common import get_config
        from archivebox.workers.supervisord_util import active_supervisord_runtime_components

        # Materialise inside the try so that a None result, or an error raised
        # while iterating a lazy result, also falls back instead of escaping.
        names = list(active_supervisord_runtime_components(config=get_config()) or ())
    except Exception:
        # Deliberate best-effort: this label is only used for messaging.
        names = []

    if not names and owner is not None:
        from archivebox.machine.models import Process

        if owner.process_type == Process.TypeChoices.SERVER:
            names = ["orchestrator", "server"]
        elif owner.process_type == Process.TypeChoices.ORCHESTRATOR:
            names = ["orchestrator"]

    # dict.fromkeys de-duplicates while preserving order.
    return ", ".join(dict.fromkeys(names)) or "runtime stack"
|
|
|
|
|
|
def ensure_daemon_stack(*, reason: str = ""):
    """Make sure the Sonic daemon worker is available, starting it if needed.

    Steps, in order:

    * If the plugin provides no Sonic supervisord worker, return None.
    * If something is already listening on the prepared Sonic host/port,
      return a small status dict describing that existing daemon.
    * Otherwise obtain a supervisord process (reusing an existing one, or
      creating one with ``daemonize=False``) and return the Sonic worker's
      state if it is already STARTING or RUNNING.
    * Otherwise print a notice (only when ``reason`` is non-empty) and return
      the result of ``start_worker``.

    Args:
        reason: Text interpolated into the startup notice; empty suppresses it.
    """
    from archivebox.config.common import get_config
    from archivebox.workers.supervisord_util import (
        get_existing_supervisord_process,
        get_or_create_supervisord_process,
        get_sonic_supervisord_worker_from_plugin,
        get_worker,
        start_worker,
    )

    config = get_config()
    sonic_worker = get_sonic_supervisord_worker_from_plugin(config)
    if sonic_worker is None:
        return None

    from abx_plugins.plugins.search_backend_sonic.daemon import is_port_listening, prepare_sonic_daemon

    sonic_event = prepare_sonic_daemon(config)
    port_in_use = is_port_listening(sonic_event.host, sonic_event.port)
    if port_in_use:
        status = {
            "name": sonic_event.worker_name,
            "statename": "RUNNING",
            "description": f"existing Sonic daemon at {sonic_event.url}",
        }
        return status

    supervisor = get_existing_supervisord_process()
    if not supervisor:
        supervisor = get_or_create_supervisord_process(daemonize=False)

    worker = get_worker(supervisor, sonic_worker["name"])
    already_up = isinstance(worker, dict) and worker.get("statename") in ("STARTING", "RUNNING")
    if already_up:
        return worker

    if reason:
        rprint(f"[yellow][*] Starting daemon stack for {reason}...[/yellow]")
    return start_worker(supervisor, sonic_worker)
|
|
|
|
|
|
def healthy_orchestrator(*, data_dir: str | Path):
    """Return evidence of a live orchestrator for ``data_dir``, or None.

    Two sources are checked in order:

    1. The ``worker_runner`` worker of an existing supervisord process; its
       state dict is returned when ``statename`` is STARTING or RUNNING.
    2. RUNNING orchestrator Process rows on the current machine with
       ``pwd == str(data_dir)``, newest ``created_at`` first; the first row
       whose ``is_running`` is truthy is returned.

    Note that the return value is therefore either a worker dict or a Process.
    """
    from archivebox.machine.models import Machine, Process
    from archivebox.workers.supervisord_util import get_existing_supervisord_process, get_worker

    supervisor = get_existing_supervisord_process()
    if supervisor:
        worker = get_worker(supervisor, "worker_runner")
        if isinstance(worker, dict) and worker.get("statename") in ("STARTING", "RUNNING"):
            return worker

    rows = Process.objects.filter(
        machine=Machine.current(),
        process_type=Process.TypeChoices.ORCHESTRATOR,
        status=Process.StatusChoices.RUNNING,
        pwd=str(data_dir),
    ).order_by("-created_at")
    return next((row for row in rows if row.is_running), None)
|
|
|
|
|
|
def _runner_sort_key(process):
|
|
return (process.started_at or process.created_at, process.created_at, str(process.id))
|
|
|
|
|
|
def live_runner_processes(*, data_dir: str | Path, exclude_id=None):
    """List live runner Process rows for ``data_dir``, oldest first.

    ``Process.cleanup_stale_running`` is invoked for the current machine
    before querying. The query selects RUNNING orchestrator rows whose
    ``worker_type`` is in ``RUNNER_GATE_WORKER_TYPES`` and whose ``pwd``
    equals ``str(data_dir)``, optionally excluding ``exclude_id``, ordered by
    ``started_at`` then ``created_at``. Only rows whose ``is_running`` is
    truthy are returned.
    """
    from archivebox.machine.models import Machine, Process

    machine = Machine.current()
    Process.cleanup_stale_running(machine=machine)

    rows = Process.objects.filter(
        machine=machine,
        status=Process.StatusChoices.RUNNING,
        process_type=Process.TypeChoices.ORCHESTRATOR,
        worker_type__in=RUNNER_GATE_WORKER_TYPES,
        pwd=str(data_dir),
    )
    if exclude_id is not None:
        rows = rows.exclude(id=exclude_id)

    oldest_first = rows.order_by("started_at", "created_at")
    alive = []
    for row in oldest_first.iterator(chunk_size=20):
        if row.is_running:
            alive.append(row)
    return alive
|
|
|
|
|
|
def enter_single_runner_gate(command, *, data_dir: str | Path, graceful_timeout: float = 5.0) -> bool:
    """
    Admit exactly one active runner for this DATA_DIR using Process rows.

    The current process is a real OS process while it waits, so we keep its
    Process row RUNNING but mark worker_type=runner_waiting. Only the process
    that wins takeover is promoted to worker_type=worker_runner, which is
    protected by a partial unique DB constraint. Older runners are terminated
    and fully waited out before promotion, so the runner work loop never overlaps.

    Args:
        command: Process row for the calling runner.
        data_dir: Data directory whose runner gate is being contended.
        graceful_timeout: Passed to ``kill_tree`` when stopping older runners.

    Returns:
        True once this command has been promoted to the active runner;
        False when a different live runner sorts as newest, in which case the
        caller is expected to exit.
    """
    from archivebox.machine.models import Process

    def _register(worker_type: str) -> None:
        # (Re)assert this command's RUNNING orchestrator row in the given gate state.
        command.mark_running(
            process_type=Process.TypeChoices.ORCHESTRATOR,
            worker_type=worker_type,
            pwd=str(data_dir),
            timeout=CONSTANTS.MAX_HOOK_RUNTIME_SECONDS,
        )

    _register(RUNNER_WAITING_WORKER_TYPE)
    while True:
        runners = live_runner_processes(data_dir=data_dir)
        if all(process.id != command.id for process in runners):
            # Our own row is missing from the live set; re-register and look again.
            command.refresh_from_db()
            _register(RUNNER_WAITING_WORKER_TYPE)
            runners = live_runner_processes(data_dir=data_dir)

        # `runners` can still be empty here (our row not reported as live and
        # no competitor present). A bare max() would raise ValueError, so an
        # empty set is treated as "no competitor" and falls through to the
        # promotion attempt below.
        newest = max(runners, key=_runner_sort_key, default=None)
        if newest is not None and newest.id != command.id:
            rprint(
                f"[yellow][*] Newer ArchiveBox runner pid={newest.pid} is taking over; exiting this runner.[/yellow]",
                file=sys.stderr,
            )
            return False

        older_runners = [process for process in runners if process.id != command.id]
        if older_runners:
            for process in older_runners:
                rprint(f"[yellow][*] Stopping older ArchiveBox runner process (pid={process.pid})...[/yellow]", file=sys.stderr)
                process.kill_tree(graceful_timeout=graceful_timeout)
            # Re-evaluate from the top so promotion only happens once no
            # older runner is reported live any more.
            time.sleep(0.1)
            continue

        try:
            _register(RUNNER_ACTIVE_WORKER_TYPE)
        except IntegrityError:
            # A hard-killed runner may leave the unique active-runner row behind.
            # The next loop starts by proving each RUNNING Process row still maps
            # to a live OS process, then marks dead rows EXITED before retrying.
            command.refresh_from_db()
            time.sleep(0.1)
        else:
            return True
|
|
|
|
|
|
def standby_until_leader_needed(command, *, process_type: str, data_dir: str | Path, url: str | None = None, interval: float = 2.0) -> None:
    """Block until ``command`` is the newest live process of its kind.

    While another process is newest, each iteration reaps the foreground
    supervisord process, prints a one-time standby notice naming the current
    leader's pid, sleeps ``interval`` seconds, and then bumps and saves
    ``command.modified_at``.
    """
    from archivebox.workers.supervisord_util import reap_foreground_supervisord_process

    notice_shown = False
    while True:
        if command_is_newest(command, process_type=process_type, data_dir=data_dir, url=url):
            return

        reap_foreground_supervisord_process()
        if not notice_shown:
            leader = newest_live_process(process_type=process_type, data_dir=data_dir, url=url)
            leader_pid = leader.pid if leader else "unknown"
            rprint(f"[yellow][*] Standing by; newer ArchiveBox process pid={leader_pid} is running the orchestrator and server.[/yellow]")
            notice_shown = True

        time.sleep(interval)
        # Touch modified_at on every pass while waiting.
        command.modified_at = timezone.now()
        command.save(update_fields=["modified_at"])
|
|
|
|
|
|
def standby_until_runtime_stack_needed(command, *, data_dir: str | Path, interval: float = 2.0) -> dict[str, object]:
    """Block until ``command`` owns the runtime stack for ``data_dir``.

    While another process owns the stack, each iteration reaps the foreground
    supervisord process, prints a one-time takeover notice to stderr, sleeps
    ``interval`` seconds, and then bumps and saves ``command.modified_at``.

    Returns:
        ``{"resumed": bool, "previous_owner_pid": ...}`` where ``resumed`` is
        True if any waiting happened and ``previous_owner_pid`` is the pid
        shown in the notice (the string ``"unknown"`` if no owner was found at
        that moment, or None if no waiting happened).
    """
    from archivebox.workers.supervisord_util import reap_foreground_supervisord_process

    waited = False
    seen_owner_pid = None
    while True:
        if command_owns_runtime_stack(command, data_dir=data_dir):
            return {"resumed": waited, "previous_owner_pid": seen_owner_pid}

        reap_foreground_supervisord_process()
        if not waited:
            owner = runtime_stack_owner(data_dir=data_dir)
            owner_pid = owner.pid if owner else "unknown"
            components = runtime_stack_component_label(owner=owner, data_dir=data_dir)
            seen_owner_pid = owner_pid
            notice = (
                f"[yellow][*] A newer archivebox process took over the {components} "
                f"(pid={owner_pid}). Work will continue there, and will resume here if that process exits and work still remains.[/yellow]"
            )
            rprint(notice, file=sys.stderr)
            waited = True

        time.sleep(interval)
        # Touch modified_at on every pass while waiting.
        command.modified_at = timezone.now()
        command.save(update_fields=["modified_at"])
|
|
|
|
|
|
def standby_until_foreground_runner_needed(command, *, data_dir: str | Path, interval: float = 2.0) -> dict[str, object]:
    """Block until ``command`` owns the foreground runner for ``data_dir``.

    While another process owns it, each iteration reaps the foreground
    supervisord process, prints a one-time takeover notice to stderr, sleeps
    ``interval`` seconds, and then bumps and saves ``command.modified_at``.

    Returns:
        ``{"resumed": bool, "previous_owner_pid": ...}`` where ``resumed`` is
        True if any waiting happened and ``previous_owner_pid`` is the pid
        shown in the notice (the string ``"unknown"`` if no owner was found at
        that moment, or None if no waiting happened).
    """
    from archivebox.workers.supervisord_util import reap_foreground_supervisord_process

    waited = False
    seen_owner_pid = None
    while True:
        if command_owns_foreground_runner(command, data_dir=data_dir):
            return {"resumed": waited, "previous_owner_pid": seen_owner_pid}

        reap_foreground_supervisord_process()
        if not waited:
            owner = foreground_runner_owner(data_dir=data_dir)
            owner_pid = owner.pid if owner else "unknown"
            seen_owner_pid = owner_pid
            notice = (
                f"[yellow][*] A newer archivebox process took over the orchestrator, sonic "
                f"(pid={owner_pid}). Work will continue there, and will resume here if that process exits and work still remains.[/yellow]"
            )
            rprint(notice, file=sys.stderr)
            waited = True

        time.sleep(interval)
        # Touch modified_at on every pass while waiting.
        command.modified_at = timezone.now()
        command.save(update_fields=["modified_at"])
|