mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-06-22 11:31:05 -04:00
1147 lines
45 KiB
Python
1147 lines
45 KiB
Python
#!/usr/bin/env python3
|
|
"""Takeover utility tests and live command handoff flows."""
|
|
|
|
import os
|
|
import signal
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from archivebox.core.models import ArchiveResult, Snapshot
|
|
from archivebox.crawls.models import Crawl
|
|
from archivebox.machine.models import Process
|
|
from archivebox.tests.conftest import (
|
|
assert_no_processes_for_data_dir,
|
|
get_free_port,
|
|
kill_processes_for_data_dir,
|
|
cli_env,
|
|
pid_is_alive,
|
|
run_archivebox_cmd,
|
|
start_archivebox_server,
|
|
stop_archivebox_process,
|
|
supervisor_pid_from_log,
|
|
wait_for_http,
|
|
wait_for_log,
|
|
wait_for_log_count,
|
|
wait_for_log_pattern,
|
|
wait_for_pid_to_disappear,
|
|
wait_for_snapshot_capture,
|
|
wait_for_worker_pid_from_log,
|
|
worker_pid_from_log,
|
|
)
|
|
from archivebox.tests.test_orm_helpers import use_archivebox_db
|
|
|
|
pytestmark = pytest.mark.django_db(transaction=True)
|
|
|
|
|
|
def _require_sonic_binary() -> None:
|
|
assert shutil.which("sonic") is not None, "sonic server binary is required for Sonic worker takeover tests"
|
|
|
|
|
|
def _archive_pages_for_sqlite_reindexing(data_dir: Path, env: dict[str, str], root_url: str) -> None:
|
|
add_env = dict(env)
|
|
add_env["SEARCH_BACKEND_ENGINE"] = "ripgrep"
|
|
_cmd_result = run_archivebox_cmd(
|
|
[
|
|
"add",
|
|
"--depth=2",
|
|
"--max-urls=20",
|
|
"--crawl-max-size=50mb",
|
|
"--plugins=wget,parse_html_urls",
|
|
root_url,
|
|
],
|
|
cwd=data_dir,
|
|
env=add_env,
|
|
timeout=240,
|
|
)
|
|
stdout, stderr, returncode = _cmd_result.stdout, _cmd_result.stderr, _cmd_result.returncode
|
|
assert returncode == 0, stderr or stdout
|
|
|
|
with use_archivebox_db(data_dir):
|
|
assert Snapshot.objects.filter(status=Snapshot.StatusChoices.SEALED).count() >= 1
|
|
assert not ArchiveResult.objects.filter(plugin="search_backend_sqlite").exists()
|
|
|
|
|
|
@pytest.mark.timeout(360)
|
|
def test_behavior_update_index_only_keeps_server_http_and_search_visible(tmp_path, initialized_archive, recursive_test_site):
|
|
_require_sonic_binary()
|
|
|
|
env = cli_env(
|
|
live=True,
|
|
PLUGINS="wget,parse_html_urls,search_backend_sqlite",
|
|
SEARCH_BACKEND_ENGINE="sqlite",
|
|
SEARCH_BACKEND_SONIC_PORT=str(get_free_port()),
|
|
)
|
|
root_url = recursive_test_site["root_url"]
|
|
_archive_pages_for_sqlite_reindexing(tmp_path, env, root_url)
|
|
|
|
port = get_free_port()
|
|
server = None
|
|
try:
|
|
server = start_archivebox_server(tmp_path, port=port, log_name="behavior-server-update.log", env=env)
|
|
assert wait_for_http(port, host=f"archivebox.localhost:{port}").status_code < 500
|
|
|
|
update = run_archivebox_cmd(
|
|
["update", "--index-only", "--batch-size=1"],
|
|
cwd=tmp_path,
|
|
env=env,
|
|
timeout=180,
|
|
)
|
|
|
|
assert update.returncode == 0, update.stderr or update.stdout
|
|
assert wait_for_http(port, host=f"archivebox.localhost:{port}").status_code < 500
|
|
|
|
search = run_archivebox_cmd(
|
|
["list", "--search=contents", "--csv=url", "Root"],
|
|
cwd=tmp_path,
|
|
env=env,
|
|
timeout=60,
|
|
)
|
|
|
|
assert search.returncode == 0, search.stderr or search.stdout
|
|
assert root_url in search.stdout
|
|
finally:
|
|
if server is not None and server.poll() is None:
|
|
stop_archivebox_process(server, signal.SIGTERM)
|
|
kill_processes_for_data_dir(tmp_path)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
|
|
|
|
@pytest.mark.timeout(420)
|
|
def test_behavior_update_yields_to_server_then_finishes_visible_indexing(tmp_path, initialized_archive, recursive_test_site):
|
|
_require_sonic_binary()
|
|
|
|
env = cli_env(
|
|
live=True,
|
|
PLUGINS="wget,parse_html_urls,search_backend_sqlite",
|
|
SEARCH_BACKEND_ENGINE="sqlite",
|
|
SEARCH_BACKEND_SONIC_PORT=str(get_free_port()),
|
|
)
|
|
root_url = recursive_test_site["root_url"]
|
|
_archive_pages_for_sqlite_reindexing(tmp_path, env, root_url)
|
|
|
|
port = get_free_port()
|
|
update_proc = None
|
|
server = None
|
|
try:
|
|
update_log = tmp_path / "behavior-update-yields.log"
|
|
update_log_handle = update_log.open("w", encoding="utf-8")
|
|
update_proc = run_archivebox_cmd(
|
|
["update", "--index-only", "--batch-size=1"],
|
|
cwd=tmp_path,
|
|
env=env,
|
|
stdout=update_log_handle,
|
|
stderr=subprocess.STDOUT,
|
|
start_new_session=True,
|
|
wait=False,
|
|
)
|
|
update_log_handle.close()
|
|
wait_for_log(update_log, "[*] Reindexing", timeout=90)
|
|
|
|
server = start_archivebox_server(tmp_path, port=port, log_name="behavior-server-takes-update.log", env=env)
|
|
assert wait_for_http(port, host=f"archivebox.localhost:{port}").status_code < 500
|
|
wait_for_log(update_log, "A newer archivebox process took over the orchestrator, sonic", timeout=90)
|
|
|
|
stop_archivebox_process(server, signal.SIGTERM)
|
|
server = None
|
|
update_proc.wait(timeout=180)
|
|
update_text = update_log.read_text(encoding="utf-8", errors="replace")
|
|
assert update_proc.returncode == 0, update_text
|
|
|
|
search = run_archivebox_cmd(
|
|
["list", "--search=contents", "--csv=url", "Root"],
|
|
cwd=tmp_path,
|
|
env=env,
|
|
timeout=60,
|
|
)
|
|
|
|
assert search.returncode == 0, search.stderr or search.stdout
|
|
assert root_url in search.stdout
|
|
finally:
|
|
if update_proc is not None and update_proc.poll() is None:
|
|
stop_archivebox_process(update_proc, signal.SIGTERM)
|
|
if server is not None and server.poll() is None:
|
|
stop_archivebox_process(server, signal.SIGTERM)
|
|
kill_processes_for_data_dir(tmp_path)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
|
|
|
|
@pytest.mark.timeout(300)
|
|
def test_behavior_foreground_add_keeps_existing_server_http_visible(tmp_path, initialized_archive, recursive_test_site):
|
|
|
|
port = get_free_port()
|
|
env = cli_env(live=True, server=True, port=port, SEARCH_BACKEND_ENGINE="ripgrep")
|
|
server = None
|
|
try:
|
|
server = start_archivebox_server(tmp_path, port=port, log_name="behavior-server-add.log", env=env)
|
|
assert wait_for_http(port, host=f"archivebox.localhost:{port}").status_code < 500
|
|
|
|
add = run_archivebox_cmd(
|
|
[
|
|
"add",
|
|
"--depth=0",
|
|
"--plugins=wget",
|
|
recursive_test_site["root_url"],
|
|
],
|
|
cwd=tmp_path,
|
|
env=env,
|
|
timeout=180,
|
|
)
|
|
|
|
assert add.returncode == 0, add.stderr or add.stdout
|
|
assert wait_for_http(port, host=f"archivebox.localhost:{port}").status_code < 500
|
|
captured_text = wait_for_snapshot_capture(tmp_path, recursive_test_site["root_url"], timeout=120)
|
|
assert "Root" in captured_text
|
|
finally:
|
|
if server is not None and server.poll() is None:
|
|
stop_archivebox_process(server, signal.SIGTERM)
|
|
kill_processes_for_data_dir(tmp_path)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
|
|
|
|
@pytest.mark.timeout(300)
|
|
def test_behavior_background_add_returns_and_server_archives_visible_url(tmp_path, initialized_archive, recursive_test_site):
|
|
|
|
port = get_free_port()
|
|
env = cli_env(live=True, server=True, port=port, SEARCH_BACKEND_ENGINE="ripgrep")
|
|
server = None
|
|
try:
|
|
server = start_archivebox_server(tmp_path, port=port, log_name="behavior-server-bg-add.log", env=env)
|
|
assert wait_for_http(port, host=f"archivebox.localhost:{port}").status_code < 500
|
|
|
|
add = run_archivebox_cmd(
|
|
[
|
|
"add",
|
|
"--bg",
|
|
"--depth=0",
|
|
"--plugins=wget",
|
|
recursive_test_site["root_url"],
|
|
],
|
|
cwd=tmp_path,
|
|
env=env,
|
|
timeout=60,
|
|
)
|
|
|
|
assert add.returncode == 0, add.stderr or add.stdout
|
|
assert "background runner will process" in add.stdout
|
|
assert wait_for_http(port, host=f"archivebox.localhost:{port}").status_code < 500
|
|
captured_text = wait_for_snapshot_capture(tmp_path, recursive_test_site["root_url"], timeout=180)
|
|
assert "Root" in captured_text
|
|
finally:
|
|
if server is not None and server.poll() is None:
|
|
stop_archivebox_process(server, signal.SIGTERM)
|
|
kill_processes_for_data_dir(tmp_path)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
|
|
|
|
@pytest.mark.timeout(300)
|
|
def test_behavior_daemonized_server_restarts_cleanly_after_forced_stop(tmp_path, initialized_archive):
|
|
|
|
port = get_free_port()
|
|
env = cli_env(live=True, server=True, port=port)
|
|
try:
|
|
first = start_archivebox_server(tmp_path, port=port, env=env, daemonize=True)
|
|
assert first.returncode == 0, first.stderr or first.stdout
|
|
assert wait_for_http(port, host=f"archivebox.localhost:{port}").status_code < 500
|
|
|
|
kill_processes_for_data_dir(tmp_path)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
|
|
second = start_archivebox_server(tmp_path, port=port, env=env, daemonize=True)
|
|
assert second.returncode == 0, second.stderr or second.stdout
|
|
assert wait_for_http(port, host=f"archivebox.localhost:{port}").status_code < 500
|
|
finally:
|
|
kill_processes_for_data_dir(tmp_path)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
|
|
|
|
@pytest.mark.timeout(240)
|
|
def test_live_second_server_takes_over_existing_server_process(tmp_path, initialized_archive):
|
|
|
|
env = cli_env(live=True)
|
|
port = get_free_port()
|
|
first = None
|
|
second = None
|
|
try:
|
|
first = start_archivebox_server(tmp_path, port=port, log_name="server-first.log", env=env)
|
|
first_log = first.log_path
|
|
second = start_archivebox_server(tmp_path, port=port, log_name="server-second.log", env=env)
|
|
second_log = second.log_path
|
|
|
|
assert first.poll() is None
|
|
first_text = first_log.read_text(encoding="utf-8", errors="replace")
|
|
second_text = second_log.read_text(encoding="utf-8", errors="replace")
|
|
assert "A newer archivebox process took over the orchestrator, server" in first_text
|
|
assert "Starting orchestrator, server" in second_text
|
|
|
|
_cmd_result = run_archivebox_cmd(
|
|
["status"],
|
|
cwd=tmp_path,
|
|
env=env,
|
|
timeout=60,
|
|
)
|
|
stdout, stderr, returncode = _cmd_result.stdout, _cmd_result.stderr, _cmd_result.returncode
|
|
assert returncode == 0, stderr or stdout
|
|
|
|
first_resumes = first_log.read_text(encoding="utf-8", errors="replace").count("Other newer archivebox process")
|
|
stop_archivebox_process(second, signal.SIGTERM)
|
|
second = None
|
|
wait_for_log_count(first_log, "Other newer archivebox process", first_resumes + 1, timeout=35)
|
|
assert first.poll() is None
|
|
finally:
|
|
if second is not None and second.poll() is None:
|
|
stop_archivebox_process(second, signal.SIGTERM)
|
|
if first is not None and first.poll() is None:
|
|
stop_archivebox_process(first, signal.SIGKILL)
|
|
kill_processes_for_data_dir(tmp_path)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
|
|
|
|
@pytest.mark.timeout(180)
|
|
def test_live_update_index_only_does_not_take_over_server_runtime(tmp_path, initialized_archive):
|
|
|
|
env = cli_env(live=True)
|
|
port = get_free_port()
|
|
server = None
|
|
try:
|
|
server = start_archivebox_server(tmp_path, port=port, log_name="server-update-owner.log", env=env)
|
|
server_log = server.log_path
|
|
supervisor_pid_before = supervisor_pid_from_log(server_log)
|
|
daphne_pid_before = worker_pid_from_log(server_log, "worker_daphne")
|
|
|
|
_cmd_result = run_archivebox_cmd(
|
|
["update", "--index-only", "--before=0"],
|
|
cwd=tmp_path,
|
|
env=env,
|
|
timeout=90,
|
|
)
|
|
update_stdout, update_stderr, update_returncode = _cmd_result.stdout, _cmd_result.stderr, _cmd_result.returncode
|
|
|
|
assert update_returncode == 0, update_stderr or update_stdout
|
|
assert server.poll() is None
|
|
assert pid_is_alive(supervisor_pid_before)
|
|
assert pid_is_alive(daphne_pid_before)
|
|
assert supervisor_pid_from_log(server_log) == supervisor_pid_before
|
|
assert worker_pid_from_log(server_log, "worker_daphne") == daphne_pid_before
|
|
assert "A newer archivebox process took over the orchestrator, server" not in server_log.read_text(
|
|
encoding="utf-8",
|
|
errors="replace",
|
|
)
|
|
finally:
|
|
if server is not None and server.poll() is None:
|
|
stop_archivebox_process(server, signal.SIGTERM)
|
|
kill_processes_for_data_dir(tmp_path)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
|
|
|
|
@pytest.mark.timeout(360)
|
|
def test_live_server_keeps_http_runtime_while_update_runs_real_sqlite_indexer(tmp_path, initialized_archive, recursive_test_site):
|
|
_require_sonic_binary()
|
|
|
|
env = cli_env(
|
|
live=True,
|
|
PLUGINS="wget,parse_html_urls,search_backend_sqlite,search_backend_sonic",
|
|
SEARCH_BACKEND_ENGINE="sqlite",
|
|
SEARCH_BACKEND_SONIC_PORT=str(get_free_port()),
|
|
)
|
|
_archive_pages_for_sqlite_reindexing(tmp_path, env, recursive_test_site["root_url"])
|
|
|
|
port = get_free_port()
|
|
server = None
|
|
try:
|
|
server = start_archivebox_server(tmp_path, port=port, log_name="server-real-sqlite-update.log", env=env)
|
|
server_log = server.log_path
|
|
supervisor_pid_before = supervisor_pid_from_log(server_log)
|
|
daphne_pid_before = worker_pid_from_log(server_log, "worker_daphne")
|
|
runner_pid_before = worker_pid_from_log(server_log, "worker_runner")
|
|
sonic_pid_before = worker_pid_from_log(server_log, "worker_sonic")
|
|
|
|
_cmd_result = run_archivebox_cmd(
|
|
["update", "--index-only", "--batch-size=1"],
|
|
cwd=tmp_path,
|
|
env=env,
|
|
timeout=180,
|
|
)
|
|
update_stdout, update_stderr, update_returncode = _cmd_result.stdout, _cmd_result.stderr, _cmd_result.returncode
|
|
|
|
assert update_returncode == 0, update_stderr or update_stdout
|
|
assert server.poll() is None
|
|
assert supervisor_pid_from_log(server_log) == supervisor_pid_before
|
|
assert pid_is_alive(daphne_pid_before)
|
|
assert pid_is_alive(sonic_pid_before)
|
|
assert worker_pid_from_log(server_log, "worker_daphne") == daphne_pid_before
|
|
assert worker_pid_from_log(server_log, "worker_sonic") == sonic_pid_before
|
|
assert "A newer archivebox process took over the orchestrator, server" not in server_log.read_text(
|
|
encoding="utf-8",
|
|
errors="replace",
|
|
)
|
|
assert "Stopping older ArchiveBox runner process" in update_stdout
|
|
|
|
deadline = time.time() + 180
|
|
runner_pid_after = runner_pid_before
|
|
while time.time() < deadline:
|
|
with use_archivebox_db(tmp_path):
|
|
rows = list(
|
|
Process.objects.filter(
|
|
process_type=Process.TypeChoices.ORCHESTRATOR,
|
|
worker_type="worker_runner",
|
|
status=Process.StatusChoices.RUNNING,
|
|
).values("pid"),
|
|
)
|
|
for row in rows:
|
|
pid = int(row["pid"])
|
|
if pid != runner_pid_before and pid_is_alive(pid):
|
|
runner_pid_after = pid
|
|
break
|
|
if runner_pid_after != runner_pid_before:
|
|
break
|
|
time.sleep(0.25)
|
|
assert runner_pid_after != runner_pid_before
|
|
|
|
deadline = time.time() + 180
|
|
indexed_results: list[str] = []
|
|
while time.time() < deadline:
|
|
with use_archivebox_db(tmp_path):
|
|
indexed_results = list(
|
|
ArchiveResult.objects.filter(plugin="search_backend_sqlite").values_list("status", flat=True),
|
|
)
|
|
if indexed_results and all(status in ArchiveResult.FINAL_STATES for status in indexed_results):
|
|
break
|
|
time.sleep(0.25)
|
|
assert indexed_results
|
|
assert all(status in ArchiveResult.FINAL_STATES for status in indexed_results)
|
|
|
|
stop_archivebox_process(server, signal.SIGTERM)
|
|
server = None
|
|
wait_for_pid_to_disappear(daphne_pid_before, timeout=20)
|
|
wait_for_pid_to_disappear(sonic_pid_before, timeout=20)
|
|
wait_for_pid_to_disappear(runner_pid_after, timeout=20)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
finally:
|
|
if server is not None and server.poll() is None:
|
|
stop_archivebox_process(server, signal.SIGTERM)
|
|
kill_processes_for_data_dir(tmp_path)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
|
|
|
|
@pytest.mark.timeout(420)
|
|
def test_live_update_yields_to_server_then_reclaims_real_sqlite_indexing(tmp_path, initialized_archive, recursive_test_site):
|
|
_require_sonic_binary()
|
|
|
|
env = cli_env(
|
|
live=True,
|
|
PLUGINS="wget,parse_html_urls,search_backend_sqlite,search_backend_sonic",
|
|
SEARCH_BACKEND_ENGINE="sqlite",
|
|
SEARCH_BACKEND_SONIC_PORT=str(get_free_port()),
|
|
)
|
|
_archive_pages_for_sqlite_reindexing(tmp_path, env, recursive_test_site["root_url"])
|
|
|
|
port = get_free_port()
|
|
update_proc = None
|
|
server = None
|
|
try:
|
|
update_log = tmp_path / "update-real-sqlite-owner.log"
|
|
update_log_handle = update_log.open("w", encoding="utf-8")
|
|
update_proc = run_archivebox_cmd(
|
|
["update", "--index-only", "--batch-size=1"],
|
|
cwd=tmp_path,
|
|
env=env,
|
|
stdout=update_log_handle,
|
|
stderr=subprocess.STDOUT,
|
|
start_new_session=True,
|
|
wait=False,
|
|
)
|
|
update_log_handle.close()
|
|
wait_for_log(update_log, "[*] Reindexing", timeout=90)
|
|
update_supervisor_match = wait_for_log_pattern(update_log, r"Supervisord connected \(pid=(\d+)\)", timeout=90)
|
|
update_supervisor_pid_before = int(update_supervisor_match.group(1))
|
|
update_sonic_pid_before = wait_for_worker_pid_from_log(update_log, "worker_sonic", timeout=90)
|
|
update_runner_pid_before = wait_for_worker_pid_from_log(update_log, f"worker_runner_update_{update_proc.pid}", timeout=90)
|
|
assert pid_is_alive(update_supervisor_pid_before)
|
|
assert pid_is_alive(update_sonic_pid_before)
|
|
assert pid_is_alive(update_runner_pid_before)
|
|
assert "worker_daphne" not in update_log.read_text(encoding="utf-8", errors="replace")
|
|
|
|
server = start_archivebox_server(tmp_path, port=port, log_name="server-takes-real-sqlite-update.log", env=env)
|
|
server_log = server.log_path
|
|
wait_for_log(update_log, "A newer archivebox process took over the orchestrator, sonic", timeout=90)
|
|
assert update_proc.poll() is None
|
|
assert server.poll() is None
|
|
server_text = server_log.read_text(encoding="utf-8", errors="replace")
|
|
# The older update process can yield orchestrator ownership just before
|
|
# the server logs its takeover, but sonic must always move to the server.
|
|
assert (
|
|
"Taking over orchestrator, sonic from older existing archivebox process" in server_text
|
|
or "Taking over sonic from older existing archivebox process" in server_text
|
|
)
|
|
assert "worker_daphne" in server_text
|
|
assert "worker_sonic" in server_text
|
|
server_daphne_pid = worker_pid_from_log(server_log, "worker_daphne")
|
|
server_runner_pid = worker_pid_from_log(server_log, "worker_runner")
|
|
server_sonic_pid = worker_pid_from_log(server_log, "worker_sonic")
|
|
assert pid_is_alive(server_daphne_pid)
|
|
assert pid_is_alive(server_runner_pid)
|
|
assert pid_is_alive(server_sonic_pid)
|
|
wait_for_pid_to_disappear(update_supervisor_pid_before, timeout=30)
|
|
wait_for_pid_to_disappear(update_sonic_pid_before, timeout=30)
|
|
wait_for_pid_to_disappear(update_runner_pid_before, timeout=30)
|
|
|
|
stop_archivebox_process(server, signal.SIGTERM)
|
|
server = None
|
|
update_proc.wait(timeout=180)
|
|
update_text = update_log.read_text(encoding="utf-8", errors="replace")
|
|
assert update_proc.returncode == 0, update_text
|
|
wait_for_pid_to_disappear(server_daphne_pid, timeout=20)
|
|
wait_for_pid_to_disappear(server_runner_pid, timeout=20)
|
|
wait_for_pid_to_disappear(server_sonic_pid, timeout=20)
|
|
|
|
deadline = time.time() + 30
|
|
indexed_results: list[str] = []
|
|
while time.time() < deadline:
|
|
with use_archivebox_db(tmp_path):
|
|
indexed_results = list(
|
|
ArchiveResult.objects.filter(plugin="search_backend_sqlite").values_list("status", flat=True),
|
|
)
|
|
if indexed_results and all(status in ArchiveResult.FINAL_STATES for status in indexed_results):
|
|
break
|
|
time.sleep(0.25)
|
|
assert indexed_results
|
|
assert all(status in ArchiveResult.FINAL_STATES for status in indexed_results)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
finally:
|
|
if update_proc is not None and update_proc.poll() is None:
|
|
stop_archivebox_process(update_proc, signal.SIGTERM)
|
|
if server is not None and server.poll() is None:
|
|
stop_archivebox_process(server, signal.SIGTERM)
|
|
kill_processes_for_data_dir(tmp_path)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
|
|
|
|
@pytest.mark.timeout(420)
|
|
def test_live_repeated_server_startups_take_over_cleanly(tmp_path, initialized_archive):
|
|
|
|
env = cli_env(live=True)
|
|
port = get_free_port()
|
|
servers: list[subprocess.Popen[str]] = []
|
|
server_pids: list[int] = []
|
|
daphne_pids: list[int] = []
|
|
runner_pids: list[int] = []
|
|
try:
|
|
for index in range(5):
|
|
server = start_archivebox_server(tmp_path, port=port, log_name=f"server-chaos-{index}.log", env=env)
|
|
log_path = server.log_path
|
|
servers.append(server)
|
|
server_pids.append(server.pid)
|
|
daphne_pids.append(worker_pid_from_log(log_path, "worker_daphne"))
|
|
runner_pids.append(worker_pid_from_log(log_path, "worker_runner"))
|
|
|
|
if index > 0:
|
|
previous_server = servers[index - 1]
|
|
previous_log = (tmp_path / f"server-chaos-{index - 1}.log").read_text(encoding="utf-8", errors="replace")
|
|
current_log = log_path.read_text(encoding="utf-8", errors="replace")
|
|
assert previous_server.poll() is None
|
|
assert pid_is_alive(server_pids[index - 1])
|
|
assert "A newer archivebox process took over the orchestrator, server" in previous_log
|
|
assert "Starting orchestrator, server" in current_log
|
|
wait_for_pid_to_disappear(daphne_pids[index - 1], timeout=15)
|
|
wait_for_pid_to_disappear(runner_pids[index - 1], timeout=15)
|
|
|
|
_cmd_result = run_archivebox_cmd(
|
|
["status"],
|
|
cwd=tmp_path,
|
|
env=env,
|
|
timeout=60,
|
|
)
|
|
stdout, stderr, returncode = _cmd_result.stdout, _cmd_result.stderr, _cmd_result.returncode
|
|
assert returncode == 0, stderr or stdout
|
|
time.sleep(5)
|
|
|
|
assert servers[-1].poll() is None
|
|
assert all(server.poll() is None for server in servers)
|
|
listener = subprocess.run(
|
|
["lsof", "-nP", f"-iTCP:{port}", "-sTCP:LISTEN"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10,
|
|
)
|
|
assert listener.returncode == 0, listener.stderr or listener.stdout
|
|
assert listener.stdout.count(f":{port} (LISTEN)") == 1
|
|
|
|
previous_log_path = tmp_path / "server-chaos-3.log"
|
|
previous_takeovers = previous_log_path.read_text(encoding="utf-8", errors="replace").count(
|
|
"Other newer archivebox process",
|
|
)
|
|
stop_archivebox_process(servers[-1], signal.SIGTERM)
|
|
wait_for_log_count(previous_log_path, "Other newer archivebox process", previous_takeovers + 1, timeout=35)
|
|
assert servers[3].poll() is None
|
|
finally:
|
|
for server in reversed(servers):
|
|
if server.poll() is None:
|
|
stop_archivebox_process(server, signal.SIGTERM)
|
|
kill_processes_for_data_dir(tmp_path)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
|
|
|
|
@pytest.mark.timeout(420)
|
|
def test_live_add_update_jobs_survive_server_and_cli_owner_exits(tmp_path, initialized_archive, recursive_test_site):
|
|
plugins_root = tmp_path / "runtime_plugins"
|
|
marker_dir = tmp_path / "slow-plugin-markers"
|
|
plugin_dir = plugins_root / "slow_exit"
|
|
plugin_dir.mkdir(parents=True, exist_ok=True)
|
|
counter_hook = plugin_dir / "on_Snapshot__08_counter.sh"
|
|
counter_hook.write_text(
|
|
"\n".join(
|
|
[
|
|
"#!/usr/bin/env bash",
|
|
"set -euo pipefail",
|
|
f"marker_dir={str(marker_dir)!r}",
|
|
'mkdir -p "$marker_dir/counter-seen"',
|
|
'snapshot_key="${SNAPSHOT_ID:-$(basename "${SNAP_DIR:-unknown}")}"',
|
|
'seen_file="$marker_dir/counter-seen/$snapshot_key"',
|
|
'if [[ -e "$seen_file" ]]; then',
|
|
' echo "$snapshot_key" >> "$marker_dir/counter-duplicates.txt"',
|
|
" exit 42",
|
|
"fi",
|
|
'touch "$seen_file"',
|
|
'echo "$snapshot_key" >> "$marker_dir/counter-runs.txt"',
|
|
"",
|
|
],
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
counter_hook.chmod(0o755)
|
|
hook = plugin_dir / "on_Snapshot__09_slow_exit.sh"
|
|
hook.write_text(
|
|
"\n".join(
|
|
[
|
|
"#!/usr/bin/env bash",
|
|
"set -euo pipefail",
|
|
f"marker_dir={str(marker_dir)!r}",
|
|
'mkdir -p "$marker_dir"',
|
|
'echo $$ >> "$marker_dir/hook-pids.txt"',
|
|
'touch "$marker_dir/hook-started"',
|
|
"trap 'touch \"$marker_dir/hook-stopped\"; exit 143' TERM INT HUP",
|
|
'while [[ ! -f "$marker_dir/allow-finish" ]]; do sleep 0.1; done',
|
|
'touch "$marker_dir/hook-finished"',
|
|
"",
|
|
],
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
hook.chmod(0o755)
|
|
|
|
env = cli_env(live=True, plugins_root=plugins_root)
|
|
port = get_free_port()
|
|
server = None
|
|
server2 = None
|
|
server3 = None
|
|
add_proc = None
|
|
add_proc2 = None
|
|
try:
|
|
server = start_archivebox_server(tmp_path, port=port, log_name="server-add-owner-1.log", env=env)
|
|
server_log = server.log_path
|
|
supervisor_pid_before = supervisor_pid_from_log(server_log)
|
|
|
|
_cmd_result = run_archivebox_cmd(
|
|
["update", "--index-only", "--batch-size=10"],
|
|
cwd=tmp_path,
|
|
env=env,
|
|
timeout=90,
|
|
)
|
|
update_stdout, update_stderr, update_returncode = _cmd_result.stdout, _cmd_result.stderr, _cmd_result.returncode
|
|
assert update_returncode == 0, update_stderr or update_stdout
|
|
assert server.poll() is None
|
|
assert pid_is_alive(supervisor_pid_before)
|
|
assert supervisor_pid_from_log(server_log) == supervisor_pid_before
|
|
|
|
add_log = tmp_path / "archivebox-add-1.log"
|
|
add_log_handle = add_log.open("w", encoding="utf-8")
|
|
add_proc = run_archivebox_cmd(
|
|
[
|
|
"add",
|
|
"--depth=1",
|
|
"--max-urls=2",
|
|
"--crawl-max-size=50mb",
|
|
"--plugins=wget,parse_html_urls,slow_exit",
|
|
recursive_test_site["root_url"],
|
|
recursive_test_site["child_urls"][0],
|
|
],
|
|
cwd=tmp_path,
|
|
env=env,
|
|
stdout=add_log_handle,
|
|
stderr=subprocess.STDOUT,
|
|
start_new_session=True,
|
|
wait=False,
|
|
)
|
|
add_log_handle.close()
|
|
|
|
pid_file = marker_dir / "hook-pids.txt"
|
|
deadline = time.time() + 45
|
|
hook_pids: list[int] = []
|
|
while time.time() < deadline:
|
|
if pid_file.exists():
|
|
hook_pids = [int(line.strip()) for line in pid_file.read_text().splitlines() if line.strip()]
|
|
if len(hook_pids) >= 1:
|
|
break
|
|
time.sleep(0.25)
|
|
assert len(hook_pids) >= 1
|
|
|
|
deadline = time.time() + 30
|
|
snapshot_started = False
|
|
while time.time() < deadline:
|
|
with use_archivebox_db(tmp_path):
|
|
snapshot_started = Snapshot.objects.filter(status=Snapshot.StatusChoices.STARTED).exists()
|
|
if snapshot_started:
|
|
break
|
|
time.sleep(0.25)
|
|
assert snapshot_started
|
|
|
|
os.kill(server.pid, signal.SIGTERM)
|
|
server.wait(timeout=20)
|
|
assert add_proc.poll() is None, "foreground add should keep owning its crawl after the server exits"
|
|
assert "Got SIGTERM" in server_log.read_text(encoding="utf-8", errors="replace")
|
|
|
|
stop_archivebox_process(add_proc, signal.SIGKILL, timeout=30)
|
|
add_output = add_log.read_text(encoding="utf-8", errors="replace")
|
|
assert "Runner error" not in add_output
|
|
kill_processes_for_data_dir(tmp_path)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
|
|
server2 = start_archivebox_server(tmp_path, port=port, log_name="server-add-owner-2.log", env=env)
|
|
_server2_log = server2.log_path
|
|
(marker_dir / "allow-finish").touch()
|
|
|
|
deadline = time.time() + 90
|
|
crawls = []
|
|
snapshots = []
|
|
bad_results = []
|
|
while time.time() < deadline:
|
|
with use_archivebox_db(tmp_path):
|
|
crawls = list(Crawl.objects.order_by("created_at").values_list("status", "retry_at"))
|
|
snapshots = list(Snapshot.objects.order_by("created_at").values_list("url", "status", "retry_at"))
|
|
bad_results = list(
|
|
ArchiveResult.objects.filter(
|
|
status__in=[
|
|
ArchiveResult.StatusChoices.FAILED,
|
|
ArchiveResult.StatusChoices.SKIPPED,
|
|
],
|
|
).values_list("plugin", "status", "output_str"),
|
|
)
|
|
if (
|
|
crawls
|
|
and snapshots
|
|
and all(status == Crawl.StatusChoices.SEALED for status, _retry_at in crawls)
|
|
and all(status == Snapshot.StatusChoices.SEALED for _url, status, _retry_at in snapshots)
|
|
and not bad_results
|
|
):
|
|
break
|
|
time.sleep(0.25)
|
|
|
|
os.kill(server2.pid, signal.SIGTERM)
|
|
server2.wait(timeout=20)
|
|
with use_archivebox_db(tmp_path):
|
|
crawls = list(Crawl.objects.order_by("created_at").values_list("status", "retry_at"))
|
|
snapshots = list(Snapshot.objects.order_by("created_at").values_list("url", "status", "retry_at"))
|
|
bad_results = list(
|
|
ArchiveResult.objects.filter(
|
|
status__in=[
|
|
ArchiveResult.StatusChoices.FAILED,
|
|
ArchiveResult.StatusChoices.SKIPPED,
|
|
],
|
|
).values_list("plugin", "status", "output_str"),
|
|
)
|
|
assert crawls
|
|
assert snapshots
|
|
assert all(status == Crawl.StatusChoices.SEALED for status, _retry_at in crawls)
|
|
assert all(status == Snapshot.StatusChoices.SEALED for _url, status, _retry_at in snapshots)
|
|
counter_runs = (marker_dir / "counter-runs.txt").read_text(encoding="utf-8").splitlines()
|
|
assert counter_runs
|
|
assert len(counter_runs) == len(set(counter_runs))
|
|
assert not (marker_dir / "counter-duplicates.txt").exists()
|
|
|
|
# The interrupted hook should be retried directly without rerunning the
|
|
# previous hook in the same plugin. That keeps plugin-level shell hooks
|
|
# idempotent across runner takeover instead of depending on each hook to
|
|
# detect partial prior work itself.
|
|
assert not bad_results
|
|
assert (marker_dir / "hook-finished").exists()
|
|
finally:
|
|
for proc in (add_proc, add_proc2, server, server2, server3):
|
|
if proc is not None and proc.poll() is None:
|
|
stop_archivebox_process(proc, signal.SIGTERM, timeout=10)
|
|
kill_processes_for_data_dir(tmp_path)
|
|
assert_no_processes_for_data_dir(tmp_path, timeout=12)
|
|
|
|
|
|
# Utility-level takeover selection tests.
|
|
|
|
|
|
def test_runtime_stack_owner_prefers_newer_server_over_older_update(tmp_path):
|
|
from archivebox.machine.models import Machine, Process
|
|
from archivebox.core.takeover_util import runtime_stack_owner
|
|
|
|
procs: list[subprocess.Popen[str]] = []
|
|
try:
|
|
for process_type in (Process.TypeChoices.UPDATE, Process.TypeChoices.SERVER):
|
|
proc = subprocess.Popen(
|
|
[sys.executable, "-c", "import time; time.sleep(60)"],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
text=True,
|
|
start_new_session=True,
|
|
)
|
|
procs.append(proc)
|
|
Process.objects.create(
|
|
machine=Machine.current(),
|
|
process_type=process_type,
|
|
worker_type=process_type,
|
|
pwd=str(tmp_path),
|
|
cmd=[],
|
|
pid=proc.pid,
|
|
status=Process.StatusChoices.RUNNING,
|
|
)
|
|
time.sleep(0.05)
|
|
|
|
owner = runtime_stack_owner(data_dir=tmp_path)
|
|
|
|
assert owner is not None
|
|
assert owner.process_type == Process.TypeChoices.SERVER
|
|
assert owner.pid == procs[-1].pid
|
|
finally:
|
|
for proc in procs:
|
|
if proc.poll() is None:
|
|
os.killpg(proc.pid, signal.SIGTERM)
|
|
for proc in procs:
|
|
try:
|
|
proc.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
os.killpg(proc.pid, signal.SIGKILL)
|
|
proc.wait(timeout=5)
|
|
|
|
|
|
def test_runtime_stack_owner_keeps_server_over_newer_update(tmp_path):
|
|
from archivebox.machine.models import Machine, Process
|
|
from archivebox.core.takeover_util import runtime_stack_owner
|
|
|
|
procs: list[subprocess.Popen[str]] = []
|
|
try:
|
|
for process_type in (Process.TypeChoices.SERVER, Process.TypeChoices.UPDATE):
|
|
proc = subprocess.Popen(
|
|
[sys.executable, "-c", "import time; time.sleep(60)"],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
text=True,
|
|
start_new_session=True,
|
|
)
|
|
procs.append(proc)
|
|
Process.objects.create(
|
|
machine=Machine.current(),
|
|
process_type=process_type,
|
|
worker_type=process_type,
|
|
pwd=str(tmp_path),
|
|
cmd=[],
|
|
pid=proc.pid,
|
|
status=Process.StatusChoices.RUNNING,
|
|
)
|
|
time.sleep(0.05)
|
|
|
|
owner = runtime_stack_owner(data_dir=tmp_path)
|
|
|
|
assert owner is not None
|
|
assert owner.process_type == Process.TypeChoices.SERVER
|
|
assert owner.pid == procs[0].pid
|
|
finally:
|
|
for proc in procs:
|
|
if proc.poll() is None:
|
|
os.killpg(proc.pid, signal.SIGTERM)
|
|
for proc in procs:
|
|
try:
|
|
proc.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
os.killpg(proc.pid, signal.SIGKILL)
|
|
proc.wait(timeout=5)
|
|
|
|
|
|
def test_foreground_runner_owner_prefers_newer_update_over_server(tmp_path):
|
|
from archivebox.machine.models import Machine, Process
|
|
from archivebox.core.takeover_util import foreground_runner_owner, runtime_stack_owner
|
|
|
|
procs: list[subprocess.Popen[str]] = []
|
|
try:
|
|
for process_type in (Process.TypeChoices.SERVER, Process.TypeChoices.UPDATE):
|
|
proc = subprocess.Popen(
|
|
[sys.executable, "-c", "import time; time.sleep(60)"],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
text=True,
|
|
start_new_session=True,
|
|
)
|
|
procs.append(proc)
|
|
Process.objects.create(
|
|
machine=Machine.current(),
|
|
process_type=process_type,
|
|
worker_type=process_type,
|
|
pwd=str(tmp_path),
|
|
cmd=[],
|
|
pid=proc.pid,
|
|
status=Process.StatusChoices.RUNNING,
|
|
)
|
|
time.sleep(0.05)
|
|
|
|
runtime_owner = runtime_stack_owner(data_dir=tmp_path)
|
|
runner_owner = foreground_runner_owner(data_dir=tmp_path)
|
|
|
|
assert runtime_owner is not None
|
|
assert runtime_owner.process_type == Process.TypeChoices.SERVER
|
|
assert runner_owner is not None
|
|
assert runner_owner.process_type == Process.TypeChoices.UPDATE
|
|
assert runner_owner.pid == procs[-1].pid
|
|
finally:
|
|
for proc in procs:
|
|
if proc.poll() is None:
|
|
os.killpg(proc.pid, signal.SIGTERM)
|
|
for proc in procs:
|
|
try:
|
|
proc.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
os.killpg(proc.pid, signal.SIGKILL)
|
|
proc.wait(timeout=5)
|
|
|
|
|
|
def test_foreground_runner_owner_prefers_newer_server_over_update(tmp_path):
|
|
from archivebox.machine.models import Machine, Process
|
|
from archivebox.core.takeover_util import foreground_runner_owner, runtime_stack_owner
|
|
|
|
procs: list[subprocess.Popen[str]] = []
|
|
try:
|
|
for process_type in (Process.TypeChoices.UPDATE, Process.TypeChoices.SERVER):
|
|
proc = subprocess.Popen(
|
|
[sys.executable, "-c", "import time; time.sleep(60)"],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
text=True,
|
|
start_new_session=True,
|
|
)
|
|
procs.append(proc)
|
|
Process.objects.create(
|
|
machine=Machine.current(),
|
|
process_type=process_type,
|
|
worker_type=process_type,
|
|
pwd=str(tmp_path),
|
|
cmd=[],
|
|
pid=proc.pid,
|
|
status=Process.StatusChoices.RUNNING,
|
|
)
|
|
time.sleep(0.05)
|
|
|
|
runtime_owner = runtime_stack_owner(data_dir=tmp_path)
|
|
runner_owner = foreground_runner_owner(data_dir=tmp_path)
|
|
|
|
assert runtime_owner is not None
|
|
assert runtime_owner.process_type == Process.TypeChoices.SERVER
|
|
assert runner_owner is not None
|
|
assert runner_owner.process_type == Process.TypeChoices.SERVER
|
|
assert runner_owner.pid == procs[-1].pid
|
|
finally:
|
|
for proc in procs:
|
|
if proc.poll() is None:
|
|
os.killpg(proc.pid, signal.SIGTERM)
|
|
for proc in procs:
|
|
try:
|
|
proc.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
os.killpg(proc.pid, signal.SIGKILL)
|
|
proc.wait(timeout=5)
|
|
|
|
|
|
def test_runtime_stack_owner_keeps_server_over_newer_supervised_runner(tmp_path):
|
|
from archivebox.machine.models import Machine, Process
|
|
from archivebox.core.takeover_util import RUNNER_ACTIVE_WORKER_TYPE, runtime_stack_owner
|
|
|
|
procs: list[subprocess.Popen[str]] = []
|
|
try:
|
|
for process_type, worker_type in (
|
|
(Process.TypeChoices.SERVER, ""),
|
|
(Process.TypeChoices.ORCHESTRATOR, RUNNER_ACTIVE_WORKER_TYPE),
|
|
):
|
|
proc = subprocess.Popen(
|
|
[sys.executable, "-c", "import time; time.sleep(60)"],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
text=True,
|
|
start_new_session=True,
|
|
)
|
|
procs.append(proc)
|
|
Process.objects.create(
|
|
machine=Machine.current(),
|
|
process_type=process_type,
|
|
worker_type=worker_type,
|
|
pwd=str(tmp_path),
|
|
cmd=[],
|
|
pid=proc.pid,
|
|
status=Process.StatusChoices.RUNNING,
|
|
)
|
|
time.sleep(0.05)
|
|
|
|
owner = runtime_stack_owner(data_dir=tmp_path)
|
|
|
|
assert owner is not None
|
|
assert owner.process_type == Process.TypeChoices.SERVER
|
|
assert owner.pid == procs[0].pid
|
|
finally:
|
|
for proc in procs:
|
|
if proc.poll() is None:
|
|
os.killpg(proc.pid, signal.SIGTERM)
|
|
for proc in procs:
|
|
try:
|
|
proc.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
os.killpg(proc.pid, signal.SIGKILL)
|
|
proc.wait(timeout=5)
|
|
|
|
|
|
def test_runtime_stack_owner_reaps_dead_server_without_promoting_update(tmp_path):
|
|
from archivebox.machine.models import Machine, Process
|
|
from archivebox.core.takeover_util import runtime_stack_owner
|
|
|
|
procs: list[subprocess.Popen[str]] = []
|
|
older_row = None
|
|
newer_row = None
|
|
try:
|
|
for process_type in (Process.TypeChoices.UPDATE, Process.TypeChoices.SERVER):
|
|
proc = subprocess.Popen(
|
|
[sys.executable, "-c", "import time; time.sleep(60)"],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
text=True,
|
|
start_new_session=True,
|
|
)
|
|
procs.append(proc)
|
|
row = Process.objects.create(
|
|
machine=Machine.current(),
|
|
process_type=process_type,
|
|
worker_type=process_type,
|
|
pwd=str(tmp_path),
|
|
cmd=[],
|
|
pid=proc.pid,
|
|
status=Process.StatusChoices.RUNNING,
|
|
)
|
|
if process_type == Process.TypeChoices.UPDATE:
|
|
older_row = row
|
|
else:
|
|
newer_row = row
|
|
time.sleep(0.05)
|
|
|
|
assert older_row is not None
|
|
assert newer_row is not None
|
|
|
|
os.killpg(procs[-1].pid, signal.SIGTERM)
|
|
procs[-1].wait(timeout=5)
|
|
|
|
owner = runtime_stack_owner(data_dir=tmp_path)
|
|
|
|
assert owner is None
|
|
newer_row.refresh_from_db()
|
|
assert newer_row.status == Process.StatusChoices.EXITED
|
|
older_row.refresh_from_db()
|
|
assert older_row.status == Process.StatusChoices.RUNNING
|
|
finally:
|
|
for proc in procs:
|
|
if proc.poll() is None:
|
|
os.killpg(proc.pid, signal.SIGTERM)
|
|
for proc in procs:
|
|
try:
|
|
proc.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
os.killpg(proc.pid, signal.SIGKILL)
|
|
proc.wait(timeout=5)
|
|
|
|
|
|
def test_runtime_stack_owner_ignores_supervised_orphan_runner(tmp_path):
|
|
from archivebox.machine.models import Machine, Process
|
|
from archivebox.core.takeover_util import RUNNER_ACTIVE_WORKER_TYPE, runtime_stack_owner
|
|
|
|
procs: list[subprocess.Popen[str]] = []
|
|
try:
|
|
for _ in range(2):
|
|
proc = subprocess.Popen(
|
|
[sys.executable, "-c", "import time; time.sleep(60)"],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
text=True,
|
|
start_new_session=True,
|
|
)
|
|
procs.append(proc)
|
|
|
|
supervisor_row = Process.objects.create(
|
|
machine=Machine.current(),
|
|
process_type=Process.TypeChoices.SUPERVISORD,
|
|
worker_type="supervisord",
|
|
pwd=str(tmp_path),
|
|
cmd=[],
|
|
pid=procs[0].pid,
|
|
status=Process.StatusChoices.RUNNING,
|
|
)
|
|
Process.objects.create(
|
|
machine=Machine.current(),
|
|
parent=supervisor_row,
|
|
process_type=Process.TypeChoices.ORCHESTRATOR,
|
|
worker_type=RUNNER_ACTIVE_WORKER_TYPE,
|
|
pwd=str(tmp_path),
|
|
cmd=[],
|
|
pid=procs[1].pid,
|
|
status=Process.StatusChoices.RUNNING,
|
|
)
|
|
|
|
assert runtime_stack_owner(data_dir=tmp_path) is None
|
|
finally:
|
|
for proc in procs:
|
|
if proc.poll() is None:
|
|
os.killpg(proc.pid, signal.SIGTERM)
|
|
for proc in procs:
|
|
try:
|
|
proc.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
os.killpg(proc.pid, signal.SIGKILL)
|
|
proc.wait(timeout=5)
|
|
|
|
|
|
def test_runtime_stack_owner_allows_top_level_runner_when_no_parent_command_exists(tmp_path):
|
|
from archivebox.machine.models import Machine, Process
|
|
from archivebox.core.takeover_util import RUNNER_ACTIVE_WORKER_TYPE, runtime_stack_owner
|
|
|
|
proc = subprocess.Popen(
|
|
[sys.executable, "-c", "import time; time.sleep(60)"],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
text=True,
|
|
start_new_session=True,
|
|
)
|
|
try:
|
|
runner_row = Process.objects.create(
|
|
machine=Machine.current(),
|
|
process_type=Process.TypeChoices.ORCHESTRATOR,
|
|
worker_type=RUNNER_ACTIVE_WORKER_TYPE,
|
|
pwd=str(tmp_path),
|
|
cmd=[],
|
|
pid=proc.pid,
|
|
status=Process.StatusChoices.RUNNING,
|
|
)
|
|
|
|
owner = runtime_stack_owner(data_dir=tmp_path)
|
|
|
|
assert owner is not None
|
|
assert owner.id == runner_row.id
|
|
finally:
|
|
if proc.poll() is None:
|
|
os.killpg(proc.pid, signal.SIGTERM)
|
|
try:
|
|
proc.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
os.killpg(proc.pid, signal.SIGKILL)
|
|
proc.wait(timeout=5)
|