Files
2026-06-14 11:44:38 -07:00

1154 lines
42 KiB
Python

import asyncio
import sys
from pathlib import Path
import pytest
from asgiref.sync import sync_to_async
pytestmark = pytest.mark.django_db
@pytest.mark.django_db(transaction=True)
def test_cancelled_crawl_projection_emits_abort_event_from_runner_bus():
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.services.runner import CrawlRunner
from abx_dl.events import CrawlAbortEvent, CrawlEvent
crawl = Crawl.objects.create(
urls="https://example.com",
created_by_id=get_or_create_system_user_pk(),
)
snapshot = Snapshot.objects.create(
url="https://example.com",
crawl=crawl,
status=Snapshot.StatusChoices.STARTED,
)
runner = CrawlRunner(crawl)
async def run() -> CrawlAbortEvent | None:
abort_event_holder: dict[str, CrawlAbortEvent | None] = {"event": None}
async def on_CrawlEvent(event: CrawlEvent) -> None:
watcher = asyncio.create_task(runner.watch_for_cancelled_crawl(event, poll_interval=0.01))
await asyncio.sleep(0.02)
await sync_to_async(Crawl.objects.filter(id=crawl.id).update, thread_sensitive=True)(
status=Crawl.StatusChoices.SEALED,
retry_at=None,
)
abort_event = await runner.bus.find(CrawlAbortEvent, child_of=event, past=True, future=1.0)
abort_event_holder["event"] = abort_event if isinstance(abort_event, CrawlAbortEvent) else None
await watcher
runner.bus.on(CrawlEvent, on_CrawlEvent)
await runner.bus.emit(
CrawlEvent(
url=snapshot.url,
snapshot_id=str(snapshot.id),
output_dir=str(crawl.output_dir),
),
).now()
await runner.bus.wait_until_idle()
return abort_event_holder["event"]
abort_event = asyncio.run(run())
assert abort_event is not None
@pytest.mark.django_db(transaction=True)
@pytest.mark.django_db(transaction=True)
def test_snapshot_payload_uses_crawl_chrome_dirs_by_default():
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.personas.models import Persona
from archivebox.services.runner import CrawlRunner
persona = Persona(name="RuntimePersona")
persona.save()
crawl = Crawl(
urls="https://example.com",
persona_id=persona.id,
created_by_id=get_or_create_system_user_pk(),
)
crawl.save()
snapshot = Snapshot(url="https://example.com", crawl=crawl)
snapshot.save()
other_snapshot = Snapshot(url="https://example.org", crawl=crawl)
other_snapshot.save()
runner = CrawlRunner(crawl)
runner.load_run_state()
crawl_downloads_sentinel = persona.runtime_downloads_dir_for_crawl(crawl) / "keep.txt"
crawl_downloads_sentinel.write_text("keep")
payload = runner.load_snapshot_payload(str(snapshot.id))
other_payload = runner.load_snapshot_payload(str(other_snapshot.id))
config = payload["config"]
other_config = other_payload["config"]
personas_dir = Path(config["PERSONAS_DIR"])
other_personas_dir = Path(other_config["PERSONAS_DIR"])
assert personas_dir.is_relative_to(crawl.output_dir)
assert personas_dir == persona.runtime_root_for_crawl(crawl).parent
assert personas_dir == other_personas_dir
assert personas_dir / config["ACTIVE_PERSONA"] / "chrome_profile" == persona.runtime_profile_dir_for_crawl(crawl)
assert personas_dir / config["ACTIVE_PERSONA"] / "chrome_downloads" == persona.runtime_downloads_dir_for_crawl(crawl)
assert crawl_downloads_sentinel.read_text() == "keep"
assert config["ACTIVE_PERSONA"] == "RuntimePersona"
assert Path(config["CRAWL_DIR"]) == crawl.output_dir
assert Path(config["SNAP_DIR"]) == snapshot.output_dir
@pytest.mark.django_db(transaction=True)
def test_snapshot_payload_uses_snapshot_chrome_dirs_when_snapshot_isolated():
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.personas.models import Persona
from archivebox.services.runner import CrawlRunner
persona = Persona(name="SnapshotRuntimePersona")
persona.save()
crawl = Crawl(
urls="https://example.com\nhttps://example.org",
persona_id=persona.id,
created_by_id=get_or_create_system_user_pk(),
config={"CHROME_ISOLATION": "snapshot"},
)
crawl.save()
snapshot = Snapshot(url="https://example.com", crawl=crawl)
snapshot.save()
other_snapshot = Snapshot(url="https://example.org", crawl=crawl)
other_snapshot.save()
runner = CrawlRunner(crawl)
runner.load_run_state()
payload = runner.load_snapshot_payload(str(snapshot.id))
other_payload = runner.load_snapshot_payload(str(other_snapshot.id))
config = payload["config"]
other_config = other_payload["config"]
personas_dir = Path(config["PERSONAS_DIR"])
other_personas_dir = Path(other_config["PERSONAS_DIR"])
assert personas_dir.is_relative_to(snapshot.output_dir)
assert other_personas_dir.is_relative_to(other_snapshot.output_dir)
assert personas_dir == persona.runtime_root_for_snapshot(snapshot).parent
assert other_personas_dir == persona.runtime_root_for_snapshot(other_snapshot).parent
assert personas_dir != other_personas_dir
assert personas_dir / config["ACTIVE_PERSONA"] / "chrome_profile" == persona.runtime_profile_dir_for_snapshot(snapshot)
assert personas_dir / config["ACTIVE_PERSONA"] / "chrome_downloads" == persona.runtime_downloads_dir_for_snapshot(snapshot)
assert Path(config["CRAWL_DIR"]) == crawl.output_dir
assert Path(config["SNAP_DIR"]) == snapshot.output_dir
def test_ensure_background_runner_skips_under_pytest_guard():
from archivebox.services.runner import ensure_background_runner
assert ensure_background_runner() is False
@pytest.mark.django_db(transaction=True)
def test_ensure_background_runner_skips_with_real_running_orchestrator_record():
import os
from datetime import datetime
import psutil
from archivebox.machine.models import Machine, Process
from archivebox.services.runner import ensure_background_runner
from django.utils import timezone
os_proc = psutil.Process(os.getpid())
process = Process.objects.create(
machine=Machine.current(),
process_type=Process.TypeChoices.ORCHESTRATOR,
status=Process.StatusChoices.RUNNING,
pid=os.getpid(),
started_at=datetime.fromtimestamp(os_proc.create_time(), tz=timezone.get_current_timezone()),
)
assert ensure_background_runner(allow_under_pytest=True) is False
process.refresh_from_db()
assert process.status == Process.StatusChoices.RUNNING
@pytest.mark.django_db(transaction=True)
def test_ensure_background_runner_does_not_spawn_runner_without_supervisord():
from archivebox.services.runner import ensure_background_runner
from archivebox.workers.supervisord_util import get_existing_supervisord_process, stop_existing_supervisord_process
stop_existing_supervisord_process()
assert get_existing_supervisord_process(quiet=True) is None
assert ensure_background_runner(allow_under_pytest=True) is False
assert get_existing_supervisord_process(quiet=True) is None
def test_runner_task_context_clears_inherited_abxbus_handler_context(tmp_path):
from abx_dl.events import CrawlEvent, MachineEvent
from abx_dl.orchestrator import create_bus
from abxbus.event_bus import in_handler_context
from archivebox.services import runner as runner_module
bus = create_bus(name="test_runner_task_context_clears_inherited_abxbus_handler_context")
observations = []
async def emit_from_runner_task():
observations.append(("in_handler_context", in_handler_context()))
machine_event = bus.emit(MachineEvent(config={"TIMEOUT": "30"}, config_type="user"))
await machine_event.now()
observations.append(("machine_event_path", bool(machine_event.event_path)))
async def on_crawl(event):
assert in_handler_context() is True
task = asyncio.create_task(emit_from_runner_task(), context=runner_module._runner_task_context())
await task
bus.on(CrawlEvent, on_crawl)
async def run_test():
try:
await bus.emit(
CrawlEvent(
url="https://example.com",
snapshot_id="snapshot-1",
output_dir=str(tmp_path),
),
).now()
await bus.wait_until_idle()
finally:
await bus.destroy()
asyncio.run(run_test())
assert observations == [
("in_handler_context", False),
("machine_event_path", True),
]
@pytest.mark.django_db(transaction=True)
def test_snapshot_started_state_keeps_retry_at_lease():
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from django.utils import timezone
before = timezone.now()
crawl = Crawl.objects.create(
urls="https://example.com",
created_by_id=get_or_create_system_user_pk(),
status=Crawl.StatusChoices.STARTED,
retry_at=before,
)
snapshot = Snapshot.objects.create(
url="https://example.com",
crawl=crawl,
status=Snapshot.StatusChoices.QUEUED,
retry_at=before,
)
assert snapshot.tick_claimed(lock_seconds=60) is True
snapshot.refresh_from_db()
assert snapshot.status == Snapshot.StatusChoices.STARTED
assert snapshot.retry_at is not None
assert snapshot.retry_at > before
@pytest.mark.django_db(transaction=True)
def test_crawl_start_event_keeps_retry_at_lease():
from abx_dl.events import CrawlStartEvent
from abx_dl.orchestrator import create_bus
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.services.crawl_service import CrawlService
from django.utils import timezone
before = timezone.now()
crawl = Crawl.objects.create(
urls="https://example.com",
created_by_id=get_or_create_system_user_pk(),
status=Crawl.StatusChoices.QUEUED,
retry_at=before,
)
snapshot = Snapshot.objects.create(
url="https://example.com",
crawl=crawl,
status=Snapshot.StatusChoices.QUEUED,
retry_at=before,
)
bus = create_bus(name="test_crawl_start_event_keeps_retry_at_lease")
CrawlService(bus, crawl_id=str(crawl.id))
async def run_event():
try:
await bus.emit(
CrawlStartEvent(
url=snapshot.url,
snapshot_id=str(snapshot.id),
output_dir=str(crawl.output_dir),
),
).now()
await bus.wait_until_idle()
finally:
await bus.destroy(clear=False)
asyncio.run(run_event())
crawl.refresh_from_db()
assert crawl.status == Crawl.StatusChoices.STARTED
assert crawl.retry_at is not None
assert crawl.retry_at > before
@pytest.mark.django_db(transaction=True)
def test_crawl_start_event_does_not_reschedule_sealed_parent_until_explicit_requeue():
from abx_dl.events import CrawlStartEvent
from abx_dl.orchestrator import create_bus
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.services.crawl_service import CrawlService
from django.utils import timezone
before = timezone.now()
crawl = Crawl.objects.create(
urls="https://example.com",
created_by_id=get_or_create_system_user_pk(),
status=Crawl.StatusChoices.SEALED,
retry_at=before,
)
snapshot = Snapshot.objects.create(
url="https://example.com",
crawl=crawl,
status=Snapshot.StatusChoices.SEALED,
retry_at=before,
)
crawl_output_dir = str(crawl.output_dir)
async def emit_start(name: str) -> None:
bus = create_bus(name=name)
try:
CrawlService(bus, crawl_id=str(crawl.id))
emitted = bus.emit(
CrawlStartEvent(
url=snapshot.url,
snapshot_id=str(snapshot.id),
output_dir=crawl_output_dir,
),
)
await emitted.now()
await emitted.event_results_list()
await bus.wait_until_idle()
finally:
await bus.destroy(clear=False)
asyncio.run(emit_start("test_crawl_start_event_sealed_parent_noop"))
crawl.refresh_from_db()
snapshot.refresh_from_db()
# CrawlStartEvent on a sealed parent is a no-op — neither the parent nor
# its sealed child gets resurrected by the handler.
assert crawl.status == Crawl.StatusChoices.SEALED
assert crawl.retry_at == before
assert snapshot.status == Snapshot.StatusChoices.SEALED
assert snapshot.retry_at == before
# The orchestrator's seal-cleanup pass picks the sealed row up via
# retry_at, runs cleanup, and clears retry_at — that's how the
# ``retry_at != None`` invariant the handler intentionally preserves
# eventually drains to ``None``.
from archivebox.services.runner import run_due_crawl
assert run_due_crawl(crawl, lock_seconds=10) is True
crawl.refresh_from_db()
assert crawl.status == Crawl.StatusChoices.SEALED
assert crawl.retry_at is None
crawl.update_and_requeue(status=Crawl.StatusChoices.QUEUED, retry_at=timezone.now())
crawl.refresh_from_db()
assert crawl.status == Crawl.StatusChoices.QUEUED
asyncio.run(emit_start("test_crawl_start_event_after_explicit_requeue"))
crawl.refresh_from_db()
assert crawl.status == Crawl.StatusChoices.STARTED
assert crawl.retry_at is not None
assert crawl.retry_at > before
@pytest.mark.django_db(transaction=True)
def test_snapshot_queue_selection_is_retry_at_only_for_sealed_maintenance():
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from django.utils import timezone
now = timezone.now()
crawl = Crawl.objects.create(
urls="https://example.com",
created_by_id=get_or_create_system_user_pk(),
status=Crawl.StatusChoices.SEALED,
retry_at=None,
)
snapshot = Snapshot.objects.create(
url="https://example.com",
crawl=crawl,
status=Snapshot.StatusChoices.SEALED,
retry_at=now,
)
assert Snapshot.get_queue().filter(id=snapshot.id).exists()
@pytest.mark.django_db(transaction=True)
def test_machine_service_persists_only_derived_config_events(tmp_path, hermetic_lib_dir):
from abx_dl.events import MachineEvent
from abx_dl.orchestrator import create_bus
from archivebox.machine.models import Machine
from archivebox.services.machine_service import MachineService
machine = Machine.current()
machine.config = {}
machine.save(update_fields=["config"])
wget_binary = hermetic_lib_dir / "bin" / "wget"
wget_binary.write_text("#!/bin/sh\n")
wget_binary.chmod(0o755)
async def run_test():
bus = create_bus(name="test_machine_service_persists_only_derived_config_events")
try:
MachineService(bus)
user_event = bus.emit(
MachineEvent(
config={
"CHROME_ISOLATION": "snapshot",
"CHROME_USER_DATA_DIR": "/tmp/stale-profile",
},
config_type="user",
),
)
await user_event.now()
await user_event.event_results_list()
derived_event = bus.emit(
MachineEvent(
config={
"WGET_BINARY": str(wget_binary),
"ABX_INSTALL_CACHE": {"wget": "2026-03-24T00:00:00+00:00"},
"ABX_UV_CACHE": "/tmp/uv-cache",
"ABX_PNPM_CACHE": "/tmp/pnpm-cache",
"CHROME_USER_DATA_DIR": "/tmp/stale-derived-profile",
},
config_type="derived",
),
)
await derived_event.now()
await derived_event.event_results_list()
unset_event = bus.emit(
MachineEvent(
method="unset",
key="config/WGET_BINARY",
config_type="derived",
),
)
await unset_event.now()
await unset_event.event_results_list()
await bus.wait_until_idle()
finally:
await bus.destroy()
asyncio.run(run_test())
machine.refresh_from_db()
# User events are dropped (handler ignores non-derived). At the event
# projector ``machine_service.py`` strips anything that isn't a binary
# path / install cache — that's the security boundary that stops plugins
# from rewriting arbitrary user config via events. So CHROME_USER_DATA_DIR
# from the derived payload is dropped; WGET_BINARY made it in (inside
# ABXPKG_LIB_DIR) then the unset removed it; ABX_INSTALL_CACHE survives.
assert machine.config == {
"ABX_INSTALL_CACHE": {"wget": "2026-03-24T00:00:00+00:00"},
"ABX_PNPM_CACHE": "/tmp/pnpm-cache",
"ABX_UV_CACHE": "/tmp/uv-cache",
}
@pytest.mark.django_db(transaction=True)
def test_load_run_state_uses_real_lib_dir_for_machine_binary_config(tmp_path, hermetic_lib_dir):
import archivebox.machine.models as machine_models
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.config.common import get_config
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.machine.models import Machine
from archivebox.services.runner import CrawlRunner
resolved_lib_dir = get_config(include_machine=False).ABXPKG_LIB_DIR
assert resolved_lib_dir == hermetic_lib_dir, f"ABXPKG_LIB_DIR override not applied: {resolved_lib_dir!r} != {hermetic_lib_dir!r}"
wget_binary = resolved_lib_dir / "bin" / "wget"
wget_binary.write_text("#!/bin/sh\n", encoding="utf-8")
wget_binary.chmod(0o755)
external_binary = tmp_path / "external" / "yt-dlp"
external_binary.parent.mkdir(parents=True)
external_binary.write_text("#!/bin/sh\n", encoding="utf-8")
external_binary.chmod(0o755)
machine = Machine.current()
machine.config = {
"WGET_BINARY": str(wget_binary),
"YTDLP_BINARY": str(external_binary),
"ABX_INSTALL_CACHE": {"wget": "2026-03-24T00:00:00+00:00"},
"CHROME_ISOLATION": "snapshot",
"CHROME_USER_DATA_DIR": "/tmp/stale-profile",
}
machine.save(update_fields=["config"])
machine_models._CURRENT_MACHINE = machine
crawl = Crawl.objects.create(
urls="https://example.com",
config={
"PLUGINS": "__archivebox_test_no_plugins__",
"CHROME_BINARY": "",
},
created_by_id=get_or_create_system_user_pk(),
)
runner = CrawlRunner(crawl)
snapshot_ids = runner.load_run_state()
# ``derived_config`` is Machine.config sanitized against ABXPKG_LIB_DIR. Binary
# paths outside ABXPKG_LIB_DIR drop out (YTDLP_BINARY → ``/tmp/...``); the
# ArchiveBox.conf mirror values (CHROME_ISOLATION, CHROME_USER_DATA_DIR,
# ABX_INSTALL_CACHE) survive so plugin hooks see the same runtime cache
# the user/runner persisted.
assert runner.derived_config == {
"WGET_BINARY": str(wget_binary),
"ABX_INSTALL_CACHE": {"wget": "2026-03-24T00:00:00+00:00"},
"CHROME_ISOLATION": "snapshot",
"CHROME_USER_DATA_DIR": "/tmp/stale-profile",
}
assert runner.base_config["ABXPKG_LIB_DIR"] == resolved_lib_dir
assert runner.base_config["CHROME_KEEPALIVE"] is False
assert runner.selected_plugins == ["__archivebox_test_no_plugins__"]
assert Snapshot.objects.filter(id__in=snapshot_ids, crawl=crawl, url="https://example.com").count() == 1
@pytest.mark.django_db(transaction=True)
def test_crawl_runner_empty_plugin_selection_emits_lifecycle_and_seals_crawl(tmp_path):
from abx_dl.events import CrawlCleanupEvent, CrawlCompletedEvent, CrawlEvent, CrawlSetupEvent, CrawlStartEvent, MachineEvent
from abx_dl.events import SnapshotCompletedEvent, SnapshotEvent
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.services.runner import CrawlRunner
crawl = Crawl.objects.create(
urls="https://example.com",
config={
"ABXPKG_LIB_DIR": str(tmp_path / "lib"),
"PLUGINS": "__archivebox_test_no_plugins__",
"CHROME_BINARY": "",
},
created_by_id=get_or_create_system_user_pk(),
)
runner = CrawlRunner(crawl)
seen_events = {
CrawlEvent: [],
CrawlSetupEvent: [],
CrawlStartEvent: [],
SnapshotEvent: [],
SnapshotCompletedEvent: [],
CrawlCleanupEvent: [],
CrawlCompletedEvent: [],
MachineEvent: [],
}
for event_type, events in seen_events.items():
runner.bus.on(event_type, lambda event, events=events: events.append(event))
asyncio.run(runner.run())
crawl_events = seen_events[CrawlEvent]
setup_events = seen_events[CrawlSetupEvent]
start_events = seen_events[CrawlStartEvent]
snapshot_events = seen_events[SnapshotEvent]
snapshot_completed_events = seen_events[SnapshotCompletedEvent]
cleanup_events = seen_events[CrawlCleanupEvent]
completed_events = seen_events[CrawlCompletedEvent]
machine_events = seen_events[MachineEvent]
assert len(crawl_events) == 1
assert len(setup_events) == 1
assert len(start_events) == 1
assert len(snapshot_events) == 1
assert len(snapshot_completed_events) == 1
assert len(cleanup_events) == 1
assert len(completed_events) == 1
assert runner.bus.event_is_child_of(setup_events[0], crawl_events[0])
assert runner.bus.event_is_child_of(start_events[0], crawl_events[0])
assert runner.bus.event_is_child_of(cleanup_events[0], crawl_events[0])
assert runner.bus.event_is_child_of(completed_events[0], crawl_events[0])
assert runner.bus.event_is_child_of(snapshot_events[0], start_events[0])
assert runner.bus.event_is_child_of(snapshot_completed_events[0], snapshot_events[0])
assert any(event.config_type == "user" for event in machine_events)
crawl.refresh_from_db()
snapshot = Snapshot.objects.get(crawl=crawl)
assert crawl.status == Crawl.StatusChoices.SEALED
assert crawl.retry_at is None
assert snapshot.status == Snapshot.StatusChoices.SEALED
assert snapshot.retry_at is None
assert snapshot.archiveresult_set.count() == 0
@pytest.mark.django_db(transaction=True)
def test_crawl_runner_resolves_persona_and_crawl_config_for_each_live_snapshot():
from abx_dl.events import SnapshotCompletedEvent
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import ArchiveResult, Snapshot
from archivebox.machine.models import Process
from archivebox.personas.models import Persona
from archivebox.services.runner import CrawlRunner
persona = Persona.objects.create(
name="RuntimeConfig",
config={
"FAVICON_PROVIDER": "https://example.com/persona-first.ico",
"FAVICON_TIMEOUT": 10,
},
)
persona.ensure_dirs()
crawl = Crawl.objects.create(
urls="\n".join(
[
"https://www.python.org/",
"https://www.djangoproject.com/",
"https://www.wikipedia.org/",
],
),
config={
"PLUGINS": "favicon",
"CRAWL_MAX_CONCURRENT_SNAPSHOTS": 1,
},
persona_id=persona.id,
created_by_id=get_or_create_system_user_pk(),
)
runner = CrawlRunner(crawl)
completed_snapshot_ids: list[str] = []
async def update_config_between_snapshots(event: SnapshotCompletedEvent) -> None:
if event.snapshot_id in completed_snapshot_ids:
return
completed_snapshot_ids.append(event.snapshot_id)
if len(completed_snapshot_ids) == 1:
persona.config = {
**(persona.config or {}),
"FAVICON_PROVIDER": "https://example.com/persona-second.ico",
}
await persona.asave(update_fields=["config"])
elif len(completed_snapshot_ids) == 2:
fresh_crawl = await Crawl.objects.aget(id=crawl.id)
fresh_crawl.config = {
**(fresh_crawl.config or {}),
"FAVICON_PROVIDER": "https://example.com/crawl-third.ico",
}
await fresh_crawl.asave(update_fields=["config"])
runner.bus.on(SnapshotCompletedEvent, update_config_between_snapshots)
asyncio.run(runner.run())
favicon_processes = [
process
for process in Process.objects.filter(process_type=Process.TypeChoices.HOOK).order_by("started_at")
if process.cmd and "on_Snapshot__11_favicon.finite.bg.py" in str(process.cmd[0])
]
providers = [process.env.get("FAVICON_PROVIDER") for process in favicon_processes]
crawl.refresh_from_db()
assert crawl.status == Crawl.StatusChoices.SEALED
assert Snapshot.objects.filter(crawl=crawl, status=Snapshot.StatusChoices.SEALED).count() == 3
assert (
ArchiveResult.objects.filter(snapshot__crawl=crawl, plugin="favicon").exclude(status=ArchiveResult.StatusChoices.FAILED).count()
== 3
)
assert providers == [
"https://example.com/persona-first.ico",
"https://example.com/persona-second.ico",
"https://example.com/crawl-third.ico",
]
@pytest.mark.django_db(transaction=True)
@pytest.mark.django_db(transaction=True)
def test_run_pending_crawls_processes_queued_crawl_before_missing_binary_backlog(tmp_path):
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.machine.models import Binary, Machine
from archivebox.services.runner import run_pending_crawls
from django.utils import timezone
crawl = Crawl.objects.create(
urls="https://example.com",
config={
"ABXPKG_LIB_DIR": str(tmp_path / "lib"),
"PLUGINS": "__archivebox_test_no_plugins__",
"CHROME_BINARY": "",
},
status=Crawl.StatusChoices.QUEUED,
retry_at=timezone.now(),
created_by_id=get_or_create_system_user_pk(),
)
binary = Binary.objects.create(
machine=Machine.current(),
name=str(tmp_path / "missing-node"),
status=Binary.StatusChoices.QUEUED,
retry_at=timezone.now(),
binproviders="env,apt",
overrides={"apt": {"install_args": ["nodejs"]}},
)
result = run_pending_crawls(daemon=False)
crawl.refresh_from_db()
binary.refresh_from_db()
assert result == 0
assert crawl.status == Crawl.StatusChoices.SEALED
assert crawl.retry_at is None
assert Snapshot.objects.filter(crawl=crawl, status=Snapshot.StatusChoices.SEALED).count() == 1
assert binary.status == Binary.StatusChoices.QUEUED
assert binary.retry_at is None
@pytest.mark.django_db(transaction=True)
def test_sealed_crawl_does_not_create_discovered_snapshots():
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
crawl = Crawl.objects.create(
urls="https://example.com",
created_by_id=get_or_create_system_user_pk(),
status=Crawl.StatusChoices.SEALED,
retry_at=None,
max_depth=3,
)
root = Snapshot.objects.create(
url="https://example.com",
crawl=crawl,
status=Snapshot.StatusChoices.SEALED,
retry_at=None,
)
assert crawl.create_snapshots_from_urls() == []
assert crawl.create_discovered_snapshot(root, url="https://example.com/child", depth=1) is None
assert crawl.snapshot_set.count() == 1
# test_create_crawl_api_queues_crawl_without_spawning_runner moved to test_api_v1_crawls_crawls.py.
def test_wait_for_snapshot_tasks_surfaces_already_failed_task():
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.services import runner as runner_module
crawl = Crawl.objects.create(
urls="https://example.com",
created_by_id=get_or_create_system_user_pk(),
)
crawl_runner = runner_module.CrawlRunner(crawl)
async def run_test():
task = asyncio.get_running_loop().create_future()
task.set_exception(RuntimeError("snapshot failed"))
crawl_runner.snapshot_tasks["snap-1"] = task
with pytest.raises(RuntimeError, match="snapshot failed"):
await crawl_runner.wait_for_snapshot_tasks()
asyncio.run(run_test())
def test_wait_for_snapshot_tasks_returns_after_completed_tasks_are_pruned():
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.services import runner as runner_module
crawl = Crawl.objects.create(
urls="https://example.com",
created_by_id=get_or_create_system_user_pk(),
)
crawl_runner = runner_module.CrawlRunner(crawl)
async def finish_snapshot() -> None:
await asyncio.sleep(0)
async def run_test():
task = asyncio.create_task(finish_snapshot())
crawl_runner.snapshot_tasks["snap-1"] = task
await asyncio.wait_for(crawl_runner.wait_for_snapshot_tasks(), timeout=0.5)
assert crawl_runner.snapshot_tasks == {}
asyncio.run(run_test())
def test_abx_process_service_background_process_finishes_after_process_exit(tmp_path):
from abx_dl.events import ProcessCompletedEvent, ProcessEvent
from abx_dl.orchestrator import create_bus
from abx_dl.services.process_service import ProcessService
bus = create_bus(name="test_abx_process_service_background_process_finishes_after_process_exit")
ProcessService(bus, emit_jsonl=False, interactive_tty=False)
emitted_events = []
async def collect_completed(event):
emitted_events.append(event)
bus.on(ProcessCompletedEvent, collect_completed)
plugin_output_dir = tmp_path / "chrome"
plugin_output_dir.mkdir()
async def run_test():
try:
event = ProcessEvent(
plugin_name="chrome",
hook_name="on_CrawlSetup__90_chrome_launch.daemon.bg",
hook_path=sys.executable,
hook_args=["-c", "print('daemon output')"],
env={},
output_dir=str(plugin_output_dir),
timeout=60,
is_background=True,
url="https://example.org/",
process_type="hook",
worker_type="hook",
)
await asyncio.wait_for(bus.emit(event).now(), timeout=0.5)
completed = await bus.find(ProcessCompletedEvent, past=True, future=5.0)
assert isinstance(completed, ProcessCompletedEvent)
await completed.event_results_list()
assert completed.status == "succeeded"
assert completed.stdout.strip() == "daemon output"
assert completed.output_dir == str(plugin_output_dir)
assert bus.event_is_child_of(completed, event)
finally:
await bus.destroy()
asyncio.run(run_test())
assert not list(plugin_output_dir.glob("on_CrawlSetup__90_chrome_launch.daemon.bg.*.pid"))
assert any(isinstance(event, ProcessCompletedEvent) for event in emitted_events)
def test_run_pending_crawls_disables_missing_absolute_binary_backlog(tmp_path):
from archivebox.machine.models import Binary, Machine
from archivebox.services import runner as runner_module
missing_binary = tmp_path / "missing-node"
binary = Binary.objects.create(
machine=Machine.current(),
name=str(missing_binary),
status=Binary.StatusChoices.QUEUED,
retry_at=runner_module.timezone.now(),
binproviders="env,apt",
overrides={"apt": {"install_args": ["nodejs"]}},
)
result = runner_module.run_pending_crawls(daemon=False)
binary.refresh_from_db()
assert result == 0
assert binary.status == Binary.StatusChoices.QUEUED
assert binary.retry_at is None
@pytest.mark.django_db(transaction=True)
def test_crawl_completed_event_requeues_active_snapshots():
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.services.crawl_service import CrawlService
from abx_dl.events import CrawlCompletedEvent
from abx_dl.orchestrator import create_bus
crawl = Crawl.objects.create(
urls="https://example.com",
created_by_id=get_or_create_system_user_pk(),
status=Crawl.StatusChoices.STARTED,
retry_at=None,
)
Snapshot.objects.create(
url="https://example.com",
crawl=crawl,
status=Snapshot.StatusChoices.STARTED,
retry_at=None,
)
bus = create_bus(name=f"test_crawl_completed_active_snapshots_{str(crawl.id).replace('-', '_')}")
service = CrawlService(bus, crawl_id=str(crawl.id))
assert service is not None
async def emit_completed() -> None:
try:
event = CrawlCompletedEvent(
url="https://example.com",
snapshot_id="",
output_dir=str(crawl.output_dir),
)
emitted = bus.emit(event)
await emitted.wait()
await emitted.event_results_list()
await bus.wait_until_idle()
finally:
await bus.destroy()
asyncio.run(emit_completed())
crawl.refresh_from_db()
assert crawl.status == Crawl.StatusChoices.STARTED
assert crawl.retry_at is not None
@pytest.mark.django_db(transaction=True)
def test_crawl_start_event_does_not_resurrect_cancelled_crawl():
from django.utils import timezone
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.services.crawl_service import CrawlService
from abx_dl.events import CrawlStartEvent
from abx_dl.orchestrator import create_bus
now = timezone.now()
crawl = Crawl.objects.create(
urls="https://example.com",
created_by_id=get_or_create_system_user_pk(),
status=Crawl.StatusChoices.SEALED,
retry_at=now,
)
bus = create_bus(name=f"test_crawl_start_cancelled_{str(crawl.id).replace('-', '_')}")
service = CrawlService(bus, crawl_id=str(crawl.id))
assert service is not None
async def emit_start() -> None:
try:
event = CrawlStartEvent(
url="https://example.com",
snapshot_id="",
output_dir=str(crawl.output_dir),
)
emitted = bus.emit(event)
await emitted.wait()
await emitted.event_results_list()
await bus.wait_until_idle()
finally:
await bus.destroy()
asyncio.run(emit_start())
crawl.refresh_from_db()
assert crawl.status == Crawl.StatusChoices.SEALED
assert crawl.retry_at == now
@pytest.mark.django_db(transaction=True)
def test_crawl_cleanup_event_requeues_unfinished_crawl():
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.services.crawl_service import CrawlService
from abx_dl.events import CrawlCleanupEvent
from abx_dl.orchestrator import create_bus
crawl = Crawl.objects.create(
urls="https://example.com",
created_by_id=get_or_create_system_user_pk(),
status=Crawl.StatusChoices.STARTED,
retry_at=None,
)
snapshot = Snapshot.objects.create(
url="https://example.com",
crawl=crawl,
status=Snapshot.StatusChoices.QUEUED,
retry_at=None,
)
bus = create_bus(name=f"test_crawl_cleanup_requeues_unfinished_{str(crawl.id).replace('-', '_')}")
service = CrawlService(bus, crawl_id=str(crawl.id))
assert service is not None
async def emit_cleanup() -> None:
try:
event = CrawlCleanupEvent(
url="https://example.com",
snapshot_id=str(snapshot.id),
output_dir=str(crawl.output_dir),
)
emitted = bus.emit(event)
await emitted.wait()
await emitted.event_results_list()
await bus.wait_until_idle()
finally:
await bus.destroy()
asyncio.run(emit_cleanup())
crawl.refresh_from_db()
assert crawl.status == Crawl.StatusChoices.STARTED
assert crawl.retry_at is not None
@pytest.mark.django_db(transaction=True)
def test_crawl_completed_event_seals_finished_crawl():
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.services.crawl_service import CrawlService
from abx_dl.events import CrawlCompletedEvent
from abx_dl.orchestrator import create_bus
from django.utils import timezone
crawl = Crawl.objects.create(
urls="https://example.com",
created_by_id=get_or_create_system_user_pk(),
status=Crawl.StatusChoices.STARTED,
retry_at=timezone.now(),
)
snapshot = Snapshot.objects.create(
url="https://example.com",
crawl=crawl,
status=Snapshot.StatusChoices.SEALED,
retry_at=None,
)
bus = create_bus(name=f"test_crawl_completed_finished_crawl_{str(crawl.id).replace('-', '_')}")
service = CrawlService(bus, crawl_id=str(crawl.id))
assert service is not None
async def emit_cleanup() -> None:
try:
event = CrawlCompletedEvent(
url="https://example.com",
snapshot_id=str(snapshot.id),
output_dir=str(crawl.output_dir),
)
emitted = bus.emit(event)
await emitted.wait()
await emitted.event_results_list()
await bus.wait_until_idle()
finally:
await bus.destroy()
asyncio.run(emit_cleanup())
crawl.refresh_from_db()
assert crawl.status == Crawl.StatusChoices.SEALED
assert crawl.retry_at is None
@pytest.mark.django_db(transaction=True)
def test_snapshot_completed_event_defers_finished_crawl_seal():
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.services.snapshot_service import SnapshotService
from abx_dl.events import SnapshotCompletedEvent
from abx_dl.orchestrator import create_bus
from django.utils import timezone
crawl = Crawl.objects.create(
urls="https://example.com",
created_by_id=get_or_create_system_user_pk(),
status=Crawl.StatusChoices.STARTED,
retry_at=timezone.now(),
)
snapshot = Snapshot.objects.create(
url="https://example.com",
crawl=crawl,
status=Snapshot.StatusChoices.STARTED,
retry_at=None,
)
bus = create_bus(name=f"test_snapshot_completed_finished_crawl_{str(crawl.id).replace('-', '_')}")
service = SnapshotService(bus, crawl_id=str(crawl.id), schedule_snapshot=lambda snapshot_id: asyncio.sleep(0))
try:
async def emit_completed() -> None:
await service.on_SnapshotCompletedEvent(
SnapshotCompletedEvent(
url="https://example.com",
snapshot_id=str(snapshot.id),
output_dir=str(snapshot.output_dir),
),
)
asyncio.run(emit_completed())
finally:
asyncio.run(bus.destroy())
snapshot.refresh_from_db()
crawl.refresh_from_db()
assert snapshot.status == Snapshot.StatusChoices.SEALED
assert crawl.status == Crawl.StatusChoices.STARTED
assert crawl.retry_at is not None
@pytest.mark.django_db(transaction=True)
def test_snapshot_completed_event_bus_defers_finished_crawl_seal():
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.services.snapshot_service import SnapshotService
from abx_dl.events import SnapshotCompletedEvent
from abx_dl.orchestrator import create_bus
from django.utils import timezone
crawl = Crawl.objects.create(
urls="https://example.com",
created_by_id=get_or_create_system_user_pk(),
status=Crawl.StatusChoices.STARTED,
retry_at=timezone.now(),
)
snapshot = Snapshot.objects.create(
url="https://example.com",
crawl=crawl,
status=Snapshot.StatusChoices.STARTED,
retry_at=None,
)
bus = create_bus(name=f"test_snapshot_completed_bus_finished_crawl_{str(crawl.id).replace('-', '_')}")
service = SnapshotService(bus, crawl_id=str(crawl.id), schedule_snapshot=lambda snapshot_id: asyncio.sleep(0))
assert service is not None
try:
async def emit_completed() -> None:
emitted = bus.emit(
SnapshotCompletedEvent(
url="https://example.com",
snapshot_id=str(snapshot.id),
output_dir=str(snapshot.output_dir),
),
)
await emitted.wait()
await emitted.event_results_list()
asyncio.run(emit_completed())
finally:
asyncio.run(bus.wait_until_idle())
asyncio.run(bus.destroy())
snapshot.refresh_from_db()
crawl.refresh_from_db()
assert snapshot.status == Snapshot.StatusChoices.SEALED
assert crawl.status == Crawl.StatusChoices.STARTED
assert crawl.retry_at is not None