media tests
Continuous Integration / e2e-tests (push) Successful in 5m22s
Continuous Integration / backend-tests (push) Successful in 38s
Continuous Integration / frontend-check (push) Successful in 16s

This commit is contained in:
2026-05-05 18:48:47 -04:00
parent d77a79876f
commit 1ef2c194db
+498
View File
@@ -1,4 +1,5 @@
from app.db import models
from app.services.archiver import archiver_manager
from datetime import datetime, timezone
import json
@@ -701,3 +702,500 @@ def test_metadata_directory(client, db_session):
assert data["type"] == "directory"
assert data["child_count"] == 2
assert data["size"] == 300
# ── List Providers ──
def test_list_providers_includes_mock_in_test_mode(client, monkeypatch):
    """The mock LTO provider must be listed alongside the real ones when test mode is on."""
    monkeypatch.setenv("TAPEHOARD_TEST_MODE", "true")
    resp = client.get("/inventory/providers")
    assert resp.status_code == 200
    listed_ids = {entry["provider_id"] for entry in resp.json()}
    # All real providers plus the test-mode-only mock must be present.
    for expected_id in ("lto_tape", "local_hdd", "s3_compat", "mock_lto"):
        assert expected_id in listed_ids
# ── List Media with Provider State ──
def test_list_media_with_refresh(client, db_session, mocker):
    """refresh=true on the media listing should hit the provider for live state."""
    hdd = models.StorageMedia(
        media_type="local_hdd",
        identifier="DISK_ONLINE",
        capacity=1000,
        status="active",
        extra_config='{"mount_path": "/tmp"}',
    )
    db_session.add(hdd)
    db_session.commit()

    # Fake a provider that reports the disk as online and correctly identified.
    fake_provider = mocker.MagicMock()
    fake_provider.check_online.return_value = True
    fake_provider.identify_media.return_value = "DISK_ONLINE"
    fake_provider.get_live_info.return_value = {"online": True}
    fake_provider.mount_base = "/tmp"
    mocker.patch.object(
        archiver_manager, "_get_storage_provider", return_value=fake_provider
    )

    resp = client.get("/inventory/media?refresh=true")
    assert resp.status_code == 200
    listing = resp.json()
    assert len(listing) == 1
    entry = listing[0]
    assert entry["is_online"] is True
    assert entry["is_identified"] is True
# ── Create Media Validation ──
def test_create_media_duplicate_identifier(client, db_session):
    """Registering a medium whose identifier is already taken must fail with 400."""
    existing = models.StorageMedia(
        media_type="hdd", identifier="DUPE", capacity=1000, status="active"
    )
    db_session.add(existing)
    db_session.commit()

    resp = client.post(
        "/inventory/media",
        json={"media_type": "local_hdd", "identifier": "DUPE", "capacity": 1000},
    )
    assert resp.status_code == 400
    assert "already exists" in resp.json()["detail"]
# ── Update Media Edge Cases ──
def test_update_media_not_found(client):
    """PATCHing an unknown media id should yield a 404."""
    resp = client.patch("/inventory/media/99999", json={"location": "Nowhere"})
    assert resp.status_code == 404
def test_update_status_to_failed_purges_versions(client, db_session):
    """Marking a medium as FAILED must delete every file_version stored on it."""
    from sqlalchemy import text

    failed_media = models.StorageMedia(
        media_type="hdd", identifier="DISK_FAIL_001", capacity=1000, status="active"
    )
    db_session.add(failed_media)
    db_session.flush()
    fs_entry = models.FilesystemState(file_path="data/file1.txt", size=100, mtime=1000)
    db_session.add(fs_entry)
    db_session.flush()
    # One version recorded on the soon-to-fail medium.
    version = models.FileVersion(
        filesystem_state_id=fs_entry.id,
        media_id=failed_media.id,
        file_number="1",
        offset_start=0,
        offset_end=100,
    )
    db_session.add(version)
    db_session.commit()

    resp = client.patch(
        f"/inventory/media/{failed_media.id}",
        json={"status": "FAILED"},
    )
    assert resp.status_code == 200

    remaining = db_session.execute(
        text("SELECT COUNT(*) FROM file_versions WHERE media_id = :media_id"),
        {"media_id": failed_media.id},
    ).scalar()
    assert remaining == 0
def test_update_media_all_lto_fields(client, db_session):
    """Every LTO-specific column should be patchable and echoed back."""
    tape = models.StorageMedia(
        media_type="lto_tape",
        identifier="LTO_PATCH",
        capacity=1000,
        status="active",
    )
    db_session.add(tape)
    db_session.commit()

    patch_body = {
        "generation": "LTO-9",
        "worm": True,
        "write_protected": True,
        "compression": False,
        "encryption_key_id": "new-key",
        "cleaning_cartridge": True,
    }
    resp = client.patch(f"/inventory/media/{tape.id}", json=patch_body)
    assert resp.status_code == 200
    result = resp.json()
    # Each submitted value must round-trip unchanged.
    for field, value in patch_body.items():
        assert result[field] == value
def test_update_media_all_hdd_fields(client, db_session):
    """Every HDD-specific column should be patchable and echoed back."""
    disk = models.StorageMedia(
        media_type="local_hdd",
        identifier="HDD_PATCH",
        capacity=1000,
        status="active",
    )
    db_session.add(disk)
    db_session.commit()

    patch_body = {
        "drive_model": "WD-Red",
        "device_uuid": "uuid-123",
        "is_ssd": True,
        "mount_path": "/mnt/backup",
        "filesystem_type": "ext4",
        "connection_interface": "USB3",
        "encrypted": True,
    }
    resp = client.patch(f"/inventory/media/{disk.id}", json=patch_body)
    assert resp.status_code == 200
    result = resp.json()
    # Each submitted value must round-trip unchanged.
    for field, value in patch_body.items():
        assert result[field] == value
def test_update_media_all_cloud_fields(client, db_session):
    """Every cloud/S3-specific column should be patchable and echoed back."""
    bucket = models.StorageMedia(
        media_type="s3_compat",
        identifier="CLOUD_PATCH",
        capacity=1000,
        status="active",
    )
    db_session.add(bucket)
    db_session.commit()

    patch_body = {
        "provider_template": "wasabi",
        "endpoint_url": "https://s3.wasabisys.com",
        "region": "us-east-1",
        "bucket_name": "my-bucket",
        "access_key_id": "AKIA...",
        "secret_access_key_name": "wasabi-key",
        "path_style_access": False,
        "storage_class": "STANDARD",
        "max_part_size_mb": 1000,
        "obfuscate_filenames": True,
        "encryption_secret_name": "enc-secret",
    }
    resp = client.patch(f"/inventory/media/{bucket.id}", json=patch_body)
    assert resp.status_code == 200
    result = resp.json()
    # Each submitted value must round-trip unchanged.
    for field, value in patch_body.items():
        assert result[field] == value
def test_update_media_legacy_extra_config_migration(client, db_session):
    """A PATCH should migrate legacy extra_config keys into first-class columns."""
    legacy_config = {
        "device_path": "/mnt/legacy",
        "encryption_key": "legacy-key",
        "encryption_passphrase": "legacy-pass",
    }
    legacy_media = models.StorageMedia(
        media_type="local_hdd",
        identifier="LEGACY_001",
        capacity=1000,
        status="active",
        extra_config=json.dumps(legacy_config),
    )
    db_session.add(legacy_media)
    db_session.commit()

    # Any patch triggers the migration path; touch an unrelated field.
    resp = client.patch(
        f"/inventory/media/{legacy_media.id}",
        json={"location": "Migrated"},
    )
    assert resp.status_code == 200
    migrated = resp.json()
    assert migrated["mount_path"] == "/mnt/legacy"
    assert migrated["encryption_key_id"] == "legacy-key"
# ── Delete Media ──
def test_delete_media_not_found(client):
    """DELETE on an unknown media id should yield a 404."""
    resp = client.delete("/inventory/media/99999")
    assert resp.status_code == 404
# ── Initialize Media ──
def test_initialize_media_not_found(client):
    """Initializing an unknown media id should yield a 404."""
    resp = client.post("/inventory/media/99999/initialize")
    assert resp.status_code == 404
def test_initialize_media_no_provider(client, db_session, mocker):
    """A media type with no backing provider should be rejected with 400."""
    orphan = models.StorageMedia(
        media_type="hdd", identifier="NO_PROV", capacity=1000, status="active"
    )
    db_session.add(orphan)
    db_session.commit()

    # Simulate provider lookup failure.
    mocker.patch.object(
        archiver_manager, "_get_storage_provider", return_value=None
    )

    resp = client.post(f"/inventory/media/{orphan.id}/initialize")
    assert resp.status_code == 400
    assert "provider not found" in resp.json()["detail"]
def test_initialize_media_existing_data_blocks(client, db_session, mocker):
    """Without force, initialize must refuse (409) when the medium holds data."""
    occupied = models.StorageMedia(
        media_type="hdd", identifier="HAS_DATA", capacity=1000, status="active"
    )
    db_session.add(occupied)
    db_session.commit()

    fake_provider = mocker.MagicMock()
    fake_provider.check_existing_data.return_value = True
    mocker.patch.object(
        archiver_manager, "_get_storage_provider", return_value=fake_provider
    )

    resp = client.post(f"/inventory/media/{occupied.id}/initialize")
    assert resp.status_code == 409
    assert "existing data" in resp.json()["detail"]
def test_initialize_media_force_overwrite(client, db_session, mocker):
    """With force=true, initialize proceeds even though data already exists."""
    target = models.StorageMedia(
        media_type="hdd",
        identifier="FORCE_INIT",
        capacity=1000,
        status="active",
        extra_config='{"mount_path": "/tmp"}',
    )
    db_session.add(target)
    db_session.commit()

    # Provider reports existing data, but initialization succeeds anyway.
    fake_provider = mocker.MagicMock()
    fake_provider.check_existing_data.return_value = True
    fake_provider.initialize_media.return_value = True
    fake_provider.device_path = "/tmp/init"
    mocker.patch.object(
        archiver_manager, "_get_storage_provider", return_value=fake_provider
    )

    resp = client.post(f"/inventory/media/{target.id}/initialize?force=true")
    assert resp.status_code == 200
    assert "complete" in resp.json()["message"]
def test_initialize_media_permission_error(client, db_session, mocker):
    """A PermissionError from the provider should surface as HTTP 403."""
    locked = models.StorageMedia(
        media_type="hdd", identifier="PERM_DENY", capacity=1000, status="active"
    )
    db_session.add(locked)
    db_session.commit()

    fake_provider = mocker.MagicMock()
    fake_provider.check_existing_data.return_value = False
    fake_provider.initialize_media.side_effect = PermissionError("Access denied")
    mocker.patch.object(
        archiver_manager, "_get_storage_provider", return_value=fake_provider
    )

    resp = client.post(f"/inventory/media/{locked.id}/initialize")
    assert resp.status_code == 403
    assert "Access denied" in resp.json()["detail"]
# ── Reorder Media ──
def test_reorder_media(client, db_session):
    """The reorder endpoint should rewrite priority_index from the posted order."""
    first = models.StorageMedia(
        media_type="hdd", identifier="A", capacity=1000, status="active"
    )
    second = models.StorageMedia(
        media_type="hdd", identifier="B", capacity=1000, status="active"
    )
    db_session.add_all([first, second])
    db_session.commit()

    # Submit the reversed order: B first, A second.
    resp = client.post(
        "/inventory/media/reorder", json={"media_ids": [second.id, first.id]}
    )
    assert resp.status_code == 200

    db_session.expire_all()  # drop stale ORM state so fresh values are read
    assert db_session.get(models.StorageMedia, second.id).priority_index == 0
    assert db_session.get(models.StorageMedia, first.id).priority_index == 1
# ── Insights Deep Tests ──
def test_insights_with_duplicates_and_aging(client, db_session):
    """Insights should report totals, protection split, duplicates, extensions, redundancy."""
    ts = datetime.now(timezone.utc).timestamp()
    # Two files sharing a hash form one duplicate group.
    dup_a = models.FilesystemState(
        file_path="/data/a.txt", size=100, mtime=ts, sha256_hash="duphash"
    )
    dup_b = models.FilesystemState(
        file_path="/data/b.txt", size=100, mtime=ts, sha256_hash="duphash"
    )
    # An old file that will be protected by a FileVersion below.
    old_file = models.FilesystemState(
        file_path="/data/c.txt",
        size=200,
        mtime=ts - 400 * 24 * 3600,
        sha256_hash="hash3",
    )
    db_session.add_all([dup_a, dup_b, old_file])
    db_session.flush()

    medium = models.StorageMedia(
        media_type="hdd", identifier="M1", capacity=1000, status="active"
    )
    db_session.add(medium)
    db_session.flush()
    db_session.add(
        models.FileVersion(
            filesystem_state_id=old_file.id,
            media_id=medium.id,
            file_number="1",
            offset_start=0,
            offset_end=200,
        )
    )
    db_session.commit()

    resp = client.get("/inventory/insights")
    assert resp.status_code == 200
    payload = resp.json()

    summary = payload["summary"]
    assert summary["total_files"] == 3
    assert summary["total_bytes"] == 400
    assert summary["protected_bytes"] == 200
    assert summary["vulnerable_bytes"] == 200

    assert len(payload["duplicates"]) == 1
    dup_group = payload["duplicates"][0]
    assert dup_group["copies"] == 2
    assert dup_group["saved"] == 100

    assert len(payload["extensions"]) >= 1
    assert any(entry["ext"] == "txt" for entry in payload["extensions"])
    assert len(payload["redundancy"]) >= 1
# ── Treemap / Directories ──
def test_get_treemap(client, db_session):
    """The directories endpoint should return a non-empty list of directory nodes."""
    files = [
        models.FilesystemState(file_path="/data/sub/file1.txt", size=100, mtime=1000),
        models.FilesystemState(file_path="/data/sub/file2.txt", size=200, mtime=1000),
    ]
    db_session.add_all(files)
    db_session.commit()

    resp = client.get("/inventory/directories")
    assert resp.status_code == 200
    payload = resp.json()
    assert isinstance(payload, list)
    # At minimum the data directory itself should be reported.
    assert len(payload) >= 1
# ── Detect Media ──
def test_detect_media_finds_new_insertion(client, db_session, mocker):
    """Detection should report an online tape whose identity is not yet registered."""
    registered = models.StorageMedia(
        media_type="lto_tape",
        identifier="EXISTING_TAPE",
        capacity=1000,
        status="active",
        extra_config=json.dumps({"device_path": "/dev/nst0"}),
    )
    db_session.add(registered)
    db_session.commit()

    # The drive reports a tape with an identity unknown to the inventory.
    fake_provider = mocker.MagicMock()
    fake_provider.provider_id = "lto_tape"
    fake_provider.check_online.return_value = True
    fake_provider.get_live_info.return_value = {"identity": "NEW_TAPE_01"}
    mocker.patch.object(
        archiver_manager, "_get_storage_provider", return_value=fake_provider
    )

    resp = client.get("/inventory/detect")
    assert resp.status_code == 200
    detected = resp.json()
    assert len(detected) == 1
    assert detected[0]["identifier"] == "NEW_TAPE_01"
    assert detected[0]["device_path"] == "/dev/nst0"