more robust scanning jobs
+119 -132
@@ -9,13 +9,14 @@ from typing import Any, Dict, List, Optional
 import psutil
 from loguru import logger
 from sqlalchemy.orm import Session
+from sqlalchemy.orm.exc import StaleDataError

 from app.db import models
 from app.db.database import SessionLocal


 class JobManager:
-    """Manages operational job states and persistence."""
+    """Manages operational job states and persistence with high resilience for background threads."""

     @staticmethod
     def create_job(db_session: Session, job_type: str) -> models.Job:
@@ -29,8 +30,6 @@ class JobManager:
     @staticmethod
     def start_job(job_id: int):
         """Marks a job as running and sets the start timestamp."""
-        from sqlalchemy.orm.exc import StaleDataError
-
         with SessionLocal() as db_session:
             try:
                 job_record = db_session.get(models.Job, job_id)
@@ -38,17 +37,13 @@ class JobManager:
                 job_record.status = "RUNNING"
                 job_record.started_at = datetime.now(timezone.utc)
                 db_session.commit()
-            except StaleDataError:
+            except (StaleDataError, Exception) as e:
                 db_session.rollback()
-                logger.debug(
-                    f"Job {job_id} already modified or deleted (StaleDataError)."
-                )
+                logger.debug(f"JobManager.start_job failed for {job_id}: {e}")

     @staticmethod
     def update_job(job_id: int, progress: float, current_task: str):
         """Updates the progress and current task description for a job."""
-        from sqlalchemy.orm.exc import StaleDataError
-
         with SessionLocal() as db_session:
             try:
                 job_record = db_session.get(models.Job, job_id)
@@ -56,14 +51,13 @@ class JobManager:
                 job_record.progress = progress
                 job_record.current_task = current_task
                 db_session.commit()
-            except StaleDataError:
+            except (StaleDataError, Exception) as e:
                 db_session.rollback()
+                logger.debug(f"JobManager.update_job failed for {job_id}: {e}")

     @staticmethod
     def complete_job(job_id: int):
         """Marks a job as successfully completed."""
-        from sqlalchemy.orm.exc import StaleDataError
-
         with SessionLocal() as db_session:
             try:
                 job_record = db_session.get(models.Job, job_id)
@@ -72,14 +66,13 @@ class JobManager:
                 job_record.progress = 100.0
                 job_record.completed_at = datetime.now(timezone.utc)
                 db_session.commit()
-            except StaleDataError:
+            except (StaleDataError, Exception) as e:
                 db_session.rollback()
+                logger.debug(f"JobManager.complete_job failed for {job_id}: {e}")

     @staticmethod
     def fail_job(job_id: int, error_message: str):
         """Marks a job as failed and records the error message."""
-        from sqlalchemy.orm.exc import StaleDataError
-
         with SessionLocal() as db_session:
             try:
                 job_record = db_session.get(models.Job, job_id)
@@ -88,14 +81,13 @@ class JobManager:
                 job_record.error_message = error_message
                 job_record.completed_at = datetime.now(timezone.utc)
                 db_session.commit()
-            except StaleDataError:
+            except (StaleDataError, Exception) as e:
                 db_session.rollback()
+                logger.debug(f"JobManager.fail_job failed for {job_id}: {e}")

     @staticmethod
     def cancel_job(job_id: int):
         """Submits a cancellation request for a pending or running job."""
-        from sqlalchemy.orm.exc import StaleDataError
-
         with SessionLocal() as db_session:
             try:
                 job_record = db_session.get(models.Job, job_id)
@@ -104,19 +96,23 @@ class JobManager:
                 job_record.error_message = "Cancelled by user"
                 job_record.completed_at = datetime.now(timezone.utc)
                 db_session.commit()
-            except StaleDataError:
+            except (StaleDataError, Exception) as e:
                 db_session.rollback()
+                logger.debug(f"JobManager.cancel_job failed for {job_id}: {e}")

     @staticmethod
     def is_cancelled(job_id: int) -> bool:
         """Checks if a job has been cancelled by the user."""
         with SessionLocal() as db_session:
-            job_record = db_session.get(models.Job, job_id)
-            return bool(
-                job_record
-                and job_record.status == "FAILED"
-                and job_record.error_message == "Cancelled by user"
-            )
+            try:
+                job_record = db_session.get(models.Job, job_id)
+                return bool(
+                    job_record
+                    and job_record.status == "FAILED"
+                    and job_record.error_message == "Cancelled by user"
+                )
+            except Exception:
+                return False


 class ScannerService:
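All five mutators above now share one shape: open a short-lived session, fetch the row, apply the change, commit, and downgrade any failure to a debug log so a background thread never dies on a deleted or stale job row. Worth noting that `StaleDataError` already derives from `Exception`, so the `(StaleDataError, Exception)` tuple behaves the same as catching `Exception` alone. A minimal sketch of the shared pattern, factored into a hypothetical helper (`_apply_job_update` is illustrative, not part of this commit; it assumes the module's `SessionLocal`, `models`, and `logger`):

```python
from typing import Any, Callable

def _apply_job_update(job_id: int, mutate: Callable[[Any], None]) -> None:
    """Apply a mutation to a job row; swallow failures with a debug log."""
    with SessionLocal() as db_session:
        try:
            job_record = db_session.get(models.Job, job_id)
            if job_record is None:
                return  # row already gone; nothing to record
            mutate(job_record)
            db_session.commit()
        except Exception as e:  # StaleDataError is a subclass of Exception
            db_session.rollback()
            logger.debug(f"Job update failed for {job_id}: {e}")

# e.g. the body of complete_job could become:
# _apply_job_update(job_id, lambda job: setattr(job, "progress", 100.0))
```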
@@ -155,76 +151,48 @@ class ScannerService:
                 with self._metrics_lock:
                     self.is_throttled = iowait_value > 5.0
                     self._current_iowait = iowait_value
-            except Exception as monitor_error:
-                logger.debug(f"I/O Monitor pulse failed: {monitor_error}")
-            time.sleep(1)
+            except Exception:
+                pass
+            time.sleep(2)

-    def _set_process_priority(self, level: str = "normal"):
-        """Adjusts CPU and I/O priority for the current process."""
+    def _set_process_priority(self, level: str):
+        """Adjusts the CPU and I/O priority of the current process."""
         try:
             p = psutil.Process(os.getpid())
             if level == "background":
-                os.nice(19)
-                if hasattr(psutil.Process(), "ionice") and hasattr(
-                    psutil, "IOPRIO_CLASS_IDLE"
-                ):
-                    process_handle = psutil.Process()
-                    process_handle.ionice(psutil.IOPRIO_CLASS_IDLE)
+                if hasattr(p, "ionice"):
+                    p.ionice(
+                        psutil.IOPRIO_CLASS_IDLE
+                    )  # ty:ignore[unresolved-attribute]
+                p.nice(19)
             else:
-                os.nice(0)
-                if hasattr(psutil.Process(), "ionice") and hasattr(
-                    psutil, "IOPRIO_CLASS_BE"
-                ):
-                    process_handle = psutil.Process()
-                    process_handle.ionice(psutil.IOPRIO_CLASS_BE, value=4)
-        except Exception as priority_error:
-            logger.debug(f"Priority adjustment restricted: {priority_error}")
-
-    def compute_sha256(self, file_path: str, job_id: Optional[int] = None) -> str:
-        """Computes the SHA-256 hash of a file with high-velocity block processing."""
-        hash_engine = hashlib.sha256()
-
-        # Increase block size to 8MB for high-speed NVMe saturation
-        # and use local counter to minimize lock contention
-        BLOCK_SIZE = 8 * 1024 * 1024
-        local_processed_bytes = 0
-        SYNC_THRESHOLD = 128 * 1024 * 1024  # Sync metrics every 128MB
-
-        try:
-            with open(file_path, "rb") as file_handle:
-                while True:
-                    if job_id is not None and JobManager.is_cancelled(job_id):
-                        return ""
-
-                    # Dynamic throttling - only check periodically to save cycles
-                    if self.is_throttled:
-                        throttle_delay = 0.05 if self._current_iowait < 15.0 else 0.2
-                        time.sleep(throttle_delay)
-
-                    byte_block = file_handle.read(BLOCK_SIZE)
-                    if not byte_block:
-                        break
-
-                    hash_engine.update(byte_block)
-                    local_processed_bytes += len(byte_block)
-
-                    # Batch sync metrics to global counter
-                    if local_processed_bytes >= SYNC_THRESHOLD:
-                        with self._metrics_lock:
-                            self.bytes_hashed += local_processed_bytes
-                            local_processed_bytes = 0
-
-            # Final remaining sync
-            if local_processed_bytes > 0:
-                with self._metrics_lock:
-                    self.bytes_hashed += local_processed_bytes
-
-            return hash_engine.hexdigest()
-        except OSError as io_error:
-            logger.error(f"IO Error during hashing {file_path}: {io_error}")
-            return ""
-        except Exception as generic_error:
-            logger.error(f"Unexpected error hashing {file_path}: {generic_error}")
-            return ""
+                if hasattr(p, "ionice"):
+                    p.ionice(psutil.IOPRIO_CLASS_BE)  # ty:ignore[unresolved-attribute]
+                p.nice(0)
+        except Exception:
+            pass
+
+    def compute_sha256(
+        self, file_path: str, job_id: Optional[int] = None
+    ) -> Optional[str]:
+        """Calculates the SHA-256 hash of a file with dynamic throttling."""
+        try:
+            sha256_hash = hashlib.sha256()
+            with open(file_path, "rb") as f:
+                for byte_block in iter(lambda: f.read(1024 * 1024), b""):
+                    if job_id is not None and JobManager.is_cancelled(job_id):
+                        return None
+
+                    # Dynamic Throttling: If system I/O is high, sleep between blocks
+                    if self.is_throttled:
+                        time.sleep(0.1)
+
+                    sha256_hash.update(byte_block)
+                    with self._metrics_lock:
+                        self.bytes_hashed += len(byte_block)
+
+            return sha256_hash.hexdigest()
+        except (OSError, PermissionError):
+            return None

     def _format_throughput(self) -> str:
         """Calculates and formats current hashing speed."""
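The rewritten `compute_sha256` trades the 8 MB blocks and batched counter sync for a simpler 1 MB loop that polls for cancellation and sleeps while the monitor reports I/O pressure, and it now returns `None` rather than `""` for unreadable or cancelled files. A standalone sketch of that loop, with `should_cancel` and `is_throttled` as stand-ins for `JobManager.is_cancelled` and the service flag (note `PermissionError` is already a subclass of `OSError`, so the tuple is belt-and-braces):

```python
import hashlib
import time
from typing import Callable, Optional

def sha256_with_backpressure(
    path: str,
    should_cancel: Callable[[], bool] = lambda: False,
    is_throttled: Callable[[], bool] = lambda: False,
) -> Optional[str]:
    """Hash a file in 1 MB blocks; back off under I/O pressure, bail on cancel."""
    digest = hashlib.sha256()
    try:
        with open(path, "rb") as f:
            # iter(callable, sentinel) calls f.read until it returns b""
            for block in iter(lambda: f.read(1024 * 1024), b""):
                if should_cancel():
                    return None
                if is_throttled():
                    time.sleep(0.1)  # yield the disk to foreground work
                digest.update(block)
        return digest.hexdigest()
    except (OSError, PermissionError):
        return None  # unreadable file: report "no hash" rather than crash
```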
@@ -239,21 +207,21 @@ class ScannerService:
             return f"{bytes_per_second:.1f} TB/s"

     def scan_sources(self, db_session: Session, job_id: Optional[int] = None):
-        """Executes Phase 1: Fast Metadata Discovery."""
+        """Executes Phase 1: Metadata discovery and index synchronization."""
         if self.is_running:
             logger.warning("Discovery scan already active.")
             return

         self.is_running = True
-        self.files_processed = 0
-        self.files_new = 0
-        self.files_modified = 0
-        self.total_files_found = 0
         self.current_path = ""
-        self._set_process_priority("normal")

         if job_id is not None:
             JobManager.start_job(job_id)
             JobManager.update_job(job_id, 0.0, "Starting system scan...")

+        self._set_process_priority("normal")
+        with self._metrics_lock:
+            self.files_processed = 0
+            self.files_new = 0
+            self.files_modified = 0
+            self.total_files_found = 0
+
         try:
             from app.api.system import get_exclusion_spec, get_source_roots
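Resetting the counters under `self._metrics_lock` matters because the I/O monitor and the hashing thread read the same fields concurrently; without the lock a reader can observe a half-reset snapshot. A toy model of the idea (class and names are illustrative, not from the commit):

```python
import threading

class ScanMetrics:
    """Counters shared between a scan thread and a reporting thread."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.files_processed = 0
        self.files_new = 0

    def reset(self) -> None:
        with self._lock:  # readers never see files_processed=0 with a stale files_new
            self.files_processed = 0
            self.files_new = 0

    def record(self, new: bool) -> None:
        with self._lock:
            self.files_processed += 1
            if new:
                self.files_new += 1

    def snapshot(self) -> tuple:
        with self._lock:
            return (self.files_processed, self.files_new)
```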
@@ -288,7 +256,9 @@ class ScannerService:
             pending_metadata: List[Dict[str, Any]] = []

             # Initialize Phase 2 in background
-            threading.Thread(target=self.run_hashing).start()
+            hashing_thread = threading.Thread(target=self.run_hashing)
+            hashing_thread.daemon = True
+            hashing_thread.start()

             for root_base in source_roots:
                 if job_id is not None and JobManager.is_cancelled(job_id):
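Making the Phase 2 thread a daemon means the interpreter no longer waits on an unfinished hashing pass at exit; the cost is that daemon threads are stopped abruptly at shutdown, which is tolerable here because every write is already wrapped in its own try/rollback. For reference, the same construction can be done in one call (the `daemon=` keyword is equivalent to assigning `.daemon` before `start()`):

```python
import threading

def run_hashing() -> None:
    """Stand-in for ScannerService.run_hashing."""
    ...

# Equivalent to: t = Thread(target=run_hashing); t.daemon = True; t.start()
hashing_thread = threading.Thread(target=run_hashing, daemon=True, name="hasher")
hashing_thread.start()
```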
@@ -402,6 +372,13 @@ class ScannerService:
         with self._metrics_lock:
             self.files_processed += 1

+    def stop(self):
+        """Signals background threads to stop and clears operational flags."""
+        with self._metrics_lock:
+            self.is_running = False
+            self.is_hashing = False
+        logger.info("Scanner service shutdown signaled.")
+
     def run_hashing(self):
         """Executes Phase 2: Background Content Hashing."""
         if self.is_hashing:
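The new `stop()` only flips flags; shutdown is cooperative, relying on the loops checking `self.is_running` and `self.is_hashing` on each iteration. A toy demonstration of that handshake, assuming flags shaped like the service's:

```python
import threading
import time

class Worker:
    def __init__(self) -> None:
        self.is_running = True

    def loop(self) -> None:
        while self.is_running:  # the only exit is the flag flipping
            time.sleep(0.05)    # one unit of work

    def stop(self) -> None:
        self.is_running = False  # signal; the loop exits on its next check

worker = Worker()
thread = threading.Thread(target=worker.loop)
thread.start()
worker.stop()
thread.join(timeout=1.0)  # returns promptly once the loop observes the flag
```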
@@ -411,26 +388,26 @@ class ScannerService:

         self._set_process_priority("background")

-        with SessionLocal() as db_session:
-            hashing_job = JobManager.create_job(db_session, "HASH")
-            JobManager.start_job(hashing_job.id)
-
-            self.start_time = time.time()
-            self.bytes_hashed = 0
-            self.files_hashed = 0
-
-            # Count total work pending for progress reporting
-            total_pending = (
-                db_session.query(models.FilesystemState)
-                .filter(
-                    models.FilesystemState.sha256_hash.is_(None),
-                    models.FilesystemState.is_ignored.is_(False),
-                )
-                .count()
-            )
-
-            try:
-                while True:
+        try:
+            with SessionLocal() as db_session:
+                hashing_job = JobManager.create_job(db_session, "HASH")
+                JobManager.start_job(hashing_job.id)
+
+                self.start_time = time.time()
+                self.bytes_hashed = 0
+                self.files_hashed = 0
+
+                # Count total work pending for progress reporting
+                total_pending = (
+                    db_session.query(models.FilesystemState)
+                    .filter(
+                        models.FilesystemState.sha256_hash.is_(None),
+                        models.FilesystemState.is_ignored.is_(False),
+                    )
+                    .count()
+                )
+
+                while self.is_hashing:
                     # Find unindexed work
                     hashing_targets = (
                         db_session.query(models.FilesystemState)
@@ -443,19 +420,11 @@ class ScannerService:
                     )

                     if not hashing_targets:
-                        # If we are in 'Phase 1' (discovery), wait for more files to appear
                         if self.is_running:
-                            time.sleep(2)
-                            # Recount if more work appeared during sleep
-                            total_pending = (
-                                db_session.query(models.FilesystemState)
-                                .filter(
-                                    models.FilesystemState.sha256_hash.is_(None),
-                                    models.FilesystemState.is_ignored.is_(False),
-                                )
-                                .count()
-                                + self.files_hashed
-                            )
+                            time.sleep(1)
                             continue
-                        # Otherwise, we are done
                         break

                     if JobManager.is_cancelled(hashing_job.id):
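The simplified wait branch replaces the eager recount with a fixed one-second nap: while Phase 1 is still discovering files, the hasher sleeps and re-queries; once discovery has finished and the queue is empty, it exits. The control flow in isolation (function and parameters are illustrative):

```python
import time

def drain_queue(get_batch, producer_active) -> None:
    """Poll for work, napping while the producer may still add more."""
    while True:
        batch = get_batch()
        if not batch:
            if producer_active():
                time.sleep(1)  # Phase 1 may still be discovering files
                continue
            break  # producer finished and queue drained: we are done
        for item in batch:
            pass  # process one item (hashing, in the commit)
```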
@@ -473,8 +442,14 @@ class ScannerService:
                     }

                     for future in concurrent.futures.as_completed(future_to_file):
+                        if not self.is_hashing:
+                            break
+
                         target_record = future_to_file[future]
-                        computed_hash = future.result()
+                        try:
+                            computed_hash = future.result()
+                        except Exception:
+                            continue

                         if computed_hash:
                             target_record.sha256_hash = computed_hash
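Wrapping `future.result()` is the substantive fix here: `as_completed` yields futures whether they succeeded or raised, and `.result()` re-raises the worker's exception in the consuming thread, so a single unreadable file could previously kill the whole hashing pass. A self-contained sketch of the pattern:

```python
import concurrent.futures

def flaky_hash(name: str) -> str:
    if name == "bad":
        raise OSError("unreadable")
    return f"hash-of-{name}"

files = ["a", "bad", "b"]
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    future_to_file = {pool.submit(flaky_hash, name): name for name in files}
    for future in concurrent.futures.as_completed(future_to_file):
        name = future_to_file[future]
        try:
            result = future.result()  # re-raises the worker's exception here
        except Exception:
            continue  # skip the failed file, keep draining the rest
        print(name, result)
```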
@@ -485,20 +460,32 @@ class ScannerService:
                                 99.9,
                                 (self.files_hashed / max(total_pending, 1)) * 100,
                             )
-                            status_msg = f"Hashing: {self.files_hashed}/{total_pending} objects processed [{self._format_throughput()}]"
-                            if self.is_throttled:
-                                status_msg += " (THROTTLED)"
                             JobManager.update_job(
-                                hashing_job.id, progress, status_msg
+                                hashing_job.id,
+                                progress,
+                                f"Hashed {self.files_hashed} files...",
                             )

-                    db_session.commit()
+                    # Commit batch
+                    try:
+                        db_session.commit()
+                    except (StaleDataError, Exception):
+                        db_session.rollback()
+                        break

-                JobManager.complete_job(hashing_job.id)
-            except Exception as hashing_error:
-                logger.error(f"Background hashing failed: {hashing_error}")
-                JobManager.fail_job(hashing_job.id, str(hashing_error))
-            finally:
-                self.is_hashing = False
+                if not JobManager.is_cancelled(hashing_job.id) and self.is_hashing:
+                    JobManager.complete_job(hashing_job.id)
+
+        except Exception as e:
+            logger.error(f"Background hashing failed: {e}")
+            # Try to report failure, but don't blow up if JobManager fails too
+            try:
+                if "hashing_job" in locals():
+                    JobManager.fail_job(hashing_job.id, str(e))
+            except Exception:
+                pass
+        finally:
+            with self._metrics_lock:
+                self.is_hashing = False
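The `"hashing_job" in locals()` guard covers the case where the failure happens before `create_job` returns, so the name was never bound. A common alternative, sketched with hypothetical stand-ins for the `JobManager` calls, is to pre-bind the name to `None` and use a plain truthiness check:

```python
def create_job() -> int:
    """Hypothetical stand-in for JobManager.create_job(...)."""
    return 1

def do_hashing_work(job_id: int) -> None:
    raise RuntimeError("boom")  # simulate a mid-run failure

def fail_job(job_id: int, message: str) -> None:
    print(f"job {job_id} failed: {message}")

hashing_job = None  # pre-bind so the handler never needs locals()
try:
    hashing_job = create_job()
    do_hashing_work(hashing_job)
except Exception as e:
    if hashing_job is not None:
        try:
            fail_job(hashing_job, str(e))  # best-effort failure report
        except Exception:
            pass  # reporting must never mask the original failure path
```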