dependency injection

2026-04-26 19:53:01 -04:00
parent f0876a5050
commit 905b7013ec
8 changed files with 1752 additions and 1722 deletions
+279 -272
@@ -13,201 +13,194 @@ from app.db.database import SessionLocal
class JobManager:
    """Manages operational job states and persistence."""

    @staticmethod
    def create_job(db_session: Session, job_type: str) -> models.Job:
        """Creates a new job record in the database."""
        job_record = models.Job(job_type=job_type, status="PENDING")
        db_session.add(job_record)
        db_session.commit()
        db_session.refresh(job_record)
        return job_record

    @staticmethod
    def start_job(job_id: int):
        """Marks a job as running and sets the start timestamp."""
        with SessionLocal() as db_session:
            job_record = db_session.get(models.Job, job_id)
            if job_record:
                job_record.status = "RUNNING"
                job_record.started_at = datetime.now(timezone.utc)
                db_session.commit()

    @staticmethod
    def update_job(job_id: int, progress: float, current_task: str):
        """Updates the progress and current task description for a job."""
        with SessionLocal() as db_session:
            job_record = db_session.get(models.Job, job_id)
            if job_record:
                job_record.progress = progress
                job_record.current_task = current_task
                db_session.commit()

    @staticmethod
    def complete_job(job_id: int):
        """Marks a job as successfully completed."""
        with SessionLocal() as db_session:
            job_record = db_session.get(models.Job, job_id)
            if job_record:
                job_record.status = "COMPLETED"
                job_record.progress = 100.0
                job_record.completed_at = datetime.now(timezone.utc)
                db_session.commit()

    @staticmethod
    def fail_job(job_id: int, error_message: str):
        """Marks a job as failed and records the error message."""
        with SessionLocal() as db_session:
            job_record = db_session.get(models.Job, job_id)
            if job_record:
                job_record.status = "FAILED"
                job_record.error_message = error_message
                job_record.completed_at = datetime.now(timezone.utc)
                db_session.commit()

    @staticmethod
    def cancel_job(job_id: int):
        """Submits a cancellation request for a pending or running job."""
        with SessionLocal() as db_session:
            job_record = db_session.get(models.Job, job_id)
            if job_record and job_record.status in ["PENDING", "RUNNING"]:
                job_record.status = "FAILED"
                job_record.error_message = "Cancelled by user"
                job_record.completed_at = datetime.now(timezone.utc)
                db_session.commit()

    @staticmethod
    def is_cancelled(job_id: int) -> bool:
        """Checks if a job has been cancelled by the user."""
        with SessionLocal() as db_session:
            job_record = db_session.get(models.Job, job_id)
            return bool(
                job_record
                and job_record.status == "FAILED"
                and job_record.error_message == "Cancelled by user"
            )
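Each lifecycle transition above is a static helper that opens its own short-lived session via SessionLocal, so any worker thread can report progress without sharing a session. A minimal sketch of how a background worker might drive that lifecycle, assuming the module's existing SessionLocal import; the "WORKER" job_type and the do_step helper are illustrative, not part of this commit:

def run_worker():
    # Create the job record in a short-lived session, as create_job expects.
    with SessionLocal() as db:
        job = JobManager.create_job(db, "WORKER")  # illustrative job_type
        job_id = job.id

    JobManager.start_job(job_id)
    try:
        for step in range(10):
            # Cooperative cancellation: stop if the user cancelled the job.
            if JobManager.is_cancelled(job_id):
                return
            do_step(step)  # hypothetical unit of work
            JobManager.update_job(job_id, (step + 1) * 10.0, f"Step {step + 1} of 10")
        JobManager.complete_job(job_id)
    except Exception as error:
        JobManager.fail_job(job_id, str(error))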
class ScannerService:
    """Handles recursive filesystem discovery and content indexing."""

    def __init__(self):
        self.is_running: bool = False
        self.is_hashing: bool = False
        self.last_run_time: Optional[datetime] = None

        # Thread-safe Metrics
        self.files_processed: int = 0
        self.files_hashed: int = 0
        self.files_new: int = 0
        self.files_modified: int = 0
        self.total_files_found: int = 0
        self.bytes_hashed: int = 0
        self.start_time: float = 0.0
        self.is_throttled: bool = False
        self.current_path: str = ""
        self._metrics_lock = threading.Lock()
        self._current_iowait: float = 0.0

        # Background Monitors
        self._throttle_thread = threading.Thread(
            target=self._monitor_iowait, daemon=True
        )
        self._throttle_thread.start()

    def _monitor_iowait(self):
        """Polls system I/O pressure to enable dynamic back-off."""
        while True:
            try:
                cpu_times = psutil.cpu_times_percent(interval=1)
                iowait_value = getattr(cpu_times, "iowait", 0.0)
                with self._metrics_lock:
                    self.is_throttled = iowait_value > 5.0
                    self._current_iowait = iowait_value
            except Exception as monitor_error:
                logger.debug(f"I/O Monitor pulse failed: {monitor_error}")
                time.sleep(1)

    def _set_process_priority(self, level: str = "normal"):
        """Adjusts CPU and I/O priority for the current process."""
        try:
            if level == "background":
                os.nice(19)
                if hasattr(psutil.Process(), "ionice"):
                    process_handle = psutil.Process()
                    process_handle.ionice(psutil.IOPRIO_CLASS_IDLE)
            else:
                os.nice(0)
                if hasattr(psutil.Process(), "ionice"):
                    process_handle = psutil.Process()
                    process_handle.ionice(psutil.IOPRIO_CLASS_BE, value=4)
        except Exception as priority_error:
            logger.debug(f"Priority adjustment restricted: {priority_error}")

    def compute_sha256(self, file_path: str, job_id: Optional[int] = None) -> str:
        """Computes the SHA-256 hash of a file with block-level throttling."""
        hash_engine = hashlib.sha256()
        try:
            with open(file_path, "rb") as file_handle:
                # Read in 1 MiB blocks so cancellation and throttling stay responsive.
                for byte_block in iter(lambda: file_handle.read(1048576), b""):
                    if job_id is not None and JobManager.is_cancelled(job_id):
                        return ""
                    # Dynamic throttling
                    if self.is_throttled:
                        throttle_delay = 0.05 if self._current_iowait < 15.0 else 0.2
                        time.sleep(throttle_delay)
                    hash_engine.update(byte_block)
                    with self._metrics_lock:
                        self.bytes_hashed += len(byte_block)
            return hash_engine.hexdigest()
        except OSError as io_error:
            logger.error(f"IO Error during hashing {file_path}: {io_error}")
            return ""
        except Exception as generic_error:
            logger.error(f"Unexpected error hashing {file_path}: {generic_error}")
            return ""
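Because compute_sha256 checks JobManager.is_cancelled between 1 MiB blocks and returns an empty string on cancellation or I/O failure, a caller can hash a batch of files and still stop mid-file. A small usage sketch against the module-level scanner_manager singleton defined at the end of this file; the file paths and the "HASH" job creation here are illustrative:

with SessionLocal() as db:
    job = JobManager.create_job(db, "HASH")
JobManager.start_job(job.id)

# An empty digest signals cancellation or an I/O error for that file.
for path in ["/data/a.bin", "/data/b.bin"]:  # illustrative paths
    digest = scanner_manager.compute_sha256(path, job.id)
    if not digest:
        break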
    def _format_throughput(self) -> str:
        """Calculates and formats current hashing speed."""
        elapsed_seconds = time.time() - self.start_time
        if elapsed_seconds <= 0:
            return "0 B/s"
        bytes_per_second = self.bytes_hashed / elapsed_seconds
        for unit in ["B/s", "KB/s", "MB/s", "GB/s"]:
            if bytes_per_second < 1024:
                return f"{bytes_per_second:.1f} {unit}"
            bytes_per_second /= 1024
        return f"{bytes_per_second:.1f} TB/s"

    def scan_sources(self, db_session: Session, job_id: Optional[int] = None):
        """Executes Phase 1: Fast Metadata Discovery."""
        if self.is_running:
            logger.warning("Discovery scan already active.")
            return

        self.is_running = True
        self.files_processed = 0
        self.files_new = 0
        self.files_modified = 0
        self.total_files_found = 0
        self.current_path = ""
        self._set_process_priority("normal")

        if job_id is not None:
            JobManager.start_job(job_id)
@@ -215,70 +208,80 @@ class ScannerService:
        try:
            from app.api.system import get_exclusion_spec, get_source_roots

            exclusion_spec = get_exclusion_spec(db_session)
            source_roots = get_source_roots(db_session)
            tracking_rules = db_session.query(models.TrackedSource).all()
            tracking_map = {rule.path: rule.action for rule in tracking_rules}

            def resolve_tracking(absolute_path: str) -> Tuple[bool, bool]:
                ignored_by_policy = False
                if exclusion_spec and exclusion_spec.match_file(absolute_path):
                    ignored_by_policy = True

                applicable_rules = []
                for rule_path, action in tracking_map.items():
                    if absolute_path == rule_path or absolute_path.startswith(
                        rule_path + "/"
                    ):
                        applicable_rules.append((len(rule_path), action))

                if not applicable_rules:
                    return not ignored_by_policy, ignored_by_policy

                applicable_rules.sort(key=lambda x: x[0], reverse=True)
                return applicable_rules[0][1] == "include", ignored_by_policy

            current_timestamp = datetime.now(timezone.utc)
            BATCH_SIZE = 1000
            pending_metadata: List[Dict[str, Any]] = []

            # Initialize Phase 2 in background
            threading.Thread(target=self.run_hashing).start()

            for root_base in source_roots:
                if job_id is not None and JobManager.is_cancelled(job_id):
                    break
                if not os.path.exists(root_base):
                    continue

                for current_dir, sub_dirs, file_names in os.walk(root_base):
                    if job_id is not None and JobManager.is_cancelled(job_id):
                        break

                    # Prune directories early to save syscalls
                    if exclusion_spec:
                        for directory_name in list(sub_dirs):
                            full_dir_path = os.path.join(current_dir, directory_name)
                            if exclusion_spec.match_file(full_dir_path + "/"):
                                sub_dirs.remove(directory_name)

                    for name in file_names:
                        full_file_path = os.path.join(current_dir, name)
                        with self._metrics_lock:
                            self.total_files_found += 1
                            self.current_path = current_dir

                        try:
                            file_stats = os.stat(full_file_path)
                            is_tracked, is_ignored = resolve_tracking(full_file_path)
                            pending_metadata.append(
                                {
                                    "path": full_file_path,
                                    "size": file_stats.st_size,
                                    "mtime": file_stats.st_mtime,
                                    "tracked": is_tracked,
                                    "ignored": is_ignored,
                                }
                            )
                        except (OSError, FileNotFoundError):
                            continue

                        if len(pending_metadata) >= BATCH_SIZE:
                            self._sync_metadata_batch(
                                db_session, pending_metadata, current_timestamp
                            )
                            db_session.commit()
                            pending_metadata = []

                            if job_id is not None:
                                JobManager.update_job(
                                    job_id,
@@ -286,145 +289,149 @@ class ScannerService:
f"Discovered {self.total_files_found} items...",
)
if pending:
self._sync_metadata_batch(db, pending, now)
db.commit()
db.commit()
if pending_metadata:
self._sync_metadata_batch(
db_session, pending_metadata, current_timestamp
)
db_session.commit()
if job_id is not None and not JobManager.is_cancelled(job_id):
JobManager.complete_job(job_id)
self.last_run_time = now
self.last_run_time = current_timestamp
except Exception as e:
logger.exception(f"Scan failed: {e}")
db.rollback()
except Exception as scan_error:
logger.exception(f"Metadata discovery failed: {scan_error}")
db_session.rollback()
if job_id is not None:
JobManager.fail_job(job_id, str(e))
JobManager.fail_job(job_id, str(scan_error))
finally:
self.is_running = False
def _sync_metadata_batch(self, db: Session, batch: List[Dict[str, Any]], now):
paths = [f["path"] for f in batch]
existing = {
r.file_path: r
for r in db.query(models.FilesystemState)
.filter(models.FilesystemState.file_path.in_(paths))
.all()
}
for f in batch:
ext = existing.get(f["path"])
if not ext:
with self._lock:
def _sync_metadata_batch(
self, db_session: Session, batch: List[Dict[str, Any]], timestamp: datetime
):
"""Synchronizes a batch of metadata with the database index."""
file_paths = [file_meta["path"] for file_meta in batch]
# Batch Fetch Existing Metadata (Chunked for SQLite limits)
existing_records = {}
SQLITE_VARIABLE_LIMIT = 500
for i in range(0, len(file_paths), SQLITE_VARIABLE_LIMIT):
chunk = file_paths[i : i + SQLITE_VARIABLE_LIMIT]
chunk_records = (
db_session.query(models.FilesystemState)
.filter(models.FilesystemState.file_path.in_(chunk))
.all()
)
for record in chunk_records:
existing_records[record.file_path] = record
for file_meta in batch:
record = existing_records.get(file_meta["path"])
if not record:
with self._metrics_lock:
self.files_new += 1
db.add(
db_session.add(
models.FilesystemState(
file_path=f["path"],
size=f["size"],
mtime=f["mtime"],
is_ignored=f["ignored"],
last_seen_timestamp=now,
file_path=file_meta["path"],
size=file_meta["size"],
mtime=file_meta["mtime"],
is_ignored=file_meta["ignored"],
last_seen_timestamp=timestamp,
is_indexed=False,
)
)
else:
if ext.size != f["size"] or ext.mtime != f["mtime"]:
ext.is_indexed = False
if ext.size != f["size"] or ext.mtime != f["mtime"]:
with self._lock:
metadata_changed = (
record.size != file_meta["size"]
or record.mtime != file_meta["mtime"]
)
if metadata_changed:
record.is_indexed = False
with self._metrics_lock:
self.files_modified += 1
ext.size = f["size"]
ext.mtime = f["mtime"]
ext.is_ignored = f["ignored"]
ext.last_seen_timestamp = now
with self._lock:
record.size = file_meta["size"]
record.mtime = file_meta["mtime"]
record.is_ignored = file_meta["ignored"]
record.last_seen_timestamp = timestamp
with self._metrics_lock:
self.files_processed += 1
def run_hashing(self):
"""Content Hashing Engine - Low Priority Background Worker"""
"""Executes Phase 2: Background Content Hashing."""
if self.is_hashing:
return
with self._lock:
with self._metrics_lock:
self.is_hashing = True
self._set_priority("background")
db = SessionLocal()
job = JobManager.create_job(db, "HASH")
JobManager.start_job(job.id)
self._set_process_priority("background")
self.start_time = time.time()
self.bytes_hashed = 0
self.files_hashed = 0
with SessionLocal() as db_session:
hashing_job = JobManager.create_job(db_session, "HASH")
JobManager.start_job(hashing_job.id)
try:
while True:
targets = (
db.query(models.FilesystemState)
.filter(
models.FilesystemState.is_indexed.is_(False),
models.FilesystemState.is_ignored.is_(False),
self.start_time = time.time()
self.bytes_hashed = 0
self.files_hashed = 0
try:
while True:
# Find unindexed work
hashing_targets = (
db_session.query(models.FilesystemState)
.filter(
models.FilesystemState.is_indexed.is_(False),
models.FilesystemState.is_ignored.is_(False),
)
.limit(100)
.all()
)
.limit(100)
.all()
)
if not targets:
# If discovery is still running, wait for more metadata to hit the DB
if self.is_running:
time.sleep(2)
continue
break
if not hashing_targets:
if self.is_running:
time.sleep(2)
continue
break
if JobManager.is_cancelled(job.id):
break
if JobManager.is_cancelled(hashing_job.id):
break
workers = os.cpu_count() or 4
with concurrent.futures.ThreadPoolExecutor(
max_workers=workers
) as executor:
futures = {
executor.submit(self.compute_sha256, t.file_path, job.id): t
for t in targets
}
for future in concurrent.futures.as_completed(futures):
t = futures[future]
h = future.result()
if h:
t.sha256_hash = h
t.is_indexed = True
self.files_hashed += 1
max_workers = os.cpu_count() or 4
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as hashing_executor:
future_to_file = {
hashing_executor.submit(
self.compute_sha256, target.file_path, hashing_job.id
): target
for target in hashing_targets
}
if self.files_hashed % 5 == 0:
# RICH HEARTBEAT STATUS
with self._lock:
stall_time = time.time() - self._last_block_time
is_stalled = stall_time > 60.0
for future in concurrent.futures.as_completed(future_to_file):
target_record = future_to_file[future]
computed_hash = future.result()
active_files = list(self._active_hashes.values())
first_active = (
active_files[0].split("/")[-1]
if active_files
else "Waiting..."
)
if computed_hash:
target_record.sha256_hash = computed_hash
target_record.is_indexed = True
self.files_hashed += 1
status = f"Hashing: {self.files_hashed} objs [{self._format_speed()}] | Active: {first_active}"
if is_stalled:
status = (
f"⚠️ STALLED ({int(stall_time)}s) | {status}"
)
elif self.is_throttled:
status += " (THROTTLED)"
if self.files_hashed % 5 == 0:
status_msg = f"Hashing Fleet: {self.files_hashed} objects processed [{self._format_throughput()}]"
if self.is_throttled:
status_msg += " (THROTTLED)"
JobManager.update_job(hashing_job.id, 50.0, status_msg)
JobManager.update_job(job.id, 50.0, status)
db.commit()
db_session.commit()
JobManager.complete_job(job.id)
except Exception as e:
logger.error(f"Hashing job failed: {e}")
JobManager.fail_job(job.id, str(e))
finally:
self.is_hashing = False
db.close()
JobManager.complete_job(hashing_job.id)
except Exception as hashing_error:
logger.error(f"Background hashing failed: {hashing_error}")
JobManager.fail_job(hashing_job.id, str(hashing_error))
finally:
self.is_hashing = False
scanner_manager = ScannerService()
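The module ends with a module-level singleton, which an API layer can inject alongside a request-scoped database session. A hedged wiring sketch in the spirit of the commit title ("dependency injection"), assuming a FastAPI-style app, which this diff does not show; the route path, get_db dependency, module path, and "SCAN" job_type are all illustrative:

# Hypothetical wiring, assuming FastAPI and a yield-style session dependency.
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
import threading

from app.db.database import SessionLocal
from app.services.scanner import scanner_manager, JobManager  # module path assumed

router = APIRouter()

def get_db():
    # Session per request, closed when the response is sent.
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@router.post("/scan")
def trigger_scan(db: Session = Depends(get_db)):
    job = JobManager.create_job(db, "SCAN")  # illustrative job_type
    # Run discovery in the background with its own session, not the request's.
    def _run():
        with SessionLocal() as worker_db:
            scanner_manager.scan_sources(worker_db, job_id=job.id)
    threading.Thread(target=_run, daemon=True).start()
    return {"job_id": job.id}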