Files
2026-06-14 11:44:38 -07:00

260 lines
10 KiB
Python

import json
import os
import shutil
import uuid
import asyncio
from pathlib import Path
import pytest
from archivebox.machine.models import Binary, Machine, Process
from archivebox.tests.conftest import parse_jsonl_output, run_archivebox_cmd
from archivebox.tests.test_orm_helpers import use_archivebox_db
pytestmark = pytest.mark.django_db(transaction=True)
def _link_real_binary(bin_dir: Path, name: str, *, source: str | None = None) -> Path:
bin_dir.mkdir(parents=True, exist_ok=True)
source_path = shutil.which(source or name)
assert source_path, f"{source or name} must be installed for this integration test"
link = bin_dir / name
link.unlink(missing_ok=True)
link.symlink_to(source_path)
return link
def _runtime_env(data_dir: Path, bin_dir: Path, *, lib_dir: Path | None = None) -> dict[str, str]:
archivebox_bin = shutil.which("archivebox")
assert archivebox_bin, "archivebox console script must be available for CLI tests"
lib_dir = lib_dir or data_dir / "lib"
return {
"ABXPKG_LIB_DIR": str(lib_dir),
"PATH": os.pathsep.join([str(bin_dir), str(Path(archivebox_bin).parent), "/usr/bin", "/bin", "/usr/sbin", "/sbin"]),
}
def test_binary_request_preserves_raw_overrides_in_db_while_using_native_event():
from abxpkg.binary_service import BinaryCacheService, BinaryEvent, BinaryRequestEvent, BinaryService
from abx_dl.orchestrator import create_bus
from archivebox.services.binary_service import ArchiveBoxDBBinaryCacheBackend
machine = Machine.current()
raw_overrides = {
"pip": {
"install_args": ["imagesize>=2.0.0"],
"module_name": "imagesize",
},
}
binary = Binary.objects.create(
machine=machine,
name="sh",
abspath="/bin/sh",
version="1.0.0",
binprovider="env",
binproviders="env,pip",
overrides=raw_overrides,
status=Binary.StatusChoices.INSTALLED,
)
bus = create_bus(name=f"test_binary_raw_overrides_{uuid.uuid4().hex[:8]}")
BinaryCacheService(bus, backend=ArchiveBoxDBBinaryCacheBackend())
BinaryService(bus)
binary_events: list[BinaryEvent] = []
async def on_BinaryEvent(event: BinaryEvent) -> None:
binary_events.append(event)
bus.on(BinaryEvent, on_BinaryEvent)
async def run_event() -> None:
await bus.emit(
BinaryRequestEvent(
name="sh",
binproviders="env,pip",
overrides={"pip": {"install_args": ["imagesize>=2.0.0"]}},
extra_context={
"raw_overrides": raw_overrides,
"provider_metadata": {"pip": {"module_name": "imagesize"}},
},
),
).now()
await bus.wait_until_idle()
asyncio.run(run_event())
binary.refresh_from_db()
assert binary.status == Binary.StatusChoices.INSTALLED
assert binary.overrides == raw_overrides
assert binary_events
assert binary_events[-1].overrides == {"pip": {"install_args": ["imagesize>=2.0.0"]}}
assert binary_events[-1].extra_context["raw_overrides"] == raw_overrides
def test_binary_request_installs_env_binary_and_recovers_stale_cache(initialized_archive, tmp_path):
name = f"abx-e2e-rg-{uuid.uuid4().hex[:8]}"
bootstrap_bin_dir = tmp_path / "realbin"
provider_bin_dir = initialized_archive / "lib" / "env" / "bin"
_link_real_binary(bootstrap_bin_dir, "uv")
_link_real_binary(provider_bin_dir, name, source="rg")
_cmd_result = run_archivebox_cmd(
["run"],
cwd=initialized_archive,
stdin=json.dumps({"type": "BinaryRequest", "name": name, "binproviders": "env"}) + "\n",
timeout=120,
env=_runtime_env(initialized_archive, bootstrap_bin_dir),
default_cli_env=True,
disable_extractors=True,
)
stdout, stderr, returncode = _cmd_result.stdout, _cmd_result.stderr, _cmd_result.returncode
assert returncode == 0, stderr
output_records = parse_jsonl_output(stdout)
assert any(record["type"] == "BinaryRequest" and record["name"] == name for record in output_records)
with use_archivebox_db(initialized_archive):
binary = Binary.objects.get(name=name)
machine_id = str(binary.machine_id)
first_binary_id = str(binary.id)
first_abspath = Path(binary.abspath)
binary_processes = list(Process.objects.filter(process_type=Process.TypeChoices.BINARY).order_by("created_at"))
assert binary.status == Binary.StatusChoices.INSTALLED
assert binary.version
assert binary.binprovider == "env"
assert binary.binproviders == "env"
assert first_abspath.exists()
assert first_abspath == provider_bin_dir / name
assert first_abspath.resolve() == Path(shutil.which("rg") or "").resolve()
assert first_abspath.is_relative_to(initialized_archive / "lib")
assert (initialized_archive / "lib" / "env" / "bin" / name).exists()
assert (initialized_archive / "machines" / machine_id / "binaries" / name / "index.jsonl").exists()
assert binary_processes
assert binary_processes[-1].status == Process.StatusChoices.EXITED
assert binary_processes[-1].exit_code == 0
assert binary_processes[-1].ended_at is not None
assert binary_processes[-1].started_at < binary_processes[-1].ended_at
assert any(f"--name={name}" in arg for arg in binary_processes[-1].cmd)
_cmd_result = run_archivebox_cmd(
["version"],
cwd=initialized_archive,
timeout=60,
env=_runtime_env(initialized_archive, bootstrap_bin_dir),
default_cli_env=True,
disable_extractors=True,
)
version_stdout, version_stderr, version_code = _cmd_result.stdout, _cmd_result.stderr, _cmd_result.returncode
assert version_code == 0, version_stderr
assert name in version_stdout
assert binary.version in version_stdout
first_abspath.unlink()
_link_real_binary(bootstrap_bin_dir, name, source="rg")
_cmd_result = run_archivebox_cmd(
["run", f"--binary-id={first_binary_id}"],
cwd=initialized_archive,
timeout=120,
env=_runtime_env(initialized_archive, bootstrap_bin_dir),
default_cli_env=True,
disable_extractors=True,
)
rerun_stdout, rerun_stderr, rerun_code = _cmd_result.stdout, _cmd_result.stderr, _cmd_result.returncode
assert rerun_code == 0, rerun_stdout + rerun_stderr
with use_archivebox_db(initialized_archive):
recovered = Binary.objects.get(pk=first_binary_id)
process_count = Process.objects.filter(process_type=Process.TypeChoices.BINARY).count()
assert recovered.status == Binary.StatusChoices.INSTALLED
assert recovered.version == binary.version
assert Path(recovered.abspath).exists()
assert Path(recovered.abspath).resolve() == Path(shutil.which("rg") or "").resolve()
assert process_count >= 2
changed_lib_dir = tmp_path / "changed-lib"
changed_provider_bin_dir = changed_lib_dir / "env" / "bin"
_link_real_binary(changed_provider_bin_dir, name, source="rg")
_cmd_result = run_archivebox_cmd(
["run"],
cwd=initialized_archive,
stdin=json.dumps({"type": "BinaryRequest", "name": name, "binproviders": "env"}) + "\n",
timeout=120,
env=_runtime_env(initialized_archive, bootstrap_bin_dir, lib_dir=changed_lib_dir),
default_cli_env=True,
disable_extractors=True,
)
relib_stdout, relib_stderr, relib_code = _cmd_result.stdout, _cmd_result.stderr, _cmd_result.returncode
assert relib_code == 0, relib_stdout + relib_stderr
with use_archivebox_db(initialized_archive):
relibbed = Binary.objects.get(pk=first_binary_id)
assert relibbed.status == Binary.StatusChoices.INSTALLED
assert relibbed.version == binary.version
assert Path(relibbed.abspath) == changed_provider_bin_dir / name
assert Path(relibbed.abspath).exists()
assert Path(relibbed.abspath).resolve() == Path(shutil.which("rg") or "").resolve()
def test_missing_binary_request_stays_queued_then_recovers_when_provider_can_resolve(initialized_archive, tmp_path):
name = f"abx-missing-rg-{uuid.uuid4().hex[:8]}"
bootstrap_bin_dir = tmp_path / "realbin"
provider_bin_dir = initialized_archive / "lib" / "env" / "bin"
_link_real_binary(bootstrap_bin_dir, "uv")
_cmd_result = run_archivebox_cmd(
["run"],
cwd=initialized_archive,
stdin=json.dumps({"type": "BinaryRequest", "name": name, "binproviders": "env"}) + "\n",
timeout=120,
env=_runtime_env(initialized_archive, bootstrap_bin_dir),
default_cli_env=True,
disable_extractors=True,
)
stdout, stderr, returncode = _cmd_result.stdout, _cmd_result.stderr, _cmd_result.returncode
assert returncode == 0, stderr
assert any(record["type"] == "BinaryRequest" and record["name"] == name for record in parse_jsonl_output(stdout))
with use_archivebox_db(initialized_archive):
queued = Binary.objects.get(name=name)
queued_id = str(queued.id)
failed_process = Process.objects.filter(process_type=Process.TypeChoices.BINARY).latest("created_at")
machine_config = Machine.objects.get(pk=queued.machine_id).config or {}
assert queued.status == Binary.StatusChoices.QUEUED
assert queued.abspath == ""
assert queued.retry_at is not None
assert failed_process.status == Process.StatusChoices.EXITED
assert failed_process.exit_code == 1
assert f"{name.upper().replace('-', '_')}_BINARY" not in machine_config
assert not (provider_bin_dir / name).exists()
_link_real_binary(provider_bin_dir, name, source="rg")
_cmd_result = run_archivebox_cmd(
["run", f"--binary-id={queued_id}"],
cwd=initialized_archive,
timeout=120,
env=_runtime_env(initialized_archive, bootstrap_bin_dir),
default_cli_env=True,
disable_extractors=True,
)
recover_stdout, recover_stderr, recover_code = _cmd_result.stdout, _cmd_result.stderr, _cmd_result.returncode
assert recover_code == 0, recover_stdout + recover_stderr
with use_archivebox_db(initialized_archive):
recovered = Binary.objects.get(pk=queued_id)
process_exit_codes = list(
Process.objects.filter(process_type=Process.TypeChoices.BINARY).order_by("created_at").values_list("exit_code", flat=True),
)
assert recovered.status == Binary.StatusChoices.INSTALLED
assert recovered.version
assert Path(recovered.abspath).exists()
assert Path(recovered.abspath) == provider_bin_dir / name
assert process_exit_codes[-2:] == [1, 0]