Files
2026-06-13 23:44:59 -07:00

962 lines
48 KiB
Python

__package__ = "archivebox.progressmonitor"
from functools import lru_cache
from pathlib import Path
from typing import Literal
from django.conf import settings
from django.db.models import CharField, Count, Q, Sum
from django.db.models.functions import Cast
from django.http import HttpResponse, JsonResponse
from django.utils import timezone
from abx_dl.events import PROCESS_EXIT_SKIPPED
from archivebox.config import CONSTANTS
from archivebox.config.common import get_config
from archivebox.core.routes_util import build_snapshot_url, build_web_url, get_api_base_url
from archivebox.core.permissions import can_view_snapshot, is_admin_user
from archivebox.plugins.discovery import discover_plugin_configs
from archivebox.misc.logging_util import printable_filesize
def progress_endpoint(scope: Literal["crawl", "snapshot"] | None = None, object_id: object | None = None) -> str:
"""Return the canonical same-origin progress endpoint for monitor embeds."""
if not scope or object_id is None:
return "/progress.json"
return f"/progress.json?{scope}_id={str(object_id).replace('-', '')}"
@lru_cache(maxsize=1)
def _live_progress_plugin_names() -> tuple[frozenset[str], frozenset[str]]:
    """Split the discovered plugin names into ``(download, indexing)`` sets.

    Names starting with ``search_backend_`` form the indexing set. Any other
    plugin whose config has a truthy ``output_mimetypes`` entry goes into the
    download set. The result is memoized by ``lru_cache``, so plugin discovery
    is not repeated on later calls.
    """
    search_prefix = "search_backend_"
    downloading: set[str] = set()
    indexing: set[str] = set()
    for name, config in discover_plugin_configs().items():
        produces_output = config.get("output_mimetypes")
        if name.startswith(search_prefix):
            indexing.add(name)
        elif produces_output:
            downloading.add(name)
    return frozenset(downloading), frozenset(indexing)
def live_progress_view(request):
"""Simple JSON endpoint for live progress status - used by admin progress monitor."""
try:
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot, ArchiveResult
from archivebox.machine.models import Process, Machine
snapshot_id_filter = (request.GET.get("snapshot_id") or "").strip().replace("-", "")
crawl_id_filter = (request.GET.get("crawl_id") or "").strip().replace("-", "")
is_admin = is_admin_user(request)
scoped_snapshot = None
if snapshot_id_filter:
import uuid as _uuid
try:
_uuid.UUID(snapshot_id_filter)
except (TypeError, ValueError):
return JsonResponse({"error": "Invalid snapshot_id"}, status=400)
scoped_snapshot = Snapshot.objects.filter(id=snapshot_id_filter).select_related("crawl").first()
if scoped_snapshot is None or not can_view_snapshot(request, scoped_snapshot):
return JsonResponse({"error": "Permission denied"}, status=403)
elif crawl_id_filter:
# Crawl-only scope still requires staff: there's no per-crawl ACL helper,
# and a crawl can mix snapshot permission levels.
if not is_admin:
return JsonResponse({"error": "Permission denied"}, status=403)
else:
if not is_admin:
return JsonResponse({"error": "Permission denied"}, status=403)
request_config = request.archivebox_config
now = timezone.now()
crawl_scope = Crawl.objects.all()
snapshot_scope = Snapshot.objects.all()
archiveresult_scope = ArchiveResult.objects.all()
if is_admin and not request.user.is_superuser:
crawl_scope = crawl_scope.filter(created_by=request.user)
snapshot_scope = snapshot_scope.filter(crawl__created_by=request.user)
archiveresult_scope = archiveresult_scope.filter(snapshot__crawl__created_by=request.user)
if scoped_snapshot is not None:
snapshot_scope = Snapshot.objects.filter(id=scoped_snapshot.id)
crawl_scope = Crawl.objects.filter(id=scoped_snapshot.crawl_id)
archiveresult_scope = ArchiveResult.objects.filter(snapshot_id=scoped_snapshot.id)
elif crawl_id_filter:
snapshot_scope = snapshot_scope.filter(crawl_id=crawl_id_filter)
crawl_scope = crawl_scope.filter(id=crawl_id_filter)
archiveresult_scope = archiveresult_scope.filter(snapshot__crawl_id=crawl_id_filter)
def is_current_run_timestamp(event_ts, run_started_at) -> bool:
    """Tell whether ``event_ts`` belongs to the run that began at ``run_started_at``.

    An unknown run start (``None``) accepts every event. Otherwise a missing
    ``event_ts`` is rejected, and a present one must be at or after the run
    start.
    """
    if run_started_at is not None:
        return event_ts is not None and event_ts >= run_started_at
    return True
def archiveresult_matches_current_run(ar, run_started_at) -> bool:
    """Decide whether archive result ``ar`` should be shown for the current run.

    Everything matches when ``run_started_at`` is ``None``. Results that are
    queued, started or in backoff always match. Any other result is judged by
    the first truthy value among ``end_ts``, ``start_ts``, ``modified_at`` and
    ``created_at``, which must not precede the run start.
    """
    if run_started_at is None:
        return True

    statuses = ArchiveResult.StatusChoices
    if ar.status in (statuses.QUEUED, statuses.STARTED, statuses.BACKOFF):
        return True

    # Walk the timestamp fields in preference order, stopping at the first
    # truthy one; if none is truthy the last field's value is kept.
    latest_ts = None
    for field_name in ("end_ts", "start_ts", "modified_at", "created_at"):
        latest_ts = getattr(ar, field_name)
        if latest_ts:
            break
    return is_current_run_timestamp(latest_ts, run_started_at)
def hook_details(hook_name: str, plugin: str = "setup") -> tuple[str, str, str, str]:
    """Describe a hook as ``(plugin, label, phase, hook_file_name)``.

    Only the final path component of ``hook_name`` is considered. When it is
    empty the result is ``(plugin, plugin, "unknown", "")``.

    ``phase`` is ``"install"`` for the exact name ``InstallEvent``, ``"crawl"``
    for names starting with ``on_CrawlSetup__``, ``"snapshot"`` for names
    starting with ``on_Snapshot__`` and ``"unknown"`` otherwise.

    ``label`` is the text after the first ``__`` (the whole name if there is
    none), minus its last extension and any two-digit ordering prefix such as
    ``50_``, with underscores turned into spaces. It falls back to ``plugin``
    when that leaves nothing.
    """
    base_name = Path(hook_name).name if hook_name else ""
    if base_name == "":
        return (plugin, plugin, "unknown", "")

    if base_name == "InstallEvent":
        phase = "install"
    else:
        prefix_phases = (("on_CrawlSetup__", "crawl"), ("on_Snapshot__", "snapshot"))
        phase = next(
            (name for prefix, name in prefix_phases if base_name.startswith(prefix)),
            "unknown",
        )

    # Keep what follows the first "__" (if present), then drop the last extension.
    _, separator, remainder = base_name.partition("__")
    stem = (remainder if separator else base_name).rsplit(".", 1)[0]
    # Strip a leading two-digit ordering prefix like "50_".
    if len(stem) > 3 and stem[:2].isdigit() and stem[2] == "_":
        stem = stem[3:]
    label = stem.replace("_", " ").strip() or plugin
    return (plugin, label, phase, base_name)
def process_label(cmd: list[str] | None) -> tuple[str, str, str, str]:
hook_path = ""
if isinstance(cmd, list) and cmd:
first = cmd[0]
if isinstance(first, str):
hook_path = first
if not hook_path:
return ("", "setup", "unknown", "")
return hook_details(Path(hook_path).name, plugin=Path(hook_path).parent.name or "setup")
def archiveresult_output_path(ar) -> str | None:
    """Pick the relative output path to link for archive result ``ar``, if any.

    ``ar.output_str`` is tried first, provided ``ar._looks_like_output_path``
    accepts it. If it is not usable, or none of its candidate spellings is a
    key of ``ar.output_files``, the choice is delegated to
    ``ArchiveResult._fallback_output_file_path``.

    Returns ``None`` when ``output_str`` is an absolute path or when nothing
    matches. Returned paths carry a ``<plugin>/`` prefix unless the matched
    entry is flagged ``root_relative`` in ``ar.output_files``.
    """
    # Treat anything other than a dict as "no recorded output files".
    output_file_map = ar.output_files if isinstance(ar.output_files, dict) else {}

    def is_root_relative(path: str) -> bool:
        # True when the entry's metadata is a dict with a truthy "root_relative" value.
        metadata = output_file_map.get(path) or {}
        return bool(isinstance(metadata, dict) and metadata.get("root_relative"))

    if ar.output_str:
        raw_output = str(ar.output_str).strip()
        if ar._looks_like_output_path(raw_output, ar.plugin):
            output_path = Path(raw_output)
            # Absolute paths are never linked.
            if output_path.is_absolute():
                return None
            # A bare file name is tried both with and without the plugin prefix;
            # anything else is tried exactly as written.
            if raw_output.startswith(f"{ar.plugin}/"):
                candidates = [raw_output]
            elif len(output_path.parts) == 1:
                candidates = [f"{ar.plugin}/{raw_output}", raw_output]
            else:
                candidates = [raw_output]
            # Root-relative entries are returned verbatim, without a plugin prefix.
            if raw_output in output_file_map and is_root_relative(raw_output):
                return raw_output
            for relative_path in candidates:
                # output_files keys may be stored with or without the plugin prefix,
                # so both spellings are looked up; the result is always prefixed.
                plugin_relative = relative_path.removeprefix(f"{ar.plugin}/")
                if relative_path in output_file_map:
                    return f"{ar.plugin}/{relative_path}" if not relative_path.startswith(f"{ar.plugin}/") else relative_path
                if plugin_relative in output_file_map:
                    return f"{ar.plugin}/{plugin_relative}"
    # No usable output_str: let the model choose among the recorded files.
    output_file_paths = list(output_file_map.keys())
    if output_file_paths:
        fallback_path = ArchiveResult._fallback_output_file_path(output_file_paths, ar.plugin, output_file_map)
        if fallback_path:
            if is_root_relative(fallback_path):
                return fallback_path
            return f"{ar.plugin}/{fallback_path}"
    return None
def snapshot_output_url(snapshot, output_path: str) -> str:
    """Build the URL for ``output_path`` of a snapshot row via ``build_snapshot_url``.

    ``snapshot`` is a mapping with an ``"id"`` key. The current request and its
    resolved config are passed through to the URL builder.
    """
    snapshot_id = str(snapshot["id"])
    return build_snapshot_url(snapshot_id, output_path, request=request, config=request_config)
def snapshot_archive_path(snapshot) -> str:
    """Compute the archive-relative directory path for a snapshot row.

    Rows with ``fs_version`` ``0.7.0`` or ``0.8.0`` map to
    ``<ARCHIVE_DIR_NAME>/<timestamp>``. All others map to
    ``<username>/<YYYYMMDD>/<domain>/<id>``, where:

    - the username defaults to ``"web"``, is taken from the crawl's creator
      when the crawl row is known and has one, and ``"system"`` is mapped back
      to ``"web"``;
    - the date comes from ``bookmarked_at``, falling back to ``created_at``,
      and is ``"unknown"`` when neither is set.
    """
    legacy_fs_versions = ("0.7.0", "0.8.0")
    if snapshot["fs_version"] in legacy_fs_versions:
        return f"{CONSTANTS.ARCHIVE_DIR_NAME}/{snapshot['timestamp']}"

    owner = "web"
    crawl_row = crawls_by_id.get(str(snapshot["crawl_id"]))
    if crawl_row is not None and crawl_row["created_by_id"]:
        creator = crawl_row["created_by__username"]
        if creator != "system":
            owner = creator

    day_source = snapshot["bookmarked_at"] or snapshot["created_at"]
    day = "unknown" if not day_source else day_source.strftime("%Y%m%d")
    host = Snapshot.extract_domain_from_url(snapshot["url"])
    return f"{owner}/{day}/{host}/{snapshot['id']}"
def snapshot_view_url(snapshot, output_path: str = "") -> str:
    """Build the web URL of a snapshot's ``index.html`` page.

    When ``output_path`` is non-empty it is appended as a ``#fragment``.
    """
    page_path = f"/{snapshot_archive_path(snapshot)}/index.html"
    if output_path:
        page_path = f"{page_path}#{output_path}"
    return build_web_url(page_path, request=request, config=request_config)
def snapshot_display_url(url: str) -> str:
    """Return ``url`` as text, shortened to at most 96 characters for display.

    Falsy input becomes ``""``. Longer values keep their first 93 characters
    followed by ``"..."``.
    """
    max_length = 96
    text = str(url or "")
    if len(text) > max_length:
        return text[: max_length - 3] + "..."
    return text
api_base = get_api_base_url(request=request, config=request_config) if scoped_snapshot is not None else ""
def screencast_frame_url(crawl_id: str, crawl_dir: Path) -> str:
    """Return the URL of a crawl's latest screencast frame, or ``""`` if unusable.

    The frame is ``<crawl_dir>/chrome_screencast/latest.jpg``. An empty string
    is returned when the file cannot be stat'ed, is empty, or was last modified
    more than 15 seconds before ``now``. The URL carries the file's mtime in
    nanoseconds as ``?v=`` (presumably a cache-buster) and is prefixed with
    ``api_base`` when that is set.
    """
    latest_frame = crawl_dir / "chrome_screencast" / "latest.jpg"
    try:
        info = latest_frame.stat()
    except OSError:
        return ""

    # Skip frames that are empty or stale.
    if info.st_size <= 0 or now.timestamp() - info.st_mtime > 15:
        return ""

    relative_url = f"/api/v1/crawls/crawl/{crawl_id}/files/chrome_screencast/latest.jpg?v={info.st_mtime_ns}"
    if api_base:
        return f"{api_base}{relative_url}"
    return relative_url
machine_id = Machine.current().id
orchestrator_proc = (
Process.objects.filter(
machine_id=machine_id,
process_type=Process.TypeChoices.ORCHESTRATOR,
status=Process.StatusChoices.RUNNING,
)
.only("id", "pid", "started_at", "machine_id", "process_type", "status")
.order_by("-started_at")
.first()
if machine_id is not None
else None
)
runner_worker = None
orchestrator_proc_running = bool(orchestrator_proc and orchestrator_proc.is_running)
if not orchestrator_proc_running:
try:
from archivebox.workers.supervisord_util import get_existing_supervisord_process, get_worker
supervisor = get_existing_supervisord_process(quiet=True)
runner_worker = get_worker(supervisor, "worker_runner") if supervisor else None
except Exception:
runner_worker = None
runner_worker_running = bool(runner_worker and runner_worker.get("statename") in ("STARTING", "RUNNING"))
runner_worker_pid = runner_worker.get("pid") if runner_worker else None
orchestrator_running = orchestrator_proc_running or runner_worker_running
orchestrator_pid = orchestrator_proc.pid if orchestrator_proc_running and orchestrator_proc else runner_worker_pid
# Get model counts by status
crawl_status_counts = Crawl.status_counts(
crawl_scope,
(Crawl.StatusChoices.QUEUED, Crawl.StatusChoices.STARTED, Crawl.StatusChoices.PAUSED),
)
crawls_queued = crawl_status_counts.get(Crawl.StatusChoices.QUEUED, 0)
crawls_active = crawl_status_counts.get(Crawl.StatusChoices.STARTED, 0)
# Get recent crawls (last 24 hours)
from datetime import timedelta
one_day_ago = now - timedelta(days=1)
paused_crawl_cutoff = now - timedelta(hours=12)
crawls_recent = crawl_scope.filter(created_at__gte=one_day_ago).count()
snapshot_status_counts = Snapshot.status_counts(
snapshot_scope,
Snapshot.OPEN_STATES,
)
snapshots_queued = snapshot_status_counts.get(Snapshot.StatusChoices.QUEUED, 0)
snapshots_active = snapshot_status_counts.get(Snapshot.StatusChoices.STARTED, 0)
download_plugin_names, indexing_plugin_names = _live_progress_plugin_names()
result_statuses = (
ArchiveResult.StatusChoices.QUEUED,
ArchiveResult.StatusChoices.STARTED,
)
archiveresult_status_counts = ArchiveResult.status_counts(archiveresult_scope, result_statuses)
download_scope = archiveresult_scope.filter(
plugin__in=download_plugin_names,
snapshot__status__in=Snapshot.RUNNABLE_STATES,
snapshot__crawl__status__in=Crawl.RUNNABLE_STATES,
)
indexing_scope = archiveresult_scope.filter(plugin__in=indexing_plugin_names)
download_status_counts = ArchiveResult.status_counts(download_scope, result_statuses)
indexing_status_counts = ArchiveResult.status_counts(indexing_scope, result_statuses)
archiveresults_queued = archiveresult_status_counts.get(ArchiveResult.StatusChoices.QUEUED, 0)
archiveresults_active = archiveresult_status_counts.get(ArchiveResult.StatusChoices.STARTED, 0)
downloads_queued = download_status_counts.get(ArchiveResult.StatusChoices.QUEUED, 0)
downloads_active = download_status_counts.get(ArchiveResult.StatusChoices.STARTED, 0)
indexing_queued = indexing_status_counts.get(ArchiveResult.StatusChoices.QUEUED, 0)
indexing_active = indexing_status_counts.get(ArchiveResult.StatusChoices.STARTED, 0)
# Build hierarchical active crawls with nested snapshots and archive results
max_active_crawls = 10
max_queued_crawls = 10
max_started_snapshots_per_crawl = 50
max_queued_snapshots_per_crawl = 50
active_crawl_fields = (
"id",
"created_at",
"created_by_id",
"modified_at",
"urls",
"config",
"max_depth",
"tags_str",
"persona_id",
"status",
"retry_at",
"label",
"created_by__id",
"created_by__username",
)
started_crawls = list(
crawl_scope.filter(status=Crawl.StatusChoices.STARTED)
.values(*active_crawl_fields)
.order_by("-modified_at")[:max_active_crawls],
)
paused_crawls = list(
crawl_scope.filter(
Q(status=Crawl.StatusChoices.PAUSED, created_at__gte=paused_crawl_cutoff)
| Q(
status=Crawl.StatusChoices.PAUSED,
snapshot_set__status__in=Snapshot.RUNNABLE_STATES,
snapshot_set__retry_at__lte=now,
)
| Q(
status=Crawl.StatusChoices.PAUSED,
snapshot_set__archiveresult__status=ArchiveResult.StatusChoices.QUEUED,
),
)
.values(*active_crawl_fields)
.distinct()
.order_by("-modified_at")[:max_active_crawls],
)
queued_crawls = list(
crawl_scope.filter(status=Crawl.StatusChoices.QUEUED).values(*active_crawl_fields).order_by("-modified_at")[:max_queued_crawls],
)
queued_crawls_hidden = max(crawls_queued - len(queued_crawls), 0)
active_crawls_list = started_crawls + paused_crawls + queued_crawls
for crawl in active_crawls_list:
crawl["id"] = str(crawl["id"])
if crawl["persona_id"]:
crawl["persona_id"] = str(crawl["persona_id"])
persona_details_by_id: dict[str, dict[str, str]] = {}
persona_details_by_name: dict[str, dict[str, str]] = {}
persona_objects_by_id = {}
persona_objects_by_name = {}
persona_ids = {crawl["persona_id"] for crawl in active_crawls_list if crawl["persona_id"]}
persona_names = {"Default"} if any(not crawl["persona_id"] for crawl in active_crawls_list) else set()
if persona_ids or persona_names:
from archivebox.personas.models import Persona
for persona in Persona.objects.filter(Q(id__in=persona_ids) | Q(name__in=persona_names)).only("id", "name", "config"):
persona_details = {
"name": persona.name,
"admin_url": f"/admin/personas/persona/{persona.pk}/change/",
}
persona_details_by_id[str(persona.id)] = persona_details
persona_details_by_name[persona.name] = persona_details
persona_objects_by_id[str(persona.id)] = persona
persona_objects_by_name[persona.name] = persona
active_crawl_ids = [crawl["id"] for crawl in active_crawls_list]
active_crawl_objects = {}
if active_crawl_ids:
for crawl_obj in Crawl.objects.filter(id__in=active_crawl_ids).select_related("created_by", "persona"):
crawl_obj._runtime_config = request_config
active_crawl_objects[str(crawl_obj.id)] = crawl_obj
snapshot_counts_by_crawl: dict[str, dict[str, int]] = {str(crawl_id): {} for crawl_id in active_crawl_ids}
cancelled_snapshot_counts_by_crawl: dict[str, int] = {str(crawl_id): 0 for crawl_id in active_crawl_ids}
crawl_output_sizes_by_crawl: dict[str, int] = {str(crawl_id): 0 for crawl_id in active_crawl_ids}
queued_snapshot_overflow_by_crawl: dict[str, int] = {str(crawl_id): 0 for crawl_id in active_crawl_ids}
active_snapshot_scope = snapshot_scope.filter(crawl_id__in=active_crawl_ids)
if active_crawl_ids:
for row in active_snapshot_scope.values("crawl_id", "status").annotate(count=Count("id")):
snapshot_counts_by_crawl.setdefault(str(row["crawl_id"]), {})[row["status"]] = row["count"]
for row in (
active_snapshot_scope.filter(status=Snapshot.StatusChoices.SEALED, downloaded_at__isnull=True)
.values("crawl_id")
.annotate(count=Count("id"))
):
cancelled_snapshot_counts_by_crawl[str(row["crawl_id"])] = row["count"]
for row in (
active_snapshot_scope.filter(
status=Snapshot.StatusChoices.SEALED,
)
.values("crawl_id")
.annotate(size=Sum("output_size"))
):
crawl_output_sizes_by_crawl[str(row["crawl_id"])] = int(row["size"] or 0)
crawl_process_pids: dict[str, int] = {}
snapshot_process_pids: dict[str, int] = {}
process_records_by_crawl: dict[str, list[tuple[dict[str, object], object | None]]] = {}
process_records_by_snapshot: dict[str, list[tuple[dict[str, object], object | None]]] = {}
seen_process_records: set[str] = set()
crawls_by_id = {str(crawl["id"]): crawl for crawl in active_crawls_list}
started_snapshot_fields = (
"id_str",
"created_at",
"modified_at",
"url",
"timestamp",
"bookmarked_at",
"crawl_id_str",
"title",
"downloaded_at",
"fs_version",
"status",
)
queued_snapshot_fields = (
"id_str",
"url",
"crawl_id_str",
"title",
"status",
)
snapshots = []
for crawl_id in active_crawl_ids:
crawl_snapshot_scope = active_snapshot_scope.filter(crawl_id=crawl_id)
snapshots.extend(
crawl_snapshot_scope.filter(status=Snapshot.StatusChoices.STARTED)
.annotate(id_str=Cast("id", CharField()), crawl_id_str=Cast("crawl_id", CharField()))
.values(*started_snapshot_fields)
.order_by("-modified_at")[:max_started_snapshots_per_crawl],
)
queued_snapshots = list(
crawl_snapshot_scope.filter(status=Snapshot.StatusChoices.QUEUED)
.annotate(id_str=Cast("id", CharField()), crawl_id_str=Cast("crawl_id", CharField()))
.values(
*queued_snapshot_fields,
)
.order_by("modified_at")[:max_queued_snapshots_per_crawl],
)
queued_snapshot_overflow_by_crawl[str(crawl_id)] = max(
snapshot_counts_by_crawl.get(str(crawl_id), {}).get(Snapshot.StatusChoices.QUEUED, 0) - len(queued_snapshots),
0,
)
snapshots.extend(queued_snapshots)
for snapshot in snapshots:
# Process.pwd points at Snapshot.output_dir, which uses CompactUUID
# hex path components. Keep progress IDs compact too so process rows
# can be matched without carrying dashed/undashed variants.
snapshot["id"] = str(snapshot.pop("id_str")).replace("-", "")
snapshot["crawl_id"] = str(snapshot.pop("crawl_id_str")).replace("-", "")
snapshots_by_id = {str(snapshot["id"]): snapshot for snapshot in snapshots}
displayed_snapshots_by_crawl: dict[str, list[Snapshot]] = {str(crawl_id): [] for crawl_id in active_crawl_ids}
for snapshot in snapshots:
crawl_snapshots = displayed_snapshots_by_crawl.setdefault(str(snapshot["crawl_id"]), [])
crawl_snapshots.append(snapshot)
displayed_snapshot_ids = [
snapshot["id"] for crawl_snapshots in displayed_snapshots_by_crawl.values() for snapshot in crawl_snapshots
]
detailed_snapshot_ids = [snapshot["id"] for snapshot in snapshots if snapshot["status"] != Snapshot.StatusChoices.QUEUED]
process_value_fields = ("id", "process_type", "status", "pwd", "cmd", "pid", "exit_code", "started_at", "modified_at")
if active_crawl_ids or displayed_snapshot_ids:
process_scope = Process.objects.filter(
machine_id=machine_id,
process_type__in=[
Process.TypeChoices.HOOK,
Process.TypeChoices.BINARY,
],
)
running_processes = process_scope.filter(status=Process.StatusChoices.RUNNING).values(*process_value_fields)
recent_processes = (
process_scope.filter(modified_at__gte=now - timedelta(minutes=10)).values(*process_value_fields).order_by("-modified_at")
)
else:
running_processes = Process.objects.none()
recent_processes = Process.objects.none()
archiveresults_by_snapshot: dict[str, list[ArchiveResult]] = {str(snapshot_id): [] for snapshot_id in detailed_snapshot_ids}
if detailed_snapshot_ids:
displayed_archiveresults = (
archiveresult_scope.filter(snapshot_id__in=detailed_snapshot_ids)
.select_related("process")
.only(
"id",
"snapshot_id",
"plugin",
"hook_name",
"status",
"output_str",
"output_files",
"output_size",
"start_ts",
"end_ts",
"created_at",
"modified_at",
"process_id",
"process__id",
"process__pid",
"process__started_at",
"process__timeout",
)
.order_by("snapshot_id", "start_ts", "created_at")
)
for archiveresult in displayed_archiveresults:
archiveresults_by_snapshot.setdefault(str(archiveresult.snapshot_id), []).append(archiveresult)
def find_snapshot_for_process(proc_pwd: Path) -> dict[str, object] | None:
    """Return the displayed snapshot row whose id is a component of ``proc_pwd``.

    Path components are checked from the last one towards the root, so the
    deepest matching directory wins. Returns ``None`` when no component is a
    key of ``snapshots_by_id``.

    The result is the ``.values()`` dict row stored in ``snapshots_by_id``, not
    a ``Snapshot`` model instance; callers index it by key.
    """
    candidates = map(snapshots_by_id.get, reversed(proc_pwd.parts))
    return next((row for row in candidates if row), None)
def find_crawl_for_process(proc_pwd: Path) -> dict[str, object] | None:
    """Return the active crawl row whose id is a component of ``proc_pwd``.

    Path components are checked from the last one towards the root, so the
    deepest matching directory wins. Returns ``None`` when no component is a
    key of ``crawls_by_id``.

    The result is the ``.values()`` dict row stored in ``crawls_by_id``, not a
    ``Crawl`` model instance; callers index it by key.
    """
    candidates = map(crawls_by_id.get, reversed(proc_pwd.parts))
    return next((row for row in candidates if row), None)
running_worker_ids: set[str] = set()
for proc in running_processes:
if not proc["pwd"]:
continue
proc_pwd = Path(proc["pwd"])
matched_snapshot = find_snapshot_for_process(proc_pwd)
matched_crawl = (
crawls_by_id.get(str(matched_snapshot["crawl_id"])) if matched_snapshot is not None else find_crawl_for_process(proc_pwd)
)
if matched_snapshot is None:
if matched_crawl is None:
continue
crawl_id = str(matched_crawl["id"])
snapshot_id = ""
else:
crawl_id = str(matched_snapshot["crawl_id"])
snapshot_id = str(matched_snapshot["id"])
running_worker_ids.add(str(proc["id"]))
_plugin, _label, phase, _hook_name = process_label(proc["cmd"])
if crawl_id and proc["pid"]:
crawl_process_pids.setdefault(crawl_id, proc["pid"])
if phase == "snapshot" and snapshot_id and proc["pid"]:
snapshot_process_pids.setdefault(snapshot_id, proc["pid"])
for proc in recent_processes:
if not proc["pwd"]:
continue
proc_pwd = Path(proc["pwd"])
matched_snapshot = find_snapshot_for_process(proc_pwd)
matched_crawl = (
crawls_by_id.get(str(matched_snapshot["crawl_id"])) if matched_snapshot is not None else find_crawl_for_process(proc_pwd)
)
if matched_snapshot is None and matched_crawl is None:
continue
crawl_id = str(matched_snapshot["crawl_id"] if matched_snapshot is not None else matched_crawl["id"])
snapshot_id = str(matched_snapshot["id"]) if matched_snapshot is not None else ""
plugin, label, phase, hook_name = process_label(proc["cmd"])
record_scope = str(snapshot_id) if phase == "snapshot" and snapshot_id else str(crawl_id)
proc_key = f"{record_scope}:{plugin}:{label}:{proc['status']}:{proc['exit_code']}"
if proc_key in seen_process_records:
continue
seen_process_records.add(proc_key)
status = (
"started"
if proc["status"] == Process.StatusChoices.RUNNING
else (
"skipped"
if proc["exit_code"] == PROCESS_EXIT_SKIPPED or (phase == "binary" and proc["exit_code"] not in (None, 0))
else ("failed" if proc["exit_code"] not in (None, 0) else "succeeded")
)
)
payload: dict[str, object] = {
"id": str(proc["id"]),
"plugin": plugin,
"label": label,
"hook_name": hook_name,
"status": status,
"phase": phase,
"source": "process",
"process_id": str(proc["id"]),
}
if status == "started" and proc["pid"]:
payload["pid"] = proc["pid"]
proc_started_at = proc["started_at"] or proc["modified_at"]
if phase == "snapshot" and snapshot_id:
process_records_by_snapshot.setdefault(snapshot_id, []).append((payload, proc_started_at))
elif crawl_id:
process_records_by_crawl.setdefault(crawl_id, []).append((payload, proc_started_at))
active_crawls = []
total_workers = len(running_worker_ids)
for crawl in active_crawls_list:
crawl_id = str(crawl["id"])
crawl_snapshot_counts = snapshot_counts_by_crawl.get(crawl_id, {})
total_snapshots = sum(crawl_snapshot_counts.values())
completed_snapshots = crawl_snapshot_counts.get(Snapshot.StatusChoices.SEALED, 0)
started_snapshots = crawl_snapshot_counts.get(Snapshot.StatusChoices.STARTED, 0)
pending_snapshots = crawl_snapshot_counts.get(Snapshot.StatusChoices.QUEUED, 0)
cancelled_snapshots = cancelled_snapshot_counts_by_crawl.get(crawl_id, 0)
# Count URLs in the crawl (for when snapshots haven't been created yet)
urls_count = 0
if crawl["urls"]:
urls_count = len([u for u in crawl["urls"].split("\n") if u.strip() and not u.startswith("#")])
# Calculate crawl progress
crawl_progress = int((completed_snapshots / total_snapshots) * 100) if total_snapshots > 0 else 0
crawl_run_started_at = crawl["created_at"]
crawl_setup_plugins = [
payload
for payload, proc_started_at in process_records_by_crawl.get(crawl_id, [])
if is_current_run_timestamp(proc_started_at, crawl_run_started_at)
]
crawl_setup_total = len(crawl_setup_plugins)
crawl_setup_completed = sum(1 for item in crawl_setup_plugins if item.get("status") == "succeeded")
crawl_setup_failed = sum(1 for item in crawl_setup_plugins if item.get("status") == "failed")
crawl_setup_pending = sum(1 for item in crawl_setup_plugins if item.get("status") == "queued")
crawl_screencast_url = screencast_frame_url(crawl_id, active_crawl_objects[crawl_id].output_dir)
crawl_screencast_link = f"/admin/crawls/crawl/{crawl_id.replace('-', '')}/change/" if crawl_screencast_url else ""
# Get active snapshots for this crawl (already prefetched)
active_snapshots_for_crawl = []
for snapshot in displayed_snapshots_by_crawl.get(crawl_id, []):
snapshot_run_started_at = snapshot.get("downloaded_at") or snapshot.get("created_at")
# Get archive results only for displayed active snapshots. Large crawls can
# contain thousands of sealed snapshots, and prefetching all their results
# makes the progress endpoint compete with the runner.
snapshot_results = [
ar
for ar in archiveresults_by_snapshot.get(str(snapshot["id"]), [])
if archiveresult_matches_current_run(ar, snapshot_run_started_at)
]
if snapshot["status"] == Snapshot.StatusChoices.QUEUED:
snapshot_results = []
plugin_progress_values: list[int] = []
all_plugins: list[dict[str, object]] = []
seen_plugin_keys: set[str] = set()
snapshot_title = (
str(snapshot["title"] or "")
if snapshot["status"] == Snapshot.StatusChoices.QUEUED
else Snapshot._normalize_title_candidate(snapshot["title"], snapshot_url=snapshot["url"])
)
snapshot_favicon_url = ""
snapshot_preview_url = ""
snapshot_preview_link = ""
snapshot_screencast_url = ""
snapshot_screencast_link = ""
snapshot_fallback_urls: list[str] = []
result_by_plugin = {result.plugin: result for result in snapshot_results}
title_result = result_by_plugin.get("title")
if not snapshot_title and title_result is not None and title_result.status == ArchiveResult.StatusChoices.SUCCEEDED:
snapshot_title = Snapshot._normalize_title_candidate(title_result.output_str, snapshot_url=snapshot["url"])
favicon_result = result_by_plugin.get("favicon")
if favicon_result is not None and favicon_result.status == ArchiveResult.StatusChoices.SUCCEEDED:
favicon_path = archiveresult_output_path(favicon_result) or "favicon/favicon.ico"
snapshot_favicon_url = snapshot_output_url(snapshot, favicon_path)
screenshot_result = result_by_plugin.get("screenshot")
if screenshot_result is not None and screenshot_result.status == ArchiveResult.StatusChoices.SUCCEEDED:
snapshot_preview_link = snapshot_view_url(snapshot)
screenshot_path = archiveresult_output_path(screenshot_result) or "screenshot/screenshot.png"
snapshot_preview_url = snapshot_output_url(snapshot, screenshot_path)
snapshot_preview_link = snapshot_view_url(snapshot, screenshot_path)
if snapshot_favicon_url:
snapshot_fallback_urls.append(snapshot_favicon_url)
elif snapshot_favicon_url:
snapshot_preview_url = snapshot_favicon_url
if snapshot["status"] == Snapshot.StatusChoices.STARTED:
snapshot_screencast_url = screencast_frame_url(crawl_id, active_crawl_objects[crawl_id].output_dir)
snapshot_screencast_link = snapshot_view_url(snapshot) if snapshot_screencast_url else ""
def plugin_sort_key(ar):
    """Sort key that orders archive results by status, then plugin, then hook name.

    Status rank: started, queued, succeeded, noresults, failed; any other
    status sorts after those. A missing ``hook_name`` sorts as ``""``.
    """
    statuses = ArchiveResult.StatusChoices
    ranked_statuses = (
        statuses.STARTED,
        statuses.QUEUED,
        statuses.SUCCEEDED,
        statuses.NORESULTS,
        statuses.FAILED,
    )
    rank_by_status = {status: position for position, status in enumerate(ranked_statuses)}
    status_rank = rank_by_status.get(ar.status, 5)
    return (status_rank, ar.plugin, ar.hook_name or "")
for ar in sorted(snapshot_results, key=plugin_sort_key):
status = ar.status
process = ar.process_record
progress_value = 0
if status in (
ArchiveResult.StatusChoices.SUCCEEDED,
ArchiveResult.StatusChoices.FAILED,
ArchiveResult.StatusChoices.SKIPPED,
ArchiveResult.StatusChoices.NORESULTS,
):
progress_value = 100
elif status == ArchiveResult.StatusChoices.STARTED:
started_at = ar.start_ts or (process.started_at if process else None)
timeout = process.timeout if process else 120
if started_at and timeout:
elapsed = max(0.0, (now - started_at).total_seconds())
progress_value = int(min(99, max(1, (elapsed / float(timeout)) * 100)))
else:
progress_value = 1
else:
progress_value = 0
plugin_progress_values.append(progress_value)
plugin, label, phase, hook_name = hook_details(ar.hook_name or ar.plugin, plugin=ar.plugin)
plugin_payload = {
"id": str(ar.id),
"plugin": ar.plugin,
"label": label,
"hook_name": hook_name,
"phase": phase,
"status": status,
"process_id": str(process.id) if process else None,
"admin_url": f"/admin/core/archiveresult/{ar.id}/change/",
}
output_path = archiveresult_output_path(ar)
if output_path:
plugin_payload["output_path"] = output_path
plugin_payload["output_url"] = snapshot_view_url(snapshot, output_path)
if status == ArchiveResult.StatusChoices.STARTED and process:
plugin_payload["pid"] = process.pid
if status == ArchiveResult.StatusChoices.STARTED:
plugin_payload["progress"] = progress_value
plugin_payload["timeout"] = process.timeout if process else 120
plugin_payload["source"] = "archiveresult"
all_plugins.append(plugin_payload)
seen_plugin_keys.add(str(process.id) if process else f"{ar.plugin}:{hook_name}")
for proc_payload, proc_started_at in process_records_by_snapshot.get(str(snapshot["id"]), []):
if not is_current_run_timestamp(proc_started_at, snapshot_run_started_at):
continue
proc_key = str(proc_payload.get("process_id") or f"{proc_payload.get('plugin')}:{proc_payload.get('hook_name')}")
if proc_key in seen_plugin_keys:
continue
seen_plugin_keys.add(proc_key)
all_plugins.append(proc_payload)
proc_status = proc_payload.get("status")
if proc_status in ("succeeded", "failed", "skipped"):
plugin_progress_values.append(100)
elif proc_status == "started":
plugin_progress_values.append(1)
else:
plugin_progress_values.append(0)
total_plugins = len(all_plugins)
completed_plugins = sum(1 for item in all_plugins if item.get("status") == "succeeded")
failed_plugins = sum(1 for item in all_plugins if item.get("status") == "failed")
pending_plugins = sum(1 for item in all_plugins if item.get("status") == "queued")
snapshot_progress = int(sum(plugin_progress_values) / len(plugin_progress_values)) if plugin_progress_values else 0
worker_state = "running" if snapshot_process_pids.get(str(snapshot["id"])) else "waiting"
if (
snapshot["status"] == Snapshot.StatusChoices.STARTED
and worker_state == "waiting"
and not all_plugins
and snapshot["modified_at"]
and (now - snapshot["modified_at"]).total_seconds() > 30
):
worker_state = "waiting" if orchestrator_running else "crashed"
if snapshot["status"] == Snapshot.StatusChoices.QUEUED and not snapshot_process_pids.get(str(snapshot["id"])):
compact_snapshot = [
str(snapshot["id"]),
snapshot_display_url(snapshot["url"]),
]
if snapshot_title:
compact_snapshot.append(snapshot_title)
active_snapshots_for_crawl.append(compact_snapshot)
continue
snapshot_payload = {
"id": str(snapshot["id"]),
"url": snapshot_display_url(snapshot["url"]),
"title": snapshot_title,
"status": snapshot["status"],
"worker_state": worker_state,
}
if snapshot["status"] != Snapshot.StatusChoices.QUEUED or all_plugins or snapshot_process_pids.get(str(snapshot["id"])):
snapshot_payload.update(
{
"view_url": snapshot_view_url(snapshot),
"started": (snapshot["downloaded_at"] or snapshot["created_at"]).isoformat()
if (snapshot["downloaded_at"] or snapshot["created_at"])
else None,
"progress": snapshot_progress,
"total_plugins": total_plugins,
"completed_plugins": completed_plugins,
"failed_plugins": failed_plugins,
"pending_plugins": pending_plugins,
"all_plugins": all_plugins,
},
)
if snapshot_favicon_url:
snapshot_payload["favicon_url"] = snapshot_favicon_url
if snapshot_preview_url:
snapshot_payload["preview_url"] = snapshot_preview_url
snapshot_payload["preview_link"] = snapshot_preview_link
if snapshot_screencast_url:
snapshot_payload["screencast_url"] = snapshot_screencast_url
snapshot_payload["screencast_link"] = snapshot_screencast_link
if snapshot_fallback_urls:
snapshot_payload["preview_fallbacks"] = snapshot_fallback_urls
if snapshot_process_pids.get(str(snapshot["id"])):
snapshot_payload["worker_pid"] = snapshot_process_pids[str(snapshot["id"])]
active_snapshots_for_crawl.append(snapshot_payload)
# Check if crawl can start (for debugging stuck crawls)
can_start = bool(crawl["urls"])
urls_preview = crawl["urls"][:60] if crawl["urls"] else None
crawl_tags = [tag.strip() for tag in (crawl["tags_str"] or "").replace("\n", ",").split(",") if tag.strip()]
persona_details = persona_details_by_id.get(str(crawl["persona_id"])) if crawl["persona_id"] else None
persona_name = persona_details["name"] if persona_details else "Default"
persona_details = persona_details or persona_details_by_name.get(persona_name)
crawl_output_size = crawl_output_sizes_by_crawl.get(crawl_id, 0)
avg_snapshot_size = int(crawl_output_size / completed_snapshots) if completed_snapshots else 0
crawl_obj = active_crawl_objects[crawl_id]
effective_crawl_config = get_config(crawl=crawl_obj, resolve_plugins=False)
max_urls = int(effective_crawl_config.CRAWL_MAX_URLS or 0)
crawl_max_size = int(effective_crawl_config.CRAWL_MAX_SIZE or 0)
crawl_timeout = int(effective_crawl_config.CRAWL_TIMEOUT or 0)
snapshot_max_size = int(effective_crawl_config.SNAPSHOT_MAX_SIZE or 0)
# Check if retry_at is in the future (would prevent worker from claiming)
retry_at_future = crawl["retry_at"] > now if crawl["retry_at"] else False
is_paused = crawl_obj.is_paused
seconds_until_retry = (
0 if is_paused else int((crawl["retry_at"] - now).total_seconds()) if crawl["retry_at"] and retry_at_future else 0
)
crawl_worker_state = (
"running"
if crawl_process_pids.get(crawl_id)
or any(isinstance(snapshot, dict) and snapshot.get("worker_pid") for snapshot in active_snapshots_for_crawl)
else "waiting"
)
if is_paused:
crawl_worker_state = "paused"
elif (
crawl["status"] == Crawl.StatusChoices.STARTED
and crawl_worker_state == "waiting"
and (started_snapshots or pending_snapshots)
):
crawl_worker_state = "waiting" if orchestrator_running else "crashed"
active_crawls.append(
{
"id": crawl_id,
"label": (next((line.strip() for line in (crawl["urls"] or "").splitlines() if line.strip()), "") or crawl_id)[:60],
"status": crawl["status"],
"is_paused": is_paused,
"started": crawl["created_at"].isoformat() if crawl["created_at"] else None,
"progress": crawl_progress,
"created_by": crawl["created_by__username"],
"persona": persona_name,
"persona_admin_url": persona_details["admin_url"] if persona_details else None,
"max_depth": crawl["max_depth"],
"max_urls": max_urls,
"max_crawl_size": crawl_max_size,
"crawl_timeout": crawl_timeout,
"max_snapshot_size": snapshot_max_size,
"max_crawl_size_display": printable_filesize(crawl_max_size) if crawl_max_size else "unlimited",
"crawl_timeout_display": f"{crawl_timeout}s" if crawl_timeout else "unlimited",
"max_snapshot_size_display": printable_filesize(snapshot_max_size) if snapshot_max_size else "unlimited",
"crawl_output_size": crawl_output_size,
"avg_snapshot_size": avg_snapshot_size,
"crawl_output_size_display": printable_filesize(crawl_output_size) if crawl_output_size else "0 B",
"avg_snapshot_size_display": printable_filesize(avg_snapshot_size) if avg_snapshot_size else "0 B",
"tags": crawl_tags,
"urls_count": urls_count,
"total_snapshots": total_snapshots,
"completed_snapshots": completed_snapshots,
"started_snapshots": started_snapshots,
"failed_snapshots": 0,
"pending_snapshots": pending_snapshots,
"cancelled_snapshots": cancelled_snapshots,
"setup_plugins": crawl_setup_plugins,
"setup_total_plugins": crawl_setup_total,
"setup_completed_plugins": crawl_setup_completed,
"setup_failed_plugins": crawl_setup_failed,
"setup_pending_plugins": crawl_setup_pending,
"screencast_url": crawl_screencast_url,
"screencast_link": crawl_screencast_link,
"active_snapshots": active_snapshots_for_crawl,
"queued_snapshots_hidden": queued_snapshot_overflow_by_crawl.get(crawl_id, 0),
"can_start": can_start,
"urls_preview": urls_preview,
"retry_at_future": retry_at_future,
"seconds_until_retry": seconds_until_retry,
"worker_pid": crawl_process_pids.get(crawl_id),
"worker_state": crawl_worker_state,
},
)
payload = {
"is_admin": is_admin,
"scope": {
"snapshot_id": str(scoped_snapshot.id) if scoped_snapshot is not None else "",
"crawl_id": crawl_id_filter,
},
"orchestrator_running": orchestrator_running,
"orchestrator_pid": orchestrator_pid,
"total_workers": total_workers,
"crawls_active": crawls_active,
"crawls_queued": crawls_queued,
"crawls_recent": crawls_recent,
"snapshots_active": snapshots_active,
"snapshots_queued": snapshots_queued,
"archiveresults_active": archiveresults_active,
"archiveresults_queued": archiveresults_queued,
"downloads_active": downloads_active,
"downloads_queued": downloads_queued,
"indexing_active": indexing_active,
"indexing_queued": indexing_queued,
"active_crawls": active_crawls,
"queued_crawls_hidden": queued_crawls_hidden,
"server_time": timezone.now().isoformat(),
}
try:
import ujson
return HttpResponse(ujson.dumps(payload), content_type="application/json")
except ImportError:
return JsonResponse(payload)
except Exception as e:
error_payload = {
"error": str(e),
"orchestrator_running": False,
"total_workers": 0,
"crawls_active": 0,
"crawls_queued": 0,
"crawls_recent": 0,
"snapshots_active": 0,
"snapshots_queued": 0,
"archiveresults_active": 0,
"archiveresults_queued": 0,
"downloads_active": 0,
"downloads_queued": 0,
"indexing_active": 0,
"indexing_queued": 0,
"active_crawls": [],
"server_time": timezone.now().isoformat(),
}
if settings.DEBUG:
import traceback
error_payload["traceback"] = traceback.format_exc()
return JsonResponse(error_payload, status=500)