diff --git a/backend/tests/test_api_inventory.py b/backend/tests/test_api_inventory.py index 4b8d6be..da4c05a 100644 --- a/backend/tests/test_api_inventory.py +++ b/backend/tests/test_api_inventory.py @@ -1,4 +1,5 @@ from app.db import models +from app.services.archiver import archiver_manager from datetime import datetime, timezone import json @@ -701,3 +702,500 @@ def test_metadata_directory(client, db_session): assert data["type"] == "directory" assert data["child_count"] == 2 assert data["size"] == 300 + + +# ── List Providers ── + + +def test_list_providers_includes_mock_in_test_mode(client, monkeypatch): + """Tests that MockLTOProvider is included when TAPEHOARD_TEST_MODE is set.""" + monkeypatch.setenv("TAPEHOARD_TEST_MODE", "true") + response = client.get("/inventory/providers") + assert response.status_code == 200 + data = response.json() + provider_ids = [p["provider_id"] for p in data] + assert "lto_tape" in provider_ids + assert "local_hdd" in provider_ids + assert "s3_compat" in provider_ids + assert "mock_lto" in provider_ids + + +# ── List Media with Provider State ── + + +def test_list_media_with_refresh(client, db_session, mocker): + """Tests listing media with refresh=True queries hardware status.""" + media = models.StorageMedia( + media_type="local_hdd", + identifier="DISK_ONLINE", + capacity=1000, + status="active", + extra_config='{"mount_path": "/tmp"}', + ) + db_session.add(media) + db_session.commit() + + mock_provider = mocker.MagicMock() + mock_provider.check_online.return_value = True + mock_provider.identify_media.return_value = "DISK_ONLINE" + mock_provider.get_live_info.return_value = {"online": True} + mock_provider.mount_base = "/tmp" + + mocker.patch.object( + archiver_manager, + "_get_storage_provider", + return_value=mock_provider, + ) + + response = client.get("/inventory/media?refresh=true") + assert response.status_code == 200 + data = response.json() + assert len(data) == 1 + assert data[0]["is_online"] is True + assert data[0]["is_identified"] is True + + +# ── Create Media Validation ── + + +def test_create_media_duplicate_identifier(client, db_session): + """Tests creating media with duplicate identifier returns 400.""" + db_session.add( + models.StorageMedia( + media_type="hdd", identifier="DUPE", capacity=1000, status="active" + ) + ) + db_session.commit() + + response = client.post( + "/inventory/media", + json={"media_type": "local_hdd", "identifier": "DUPE", "capacity": 1000}, + ) + assert response.status_code == 400 + assert "already exists" in response.json()["detail"] + + +# ── Update Media Edge Cases ── + + +def test_update_media_not_found(client): + """Tests updating non-existent media returns 404.""" + response = client.patch("/inventory/media/99999", json={"location": "Nowhere"}) + assert response.status_code == 404 + + +def test_update_status_to_failed_purges_versions(client, db_session): + """Setting status to FAILED should delete all file_versions.""" + media = models.StorageMedia( + media_type="hdd", identifier="DISK_FAIL_001", capacity=1000, status="active" + ) + db_session.add(media) + db_session.flush() + + file1 = models.FilesystemState(file_path="data/file1.txt", size=100, mtime=1000) + db_session.add(file1) + db_session.flush() + + db_session.add( + models.FileVersion( + filesystem_state_id=file1.id, + media_id=media.id, + file_number="1", + offset_start=0, + offset_end=100, + ) + ) + db_session.commit() + + response = client.patch( + f"/inventory/media/{media.id}", + json={"status": "FAILED"}, + ) + assert response.status_code == 200 + + from sqlalchemy import text + + result = db_session.execute( + text("SELECT COUNT(*) FROM file_versions WHERE media_id = :media_id"), + {"media_id": media.id}, + ).scalar() + assert result == 0 + + +def test_update_media_all_lto_fields(client, db_session): + """Tests updating all LTO-specific fields.""" + media = models.StorageMedia( + media_type="lto_tape", + identifier="LTO_PATCH", + capacity=1000, + status="active", + ) + db_session.add(media) + db_session.commit() + + response = client.patch( + f"/inventory/media/{media.id}", + json={ + "generation": "LTO-9", + "worm": True, + "write_protected": True, + "compression": False, + "encryption_key_id": "new-key", + "cleaning_cartridge": True, + }, + ) + assert response.status_code == 200 + data = response.json() + assert data["generation"] == "LTO-9" + assert data["worm"] is True + assert data["write_protected"] is True + assert data["compression"] is False + assert data["encryption_key_id"] == "new-key" + assert data["cleaning_cartridge"] is True + + +def test_update_media_all_hdd_fields(client, db_session): + """Tests updating all HDD-specific fields.""" + media = models.StorageMedia( + media_type="local_hdd", + identifier="HDD_PATCH", + capacity=1000, + status="active", + ) + db_session.add(media) + db_session.commit() + + response = client.patch( + f"/inventory/media/{media.id}", + json={ + "drive_model": "WD-Red", + "device_uuid": "uuid-123", + "is_ssd": True, + "mount_path": "/mnt/backup", + "filesystem_type": "ext4", + "connection_interface": "USB3", + "encrypted": True, + }, + ) + assert response.status_code == 200 + data = response.json() + assert data["drive_model"] == "WD-Red" + assert data["device_uuid"] == "uuid-123" + assert data["is_ssd"] is True + assert data["mount_path"] == "/mnt/backup" + assert data["filesystem_type"] == "ext4" + assert data["connection_interface"] == "USB3" + assert data["encrypted"] is True + + +def test_update_media_all_cloud_fields(client, db_session): + """Tests updating all cloud-specific fields.""" + media = models.StorageMedia( + media_type="s3_compat", + identifier="CLOUD_PATCH", + capacity=1000, + status="active", + ) + db_session.add(media) + db_session.commit() + + response = client.patch( + f"/inventory/media/{media.id}", + json={ + "provider_template": "wasabi", + "endpoint_url": "https://s3.wasabisys.com", + "region": "us-east-1", + "bucket_name": "my-bucket", + "access_key_id": "AKIA...", + "secret_access_key_name": "wasabi-key", + "path_style_access": False, + "storage_class": "STANDARD", + "max_part_size_mb": 1000, + "obfuscate_filenames": True, + "encryption_secret_name": "enc-secret", + }, + ) + assert response.status_code == 200 + data = response.json() + assert data["provider_template"] == "wasabi" + assert data["endpoint_url"] == "https://s3.wasabisys.com" + assert data["region"] == "us-east-1" + assert data["bucket_name"] == "my-bucket" + assert data["access_key_id"] == "AKIA..." + assert data["secret_access_key_name"] == "wasabi-key" + assert data["path_style_access"] is False + assert data["storage_class"] == "STANDARD" + assert data["max_part_size_mb"] == 1000 + assert data["obfuscate_filenames"] is True + assert data["encryption_secret_name"] == "enc-secret" + + +def test_update_media_legacy_extra_config_migration(client, db_session): + """Tests that legacy extra_config keys are migrated to first-class columns.""" + media = models.StorageMedia( + media_type="local_hdd", + identifier="LEGACY_001", + capacity=1000, + status="active", + extra_config=json.dumps( + { + "device_path": "/mnt/legacy", + "encryption_key": "legacy-key", + "encryption_passphrase": "legacy-pass", + } + ), + ) + db_session.add(media) + db_session.commit() + + response = client.patch( + f"/inventory/media/{media.id}", + json={"location": "Migrated"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["mount_path"] == "/mnt/legacy" + assert data["encryption_key_id"] == "legacy-key" + + +# ── Delete Media ── + + +def test_delete_media_not_found(client): + """Tests deleting non-existent media returns 404.""" + response = client.delete("/inventory/media/99999") + assert response.status_code == 404 + + +# ── Initialize Media ── + + +def test_initialize_media_not_found(client): + """Tests initializing non-existent media returns 404.""" + response = client.post("/inventory/media/99999/initialize") + assert response.status_code == 404 + + +def test_initialize_media_no_provider(client, db_session, mocker): + """Tests initializing media with unsupported type returns 400.""" + media = models.StorageMedia( + media_type="hdd", identifier="NO_PROV", capacity=1000, status="active" + ) + db_session.add(media) + db_session.commit() + + mocker.patch.object( + archiver_manager, + "_get_storage_provider", + return_value=None, + ) + + response = client.post(f"/inventory/media/{media.id}/initialize") + assert response.status_code == 400 + assert "provider not found" in response.json()["detail"] + + +def test_initialize_media_existing_data_blocks(client, db_session, mocker): + """Tests initialize blocks when existing data found and force=False.""" + media = models.StorageMedia( + media_type="hdd", identifier="HAS_DATA", capacity=1000, status="active" + ) + db_session.add(media) + db_session.commit() + + mock_provider = mocker.MagicMock() + mock_provider.check_existing_data.return_value = True + mocker.patch.object( + archiver_manager, + "_get_storage_provider", + return_value=mock_provider, + ) + + response = client.post(f"/inventory/media/{media.id}/initialize") + assert response.status_code == 409 + assert "existing data" in response.json()["detail"] + + +def test_initialize_media_force_overwrite(client, db_session, mocker): + """Tests initialize with force=True overwrites existing data.""" + media = models.StorageMedia( + media_type="hdd", + identifier="FORCE_INIT", + capacity=1000, + status="active", + extra_config='{"mount_path": "/tmp"}', + ) + db_session.add(media) + db_session.commit() + + mock_provider = mocker.MagicMock() + mock_provider.check_existing_data.return_value = True + mock_provider.initialize_media.return_value = True + mock_provider.device_path = "/tmp/init" + mocker.patch.object( + archiver_manager, + "_get_storage_provider", + return_value=mock_provider, + ) + + response = client.post(f"/inventory/media/{media.id}/initialize?force=true") + assert response.status_code == 200 + assert "complete" in response.json()["message"] + + +def test_initialize_media_permission_error(client, db_session, mocker): + """Tests initialize handles PermissionError.""" + media = models.StorageMedia( + media_type="hdd", identifier="PERM_DENY", capacity=1000, status="active" + ) + db_session.add(media) + db_session.commit() + + mock_provider = mocker.MagicMock() + mock_provider.check_existing_data.return_value = False + mock_provider.initialize_media.side_effect = PermissionError("Access denied") + mocker.patch.object( + archiver_manager, + "_get_storage_provider", + return_value=mock_provider, + ) + + response = client.post(f"/inventory/media/{media.id}/initialize") + assert response.status_code == 403 + assert "Access denied" in response.json()["detail"] + + +# ── Reorder Media ── + + +def test_reorder_media(client, db_session): + """Tests reordering media priority.""" + m1 = models.StorageMedia( + media_type="hdd", identifier="A", capacity=1000, status="active" + ) + m2 = models.StorageMedia( + media_type="hdd", identifier="B", capacity=1000, status="active" + ) + db_session.add_all([m1, m2]) + db_session.commit() + + response = client.post( + "/inventory/media/reorder", json={"media_ids": [m2.id, m1.id]} + ) + assert response.status_code == 200 + + db_session.expire_all() + assert db_session.get(models.StorageMedia, m2.id).priority_index == 0 + assert db_session.get(models.StorageMedia, m1.id).priority_index == 1 + + +# ── Insights Deep Tests ── + + +def test_insights_with_duplicates_and_aging(client, db_session): + """Tests insights reports duplicates, aging, redundancy, and extensions.""" + now = datetime.now(timezone.utc).timestamp() + + # Two files with same hash (duplicate) + f1 = models.FilesystemState( + file_path="/data/a.txt", size=100, mtime=now, sha256_hash="duphash" + ) + f2 = models.FilesystemState( + file_path="/data/b.txt", size=100, mtime=now, sha256_hash="duphash" + ) + # Protected file + f3 = models.FilesystemState( + file_path="/data/c.txt", + size=200, + mtime=now - 400 * 24 * 3600, + sha256_hash="hash3", + ) + db_session.add_all([f1, f2, f3]) + db_session.flush() + + media = models.StorageMedia( + media_type="hdd", identifier="M1", capacity=1000, status="active" + ) + db_session.add(media) + db_session.flush() + + db_session.add( + models.FileVersion( + filesystem_state_id=f3.id, + media_id=media.id, + file_number="1", + offset_start=0, + offset_end=200, + ) + ) + db_session.commit() + + response = client.get("/inventory/insights") + assert response.status_code == 200 + data = response.json() + + assert data["summary"]["total_files"] == 3 + assert data["summary"]["total_bytes"] == 400 + assert data["summary"]["protected_bytes"] == 200 + assert data["summary"]["vulnerable_bytes"] == 200 + + assert len(data["duplicates"]) == 1 + assert data["duplicates"][0]["copies"] == 2 + assert data["duplicates"][0]["saved"] == 100 + + assert len(data["extensions"]) >= 1 + assert any(e["ext"] == "txt" for e in data["extensions"]) + + assert len(data["redundancy"]) >= 1 + + +# ── Treemap / Directories ── + + +def test_get_treemap(client, db_session): + """Tests the treemap endpoint returns hierarchical directory data.""" + f1 = models.FilesystemState(file_path="/data/sub/file1.txt", size=100, mtime=1000) + f2 = models.FilesystemState(file_path="/data/sub/file2.txt", size=200, mtime=1000) + db_session.add_all([f1, f2]) + db_session.commit() + + response = client.get("/inventory/directories") + assert response.status_code == 200 + data = response.json() + assert isinstance(data, list) + # Should contain data directory + assert len(data) >= 1 + + +# ── Detect Media ── + + +def test_detect_media_finds_new_insertion(client, db_session, mocker): + """Tests detect_media finds newly inserted unregistered media.""" + media = models.StorageMedia( + media_type="lto_tape", + identifier="EXISTING_TAPE", + capacity=1000, + status="active", + extra_config=json.dumps({"device_path": "/dev/nst0"}), + ) + db_session.add(media) + db_session.commit() + + mock_provider = mocker.MagicMock() + mock_provider.provider_id = "lto_tape" + mock_provider.check_online.return_value = True + mock_provider.get_live_info.return_value = {"identity": "NEW_TAPE_01"} + + mocker.patch.object( + archiver_manager, + "_get_storage_provider", + return_value=mock_provider, + ) + + response = client.get("/inventory/detect") + assert response.status_code == 200 + data = response.json() + assert len(data) == 1 + assert data[0]["identifier"] == "NEW_TAPE_01" + assert data[0]["device_path"] == "/dev/nst0"