address these:
| # | Issue | Location |
|---|-------|----------|
| 1 | **Archiver backs up deleted files** — `get_unbacked_files` never checks `is_deleted`, so files the scanner marked missing still get archived (wastes media, fails at tar assembly when the source is gone) | `archiver.py:157-163` |
| 2 | **Hashing phase marks wrong files as deleted** — after one sub-batch finishes, the code iterates over ALL fetched records (`path_to_record`), not just the completed sub-batch, so files in pending sub-batches get falsely marked `is_deleted=True` if not on disk | `scanner.py:854-861` |
| 3 | **Multiple threads mutate ORM objects concurrently** — `ThreadPoolExecutor` workers share and write to session-bound objects (`sha256_hash`, `is_deleted`) from different threads; SQLAlchemy sessions are not thread-safe | `scanner.py:814-861` |
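
For issues 2 and 3, the pattern the scanner changes below adopt is: worker threads only compute digests and hand back plain dictionaries, while the thread that owns the SQLAlchemy session applies results one completed sub-batch at a time, so records in pending sub-batches are never touched and no session-bound object is mutated off the owning thread. A minimal sketch of that pattern follows, assuming a simple record object with `sha256_hash`/`is_deleted` attributes and a `hashlib`-based `hash_batch` helper in place of the native `sha256sum` path used in the real code (function names here are illustrative, not from the codebase):

```python
# Sketch only: hashing happens in workers, record mutation stays on the calling thread.
import hashlib
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional


def hash_batch(paths: List[str]) -> Dict[str, Optional[str]]:
    """Compute SHA-256 digests for a sub-batch; touches no shared state."""
    results: Dict[str, Optional[str]] = {}
    for path in paths:
        try:
            digest = hashlib.sha256()
            with open(path, "rb") as fh:
                for chunk in iter(lambda: fh.read(1024 * 1024), b""):
                    digest.update(chunk)
            results[path] = digest.hexdigest()
        except OSError:
            results[path] = None
    return results


def apply_hash_results(sub_batches: List[List[str]], path_to_record: dict) -> None:
    """Submit sub-batches to workers, but update records only here, per finished batch."""
    with ThreadPoolExecutor(max_workers=4) as executor:
        future_to_batch = {executor.submit(hash_batch, b): b for b in sub_batches}
        for future in as_completed(future_to_batch):
            batch = future_to_batch[future]
            results = future.result()
            for file_path in batch:  # only this sub-batch, never the full mapping
                record = path_to_record.get(file_path)
                if record is None:
                    continue
                digest = results.get(file_path)
                if digest:
                    record.sha256_hash = digest
                elif not os.path.exists(file_path):
                    record.is_deleted = True  # genuinely missing, not merely pending
```

Keying the missing-file check to `batch` rather than the whole `path_to_record` mapping is what prevents files in not-yet-hashed sub-batches from being marked missing.
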
@@ -0,0 +1,30 @@
+"""add_is_cancelled_to_jobs
+
+Revision ID: 7f8e9d10c2a3
+Revises: e851b23b0f5d
+Create Date: 2026-04-30 15:00:00.000000
+
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = "7f8e9d10c2a3"
+down_revision: Union[str, Sequence[str], None] = "e851b23b0f5d"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    op.add_column(
+        "jobs",
+        sa.Column("is_cancelled", sa.Boolean(), nullable=False, server_default="0"),
+    )
+
+
+def downgrade() -> None:
+    op.drop_column("jobs", "is_cancelled")
@@ -172,6 +172,18 @@ def get_ignored_status(
     return False
 
 
+def _validate_path_within_roots(path: str, roots: List[str]) -> bool:
+    """Validates that a path does not contain traversal sequences and is within configured roots."""
+    if ".." in path:
+        return False
+    abs_path = os.path.abspath(path)
+    for root in roots:
+        abs_root = os.path.abspath(root)
+        if abs_path == abs_root or abs_path.startswith(abs_root + os.sep):
+            return True
+    return False
+
+
 # --- Endpoints ---
 
 
@@ -628,15 +640,25 @@ def browse_system_path(
         )
         return BrowseResponseSchema(files=results, last_scan_time=last_scan_time)
 
+    if not _validate_path_within_roots(path, roots):
+        raise HTTPException(
+            status_code=403, detail="Path is outside configured source roots"
+        )
+
     target_prefix = path if path.endswith("/") else path + "/"
 
+    # Escape LIKE wildcards in the prefix
+    escaped_prefix = (
+        target_prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
+    )
+
     files_sql = text("""
         SELECT file_path, size, mtime, sha256_hash, is_ignored
         FROM filesystem_state
-        WHERE file_path LIKE :prefix
+        WHERE file_path LIKE :prefix ESCAPE '\\'
        AND file_path != :prefix
     """)
-    rows = db_session.execute(files_sql, {"prefix": f"{target_prefix}%"}).fetchall()
+    rows = db_session.execute(files_sql, {"prefix": f"{escaped_prefix}%"}).fetchall()
 
     if not rows and os.path.isdir(path):
         try:
@@ -873,6 +895,8 @@ def test_notification_dispatch(request_data: TestNotificationRequest):
 @router.get("/ls")
 def list_host_directories(path: str = "/"):
     """Lists subdirectories on the host system for UI path selection."""
+    if ".." in path:
+        raise HTTPException(status_code=403, detail="Path traversal not allowed")
     if not os.path.exists(path) or not os.path.isdir(path):
         return []
 
@@ -1128,6 +1152,11 @@ def get_system_tree(path: Optional[str] = None, db_session: Session = Depends(ge
             TreeNodeSchema(name=root, path=root, has_children=True) for root in roots
         ]
 
+    if not _validate_path_within_roots(path, roots):
+        raise HTTPException(
+            status_code=403, detail="Path is outside configured source roots"
+        )
+
     results = []
     if os.path.exists(path):
         try:
@@ -1168,6 +1197,7 @@ def list_discrepancies(db_session: Session = Depends(get_db)):
             models.FilesystemState.sha256_hash.is_(None),
             models.FilesystemState.is_ignored.is_(False),
+            models.FilesystemState.is_deleted.is_(False),
             models.FilesystemState.missing_acknowledged_at.is_(None),
         )
         .all()
     )
@@ -123,6 +123,7 @@ class Job(Base):
     progress: Mapped[float] = mapped_column(Float, default=0.0)
     current_task: Mapped[Optional[str]] = mapped_column(String, nullable=True)
     error_message: Mapped[Optional[str]] = mapped_column(String, nullable=True)
+    is_cancelled: Mapped[bool] = mapped_column(Boolean, default=False)
     started_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
     completed_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
     created_at: Mapped[datetime] = mapped_column(
@@ -156,6 +156,7 @@ class ArchiverService:
             )
             .filter(
                 not_(models.FilesystemState.is_ignored),
+                models.FilesystemState.is_deleted.is_(False),
                 (coverage_subquery.c.covered_bytes.is_(None))
                 | (coverage_subquery.c.covered_bytes < models.FilesystemState.size),
             )
@@ -112,37 +112,8 @@ def _init_fast_features() -> Tuple[Optional[str], Optional[str]]:
 _FAST_FIND_BINARY, _FAST_HASH_BINARY = _init_fast_features()
 
 
-def _make_hash_callback(scanner, path_to_record, job_id, total_pending):
-    """Create a thread-safe callback for streaming hash results.
-
-    Updates metrics, assigns hashes to records, and reports job progress
-    as each file completes.
-    """
-
-    def on_result(file_path, hex_digest):
-        target_record = path_to_record.get(file_path)
-        if target_record and hex_digest:
-            target_record.sha256_hash = hex_digest
-            with scanner._metrics_lock:
-                scanner.bytes_hashed += target_record.size or 0
-                scanner.files_hashed += 1
-                # Report progress incrementally as files complete
-                if scanner.files_hashed % 5 == 0:
-                    progress = min(
-                        99.9,
-                        (scanner.files_hashed / max(total_pending, 1)) * 100,
-                    )
-                    JobManager.update_job(
-                        job_id,
-                        progress,
-                        f"Hashed {scanner.files_hashed} files ({scanner._format_throughput()})...",
-                    )
-
-    return on_result
-
-
 def _hash_file_batch_fast(
-    file_paths: List[str], binary: str, on_result=None
+    file_paths: List[str], binary: str
 ) -> Dict[str, Optional[str]]:
     """Hash a batch of files using a native SHA-256 binary.
 
@@ -151,8 +122,6 @@ def _hash_file_batch_fast(
     Args:
         file_paths: Paths to hash.
         binary: Path to sha256sum or shasum.
-        on_result: Optional callback(file_path, hex_digest) called for each
-            file as it completes.
 
     Returns a mapping of file_path -> hex_digest (or None on failure).
     """
@@ -172,11 +141,12 @@ def _hash_file_batch_fast(
     proc = subprocess.Popen(
         cmd,
         stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
+        stderr=subprocess.DEVNULL,
     )
 
     # Stream output line-by-line for incremental progress
-    assert proc.stdout is not None
+    if proc.stdout is None:
+        return results
     for line in iter(proc.stdout.readline, b""):
         line = line.strip()
         if not line:
@@ -196,8 +166,6 @@ def _hash_file_batch_fast(
             clean_path = raw_path.replace("\\\\", "\\")
 
         results[clean_path] = file_hash
-        if on_result is not None:
-            on_result(clean_path, file_hash)
 
     proc.stdout.close()
     proc.wait()
@@ -246,9 +214,13 @@ def _discover_files_fast(
         proc = subprocess.Popen(
             cmd,
             stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
+            stderr=subprocess.DEVNULL,
         )
-        assert proc.stdout is not None
+        if proc.stdout is None:
+            logger.error(
+                f"Fast file discovery failed: could not open stdout for {root_base}"
+            )
+            return 0, 0
     except Exception as e:
         logger.error(f"Fast file discovery failed for {root_base}: {e}")
         return 0, 0
@@ -403,6 +375,7 @@ class JobManager:
                 job_record = db_session.get(models.Job, job_id)
                 if job_record and job_record.status in ["PENDING", "RUNNING"]:
                     job_record.status = "FAILED"
+                    job_record.is_cancelled = True
                     job_record.error_message = "Cancelled by user"
                     job_record.completed_at = datetime.now(timezone.utc)
                     db_session.commit()
@@ -416,11 +389,7 @@ class JobManager:
         with SessionLocal() as db_session:
             try:
                 job_record = db_session.get(models.Job, job_id)
-                return bool(
-                    job_record
-                    and job_record.status == "FAILED"
-                    and job_record.error_message == "Cancelled by user"
-                )
+                return bool(job_record and job_record.is_cancelled)
             except Exception:
                 return False
 
@@ -644,22 +613,18 @@ class ScannerService:
         db_session.commit()
 
         # Detect files present in DB but not found during this scan
-        stale_records = (
+        stale_query = (
             db_session.query(models.FilesystemState)
             .filter(
                 models.FilesystemState.last_seen_timestamp < current_timestamp,
                 models.FilesystemState.is_deleted.is_(False),
                 models.FilesystemState.is_ignored.is_(False),
             )
-            .all()
+            .yield_per(1000)
         )
-        if (
-            stale_records and not JobManager.is_cancelled(job_id)
-            if job_id
-            else True
-        ):
+        if not JobManager.is_cancelled(job_id) if job_id else True:
             missing_count = 0
-            for record in stale_records:
+            for record in stale_query:
                 if not os.path.exists(record.file_path):
                     record.is_deleted = True
                     missing_count += 1
@@ -827,12 +792,6 @@ class ScannerService:
                         _hash_file_batch_fast,
                         batch,
                         _FAST_HASH_BINARY,
-                        _make_hash_callback(
-                            self,
-                            path_to_record,
-                            hashing_job.id,
-                            total_pending,
-                        ),
                     ): batch
                     for batch in sub_batches
                 }
@@ -849,13 +808,34 @@ class ScannerService:
                     except Exception:
                         continue
 
-                    # Detect files in this batch that didn't get a hash
-                    # (likely missing from disk) and mark them as deleted
-                    for file_path, target_record in path_to_record.items():
-                        if (
-                            file_path not in batch_results
-                            and not os.path.exists(file_path)
-                        ):
+                    # Apply hashes and detect missing files ONLY for this batch
+                    for file_path in batch:
+                        target_record = path_to_record.get(file_path)
+                        if not target_record:
+                            continue
+                        if file_path in batch_results:
+                            target_record.sha256_hash = batch_results[
+                                file_path
+                            ]
+                            with self._metrics_lock:
+                                self.bytes_hashed += target_record.size or 0
+                                self.files_hashed += 1
+                                # Report progress incrementally as files complete
+                                if self.files_hashed % 5 == 0:
+                                    progress = min(
+                                        99.9,
+                                        (
+                                            self.files_hashed
+                                            / max(total_pending, 1)
+                                        )
+                                        * 100,
+                                    )
+                                    JobManager.update_job(
+                                        hashing_job.id,
+                                        progress,
+                                        f"Hashed {self.files_hashed} files ({self._format_throughput()})...",
+                                    )
+                        elif not os.path.exists(file_path):
                             target_record.is_deleted = True
                             with self._metrics_lock:
                                 self.files_missing += 1