Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c76ccd0dfa | |||
| 06eb00ab3e |
@@ -82,9 +82,11 @@ def db_session():
|
||||
conn.execute(text("PRAGMA foreign_keys = OFF"))
|
||||
# Fetch all tables from the metadata
|
||||
for table_name in reversed(Base.metadata.tables.keys()):
|
||||
# Avoid truncating internal alembic or FTS tables
|
||||
if "alembic" not in table_name and "fts" not in table_name:
|
||||
# Avoid truncating internal alembic tables
|
||||
if "alembic" not in table_name:
|
||||
conn.execute(text(f"DELETE FROM {table_name}"))
|
||||
# FTS5 virtual table is not in Base.metadata; clear it explicitly
|
||||
conn.execute(text("DELETE FROM filesystem_fts"))
|
||||
conn.execute(text("PRAGMA foreign_keys = ON"))
|
||||
|
||||
|
||||
|
||||
@@ -147,14 +147,21 @@ def test_search_index(client, db_session):
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
# Trigger FTS manually since we are using raw SQL triggers which might not have fired
|
||||
# if we didn't insert via SQL or if there are issues in :memory:
|
||||
# but conftest uses a real temp file.
|
||||
# Manually insert into FTS5 since triggers may not fire on ORM inserts in tests
|
||||
from sqlalchemy import text
|
||||
|
||||
db_session.execute(
|
||||
text("INSERT INTO filesystem_fts(rowid, file_path) VALUES (:rowid, :path)"),
|
||||
{"rowid": file1.id, "path": file1.file_path},
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
response = client.get("/archive/search?q=important")
|
||||
assert response.status_code == 200
|
||||
# If FTS5 is working, it should return results.
|
||||
data = response.json()
|
||||
assert len(data) == 1
|
||||
assert data[0]["path"] == "data/important.doc"
|
||||
assert data[0]["name"] == "important.doc"
|
||||
|
||||
|
||||
def test_get_metadata(client, db_session):
|
||||
|
||||
@@ -52,13 +52,6 @@ def test_update_settings(client):
|
||||
assert response.json()["schedule_scan"] == "0 2 * * *"
|
||||
|
||||
|
||||
def test_list_jobs_empty(client):
|
||||
"""Tests listing jobs when none exist."""
|
||||
response = client.get("/system/jobs")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == []
|
||||
|
||||
|
||||
def test_trigger_scan(client):
|
||||
"""Tests triggering a system scan."""
|
||||
response = client.post("/system/scan")
|
||||
@@ -77,10 +70,17 @@ def test_get_scan_status(client):
|
||||
|
||||
|
||||
def test_ls_root(client):
|
||||
"""Tests listing the root directory."""
|
||||
"""Tests listing the root directory returns actual subdirectories."""
|
||||
response = client.get("/system/ls?path=/")
|
||||
assert response.status_code == 200
|
||||
assert isinstance(response.json(), list)
|
||||
data = response.json()
|
||||
assert isinstance(data, list)
|
||||
assert len(data) > 0
|
||||
for entry in data:
|
||||
assert "name" in entry
|
||||
assert "path" in entry
|
||||
assert entry["name"] != ""
|
||||
assert entry["path"] != ""
|
||||
|
||||
|
||||
def test_ignore_hardware(client):
|
||||
@@ -104,127 +104,6 @@ def test_scan_status_includes_files_missing(client):
|
||||
assert data["files_missing"] == 0
|
||||
|
||||
|
||||
def test_list_discrepancies_empty(client):
|
||||
"""Tests listing discrepancies when none exist."""
|
||||
response = client.get("/system/discrepancies")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == []
|
||||
|
||||
|
||||
def test_list_discrepancies_deleted_file(client, db_session):
|
||||
"""Tests listing a confirmed-deleted file in discrepancies."""
|
||||
file_record = models.FilesystemState(
|
||||
file_path="/data/old.txt",
|
||||
size=100,
|
||||
mtime=1000,
|
||||
is_deleted=True,
|
||||
is_ignored=False,
|
||||
sha256_hash=None,
|
||||
)
|
||||
db_session.add(file_record)
|
||||
db_session.commit()
|
||||
|
||||
response = client.get("/system/discrepancies")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert len(data) == 1
|
||||
assert data[0]["path"] == "/data/old.txt"
|
||||
assert data[0]["is_deleted"] is True
|
||||
|
||||
|
||||
def test_confirm_file_deleted(client, db_session):
|
||||
"""Tests confirming a file as deleted."""
|
||||
file_record = models.FilesystemState(
|
||||
file_path="/data/verify.txt",
|
||||
size=50,
|
||||
mtime=2000,
|
||||
is_deleted=False,
|
||||
)
|
||||
db_session.add(file_record)
|
||||
db_session.commit()
|
||||
|
||||
response = client.post(f"/system/discrepancies/{file_record.id}/confirm")
|
||||
assert response.status_code == 200
|
||||
assert "marked as deleted" in response.json()["message"]
|
||||
|
||||
db_session.expire_all()
|
||||
db_session.refresh(file_record)
|
||||
assert file_record.is_deleted is True
|
||||
|
||||
|
||||
def test_confirm_file_deleted_not_found(client):
|
||||
"""Tests confirming a non-existent file returns 404."""
|
||||
response = client.post("/system/discrepancies/9999/confirm")
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_dismiss_discrepancy(client, db_session):
|
||||
"""Tests dismissing a deleted file."""
|
||||
file_record = models.FilesystemState(
|
||||
file_path="/data/dismiss.txt",
|
||||
size=50,
|
||||
mtime=2000,
|
||||
is_deleted=True,
|
||||
)
|
||||
db_session.add(file_record)
|
||||
db_session.commit()
|
||||
|
||||
response = client.post(f"/system/discrepancies/{file_record.id}/dismiss")
|
||||
assert response.status_code == 200
|
||||
assert "dismissed" in response.json()["message"]
|
||||
|
||||
db_session.expire_all()
|
||||
db_session.refresh(file_record)
|
||||
assert file_record.missing_acknowledged_at is not None
|
||||
|
||||
|
||||
def test_delete_file_record(client, db_session):
|
||||
"""Tests hard-deleting a file record and its versions."""
|
||||
media = models.StorageMedia(
|
||||
media_type="hdd", identifier="M1", capacity=1000, status="active"
|
||||
)
|
||||
db_session.add(media)
|
||||
db_session.flush()
|
||||
|
||||
file_record = models.FilesystemState(
|
||||
file_path="/data/hard_delete.txt",
|
||||
size=100,
|
||||
mtime=1000,
|
||||
is_deleted=True,
|
||||
)
|
||||
db_session.add(file_record)
|
||||
db_session.flush()
|
||||
|
||||
db_session.add(
|
||||
models.FileVersion(
|
||||
filesystem_state_id=file_record.id,
|
||||
media_id=media.id,
|
||||
file_number="1",
|
||||
offset_start=0,
|
||||
offset_end=100,
|
||||
)
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
file_id = file_record.id
|
||||
|
||||
response = client.delete(f"/system/discrepancies/{file_id}")
|
||||
assert response.status_code == 200
|
||||
|
||||
db_session.expire_all()
|
||||
|
||||
# Verify file and version are gone
|
||||
assert (
|
||||
db_session.query(models.FilesystemState).filter_by(id=file_id).first() is None
|
||||
)
|
||||
assert (
|
||||
db_session.query(models.FileVersion)
|
||||
.filter_by(filesystem_state_id=file_id)
|
||||
.first()
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
def test_dashboard_stats_excludes_failed_media(client, db_session):
|
||||
"""Tests that dashboard stats do not count versions on failed or retired media."""
|
||||
active_media = models.StorageMedia(
|
||||
@@ -593,10 +472,13 @@ def test_ignore_hardware_duplicate(client):
|
||||
|
||||
|
||||
def test_database_export(client):
|
||||
"""Tests database export endpoint returns a file response."""
|
||||
"""Tests database export endpoint returns a SQLite file download."""
|
||||
response = client.get("/system/database/export")
|
||||
# May return 200 with file or 404 if db path not found
|
||||
assert response.status_code in (200, 404)
|
||||
assert response.status_code == 200
|
||||
assert "tapehoard_index_" in response.headers["content-disposition"]
|
||||
assert ".db" in response.headers["content-disposition"]
|
||||
# Should contain SQLite magic bytes
|
||||
assert response.content[:16] == b"SQLite format 3\x00"
|
||||
|
||||
|
||||
# ── Tracking Batch ──
|
||||
@@ -676,5 +558,5 @@ def test_test_notification_invalid_url(client):
|
||||
response = client.post(
|
||||
"/system/notifications/test", json={"url": "not-a-valid-url"}
|
||||
)
|
||||
# Notification manager may succeed or fail depending on apprise parsing
|
||||
assert response.status_code in (200, 500)
|
||||
assert response.status_code == 500
|
||||
assert "Failed to dispatch test alert" in response.json()["detail"]
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import hashlib
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
from app.services.scanner import (
|
||||
ScannerService,
|
||||
JobManager,
|
||||
@@ -146,8 +148,7 @@ def test_scan_sources_mocked(db_session, mocker):
|
||||
def test_hash_file_batch_fast(tmp_path):
|
||||
"""Tests native sha256sum/shasum batch hashing if available."""
|
||||
if _FAST_HASH_BINARY is None:
|
||||
# Skip if no native hash binary is available
|
||||
return
|
||||
pytest.skip("No native hash binary available")
|
||||
|
||||
# Create test files
|
||||
files = {}
|
||||
@@ -168,7 +169,7 @@ def test_hash_file_batch_fast(tmp_path):
|
||||
def test_hash_file_batch_fast_empty():
|
||||
"""Tests that empty batch returns empty results."""
|
||||
if _FAST_HASH_BINARY is None:
|
||||
return
|
||||
pytest.skip("No native hash binary available")
|
||||
|
||||
results = _hash_file_batch_fast([], _FAST_HASH_BINARY)
|
||||
assert results == {}
|
||||
@@ -177,7 +178,7 @@ def test_hash_file_batch_fast_empty():
|
||||
def test_hash_file_batch_fast_nonexistent():
|
||||
"""Tests that non-existent files are gracefully handled."""
|
||||
if _FAST_HASH_BINARY is None:
|
||||
return
|
||||
pytest.skip("No native hash binary available")
|
||||
|
||||
results = _hash_file_batch_fast(["/nonexistent/path"], _FAST_HASH_BINARY)
|
||||
# Non-existent files may or may not appear in results depending on binary behavior
|
||||
@@ -222,8 +223,6 @@ def test_missing_file_marked_deleted_at_end_of_scan(db_session, mocker):
|
||||
def test_existing_file_not_marked_deleted(db_session, mocker):
|
||||
"""Tests that files found during scan retain is_deleted=False."""
|
||||
scanner = ScannerService()
|
||||
print(f"DEBUG test_existing: scanner.is_running = {scanner.is_running}")
|
||||
print(f"DEBUG test_existing: scanner.is_hashing = {scanner.is_hashing}")
|
||||
|
||||
mocker.patch("app.services.scanner._FAST_FIND_BINARY", None)
|
||||
mocker.patch("app.api.common.get_source_roots", return_value=["/mock_source"])
|
||||
|
||||
Reference in New Issue
Block a user