fast discovery was also slower than os.walk
+34 -212
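The removed fast path shells out to `find` and parses one line of output per file, so process spawn and Python-side line parsing can cost more than the per-file os.stat calls it avoids. A minimal timing sketch for checking this on a given machine (hypothetical, not part of this commit; ROOT is a placeholder, walk_discover/find_discover are illustrative names, and GNU find is assumed on PATH):

    # Hypothetical benchmark sketch; not part of this commit.
    import os
    import subprocess
    import time

    ROOT = "/tmp"  # placeholder: point at a reasonably large local tree

    def walk_discover(root: str) -> int:
        # Pure-Python path: os.walk plus one os.stat per file.
        count = 0
        for current_dir, _sub_dirs, file_names in os.walk(root):
            for name in file_names:
                try:
                    os.stat(os.path.join(current_dir, name))
                    count += 1
                except OSError:
                    continue
        return count

    def find_discover(root: str) -> int:
        # Subprocess path: find extracts metadata in C, but its output
        # still has to be spawned, buffered, and parsed line by line.
        result = subprocess.run(
            ["find", root, "-type", "f", "-printf", "%p\t%s\t%T@\n"],
            capture_output=True,
        )
        return len(result.stdout.splitlines())

    for fn in (walk_discover, find_discover):
        start = time.perf_counter()
        n = fn(ROOT)
        print(f"{fn.__name__}: {n} files in {time.perf_counter() - start:.2f}s")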
@@ -1,12 +1,10 @@
 import concurrent.futures
 import hashlib
 import os
-import shutil
-import subprocess
 import threading
 import time
 from datetime import datetime, timezone
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional
 
 import psutil
 from loguru import logger
@@ -16,161 +14,6 @@ from sqlalchemy.orm.exc import ObjectDeletedError, StaleDataError
 from app.db import models
 from app.db.database import SessionLocal
 
-# Fast file discovery via `find -printf` (GNU find or compatible).
-# Detected once at import time; falls back to os.walk if unavailable.
-_FAST_FIND_BINARY: Optional[str] = None
-
-
-def _detect_fast_find() -> Optional[str]:
-    """Check if a `find` binary with `-printf` support is available.
-
-    Tries `gfind` (GNU find via Homebrew on macOS) first, then `find`.
-    Returns the binary path if `-printf` works, otherwise ``None``.
-    """
-    for candidate in ("gfind", "find"):
-        binary = shutil.which(candidate)
-        if binary is None:
-            continue
-        try:
-            result = subprocess.run(
-                [binary, "/tmp", "-maxdepth", "0", "-printf", "%f\n"],
-                capture_output=True,
-                timeout=5,
-            )
-            if result.returncode == 0 and result.stdout.strip() == b"tmp":
-                return binary
-        except Exception:
-            continue
-    return None
-
-
-def _init_fast_features() -> Optional[str]:
-    global _FAST_FIND_BINARY
-    _FAST_FIND_BINARY = _detect_fast_find()
-
-    if _FAST_FIND_BINARY:
-        logger.info(f"Fast file discovery enabled: using {_FAST_FIND_BINARY} -printf")
-    else:
-        logger.info("Fast file discovery unavailable: falling back to os.walk")
-
-    return _FAST_FIND_BINARY
-
-
-_FAST_FIND_BINARY = _init_fast_features()
-
-
-def _discover_files_fast(
-    root_base: str,
-    job_id: Optional[int],
-    batch_size: int,
-    current_timestamp,
-    resolve_tracking,
-    sync_metadata_batch,
-    metrics_lock,
-    metrics,
-    db_session: Session,
-) -> Tuple[int, int]:
-    """Walk a tree using `find -printf` for fast metadata extraction.
-
-    Streams output line-by-line via subprocess.Popen so progress updates
-    appear as files are discovered instead of waiting for find to finish.
-
-    Returns (files_found, files_batched) counts.
-    """
-    total_files_found = 0
-    files_batched = 0
-    pending_metadata: List[Dict[str, Any]] = []
-
-    # -printf format: path\tsize\tmtime (tab-separated; split from right for safety)
-    find_binary = _FAST_FIND_BINARY
-    if find_binary is None:
-        logger.warning(
-            "Fast file discovery requested but no compatible `find` binary found"
-        )
-        return 0, 0
-    cmd = [
-        find_binary,
-        root_base,
-        "-type",
-        "f",
-        "-printf",
-        "%p\t%s\t%T@\n",
-    ]
-
-    try:
-        proc = subprocess.Popen(
-            cmd,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.DEVNULL,
-        )
-        if proc.stdout is None:
-            logger.error(
-                f"Fast file discovery failed: could not open stdout for {root_base}"
-            )
-            return 0, 0
-    except Exception as e:
-        logger.error(f"Fast file discovery failed for {root_base}: {e}")
-        return 0, 0
-
-    # Stream output line by line (tab-separated: path\tsize\tmtime)
-    for line in iter(proc.stdout.readline, b""):
-        if job_id is not None and JobManager.is_cancelled(job_id):
-            break
-
-        if not line.strip():
-            continue
-
-        # Split from right: mtime and size are always numeric
-        parts = line.split(b"\t")
-        if len(parts) < 3:
-            continue
-
-        # First n-2 parts may be path components (tabs in filename are rare)
-        full_file_path = b"\t".join(parts[:-2]).decode("utf-8", errors="replace")
-        try:
-            file_size = int(parts[-2])
-            file_mtime = float(parts[-1])
-        except (ValueError, IndexError):
-            continue
-
-        total_files_found += 1
-        with metrics_lock:
-            metrics["total_files_found"] = total_files_found
-            metrics["current_path"] = os.path.dirname(full_file_path)
-
-        is_ignored = resolve_tracking(full_file_path)
-        pending_metadata.append(
-            {
-                "path": full_file_path,
-                "size": file_size,
-                "mtime": file_mtime,
-                "ignored": is_ignored,
-            }
-        )
-
-        if len(pending_metadata) >= batch_size:
-            sync_metadata_batch(db_session, pending_metadata, current_timestamp)
-            db_session.commit()
-            files_batched += len(pending_metadata)
-            pending_metadata = []
-            if job_id is not None:
-                JobManager.update_job(
-                    job_id,
-                    10.0,
-                    f"Discovered {total_files_found} items...",
-                )
-
-    proc.stdout.close()
-    proc.wait()
-
-    # Flush remaining batch
-    if pending_metadata:
-        sync_metadata_batch(db_session, pending_metadata, current_timestamp)
-        db_session.commit()
-        files_batched += len(pending_metadata)
-
-    return total_files_found, files_batched
-
 
 class JobManager:
     """Manages operational job states and persistence with high resilience for background threads."""
@@ -439,63 +282,42 @@ class ScannerService:
             if not os.path.exists(root_base):
                 continue
 
-            if _FAST_FIND_BINARY:
-                # Fast path: GNU find -printf (metadata extracted in C)
-                metrics = {
-                    "total_files_found": 0,
-                    "current_path": root_base,
-                }
-                found, _ = _discover_files_fast(
-                    root_base,
-                    job_id,
-                    BATCH_SIZE,
-                    current_timestamp,
-                    resolve_tracking,
-                    self._sync_metadata_batch,
-                    self._metrics_lock,
-                    metrics,
-                    db_session,
-                )
-                with self._metrics_lock:
-                    self.total_files_found += found
-            else:
-                # Compatibility path: Python os.walk + os.stat
-                for current_dir, _sub_dirs, file_names in os.walk(root_base):
-                    if job_id is not None and JobManager.is_cancelled(job_id):
-                        break
+            for current_dir, _sub_dirs, file_names in os.walk(root_base):
+                if job_id is not None and JobManager.is_cancelled(job_id):
+                    break
 
-                    for name in file_names:
-                        full_file_path = os.path.join(current_dir, name)
-                        with self._metrics_lock:
-                            self.total_files_found += 1
-                            self.current_path = current_dir
+                for name in file_names:
+                    full_file_path = os.path.join(current_dir, name)
+                    with self._metrics_lock:
+                        self.total_files_found += 1
+                        self.current_path = current_dir
 
-                        try:
-                            file_stats = os.stat(full_file_path)
-                            is_ignored = resolve_tracking(full_file_path)
-                            pending_metadata.append(
-                                {
-                                    "path": full_file_path,
-                                    "size": file_stats.st_size,
-                                    "mtime": file_stats.st_mtime,
-                                    "ignored": is_ignored,
-                                }
-                            )
-                        except (OSError, FileNotFoundError):
-                            continue
+                    try:
+                        file_stats = os.stat(full_file_path)
+                        is_ignored = resolve_tracking(full_file_path)
+                        pending_metadata.append(
+                            {
+                                "path": full_file_path,
+                                "size": file_stats.st_size,
+                                "mtime": file_stats.st_mtime,
+                                "ignored": is_ignored,
+                            }
+                        )
+                    except (OSError, FileNotFoundError):
+                        continue
 
-                        if len(pending_metadata) >= BATCH_SIZE:
-                            self._sync_metadata_batch(
-                                db_session, pending_metadata, current_timestamp
-                            )
-                            db_session.commit()
-                            pending_metadata = []
-                            if job_id is not None:
-                                JobManager.update_job(
-                                    job_id,
-                                    10.0,
-                                    f"Discovered {self.total_files_found} items...",
-                                )
+                    if len(pending_metadata) >= BATCH_SIZE:
+                        self._sync_metadata_batch(
+                            db_session, pending_metadata, current_timestamp
+                        )
+                        db_session.commit()
+                        pending_metadata = []
+                        if job_id is not None:
+                            JobManager.update_job(
+                                job_id,
+                                10.0,
+                                f"Discovered {self.total_files_found} items...",
+                            )
 
             if pending_metadata:
                 self._sync_metadata_batch(
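If discovery throughput matters again later, one possible alternative (hypothetical, not explored in this commit) is to drive os.scandir directly instead of os.walk plus a separate os.stat per file: each DirEntry caches its stat() result, and is_dir/is_file usually need no extra system call, so the same metadata comes out with fewer syscalls. A minimal sketch, with scandir_discover as an illustrative name:

    # Hypothetical sketch; not part of this commit.
    import os
    from typing import Any, Dict, List

    def scandir_discover(root: str) -> List[Dict[str, Any]]:
        # Iterative walk so deep trees cannot overflow the recursion limit.
        results: List[Dict[str, Any]] = []
        stack = [root]
        while stack:
            current = stack.pop()
            try:
                with os.scandir(current) as entries:
                    for entry in entries:
                        if entry.is_dir(follow_symlinks=False):
                            stack.append(entry.path)
                        elif entry.is_file(follow_symlinks=False):
                            # stat() is cached on the DirEntry after the first call.
                            st = entry.stat(follow_symlinks=False)
                            results.append(
                                {
                                    "path": entry.path,
                                    "size": st.st_size,
                                    "mtime": st.st_mtime,
                                }
                            )
            except OSError:
                continue
        return results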
@@ -114,9 +114,6 @@ def test_scan_sources_mocked(db_session, mocker):
     """Tests the discovery scan with mocked filesystem."""
     scanner = ScannerService()
 
-    # Disable fast find so the test uses the os.walk fallback path
-    mocker.patch("app.services.scanner._FAST_FIND_BINARY", None)
-
     # Mock settings
     mocker.patch("app.api.common.get_source_roots", return_value=["/mock_source"])
     mocker.patch("app.api.common.get_exclusion_spec", return_value=None)
@@ -146,7 +143,6 @@ def test_missing_file_marked_deleted_at_end_of_scan(db_session, mocker):
     """Tests that files not seen during a scan are marked as deleted."""
     scanner = ScannerService()
 
-    mocker.patch("app.services.scanner._FAST_FIND_BINARY", None)
     mocker.patch("app.api.common.get_source_roots", return_value=["/mock_source"])
     mocker.patch("app.api.common.get_exclusion_spec", return_value=None)
     mocker.patch("os.walk", return_value=[])
@@ -181,7 +177,6 @@ def test_existing_file_not_marked_deleted(db_session, mocker):
     """Tests that files found during scan retain is_deleted=False."""
     scanner = ScannerService()
 
-    mocker.patch("app.services.scanner._FAST_FIND_BINARY", None)
     mocker.patch("app.api.common.get_source_roots", return_value=["/mock_source"])
     mocker.patch("app.api.common.get_exclusion_spec", return_value=None)
     mocker.patch("os.path.exists", return_value=True)