remove 'fast hashing' that was actually slower
This commit is contained in:
+42
-239
@@ -20,10 +20,6 @@ from app.db.database import SessionLocal
|
|||||||
# Detected once at import time; falls back to os.walk if unavailable.
|
# Detected once at import time; falls back to os.walk if unavailable.
|
||||||
_FAST_FIND_BINARY: Optional[str] = None
|
_FAST_FIND_BINARY: Optional[str] = None
|
||||||
|
|
||||||
# Fast hashing via `sha256sum` or `shasum`.
|
|
||||||
# Detected once at import time; falls back to Python hashlib if unavailable.
|
|
||||||
_FAST_HASH_BINARY: Optional[str] = None
|
|
||||||
|
|
||||||
|
|
||||||
def _detect_fast_find() -> Optional[str]:
|
def _detect_fast_find() -> Optional[str]:
|
||||||
"""Check if a `find` binary with `-printf` support is available.
|
"""Check if a `find` binary with `-printf` support is available.
|
||||||
@@ -48,132 +44,19 @@ def _detect_fast_find() -> Optional[str]:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _detect_fast_hash() -> Optional[str]:
|
def _init_fast_features() -> Optional[str]:
|
||||||
"""Check if a SHA-256 binary is available for batch hashing.
|
global _FAST_FIND_BINARY
|
||||||
|
|
||||||
Tries `sha256sum` (GNU coreutils, Linux/Homebrew) then `shasum` (macOS).
|
|
||||||
Returns the binary path if it works, otherwise ``None``.
|
|
||||||
"""
|
|
||||||
# Try sha256sum first (Linux, Homebrew gnu-coreutils)
|
|
||||||
binary = shutil.which("sha256sum")
|
|
||||||
if binary:
|
|
||||||
try:
|
|
||||||
result = subprocess.run(
|
|
||||||
[binary, "/dev/null"],
|
|
||||||
capture_output=True,
|
|
||||||
timeout=5,
|
|
||||||
)
|
|
||||||
if (
|
|
||||||
result.returncode == 0
|
|
||||||
and b"e3b0c44298fc1c149afbf4c8996fb924" in result.stdout
|
|
||||||
):
|
|
||||||
return binary
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Try shasum (macOS default)
|
|
||||||
binary = shutil.which("shasum")
|
|
||||||
if binary:
|
|
||||||
try:
|
|
||||||
result = subprocess.run(
|
|
||||||
[binary, "-a", "256", "/dev/null"],
|
|
||||||
capture_output=True,
|
|
||||||
timeout=5,
|
|
||||||
)
|
|
||||||
if (
|
|
||||||
result.returncode == 0
|
|
||||||
and b"e3b0c44298fc1c149afbf4c8996fb924" in result.stdout
|
|
||||||
):
|
|
||||||
return binary
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def _init_fast_features() -> Tuple[Optional[str], Optional[str]]:
|
|
||||||
global _FAST_FIND_BINARY, _FAST_HASH_BINARY
|
|
||||||
_FAST_FIND_BINARY = _detect_fast_find()
|
_FAST_FIND_BINARY = _detect_fast_find()
|
||||||
_FAST_HASH_BINARY = _detect_fast_hash()
|
|
||||||
|
|
||||||
if _FAST_FIND_BINARY:
|
if _FAST_FIND_BINARY:
|
||||||
logger.info(f"Fast file discovery enabled: using {_FAST_FIND_BINARY} -printf")
|
logger.info(f"Fast file discovery enabled: using {_FAST_FIND_BINARY} -printf")
|
||||||
else:
|
else:
|
||||||
logger.info("Fast file discovery unavailable: falling back to os.walk")
|
logger.info("Fast file discovery unavailable: falling back to os.walk")
|
||||||
|
|
||||||
if _FAST_HASH_BINARY:
|
return _FAST_FIND_BINARY
|
||||||
logger.info(f"Fast hashing enabled: using {_FAST_HASH_BINARY}")
|
|
||||||
else:
|
|
||||||
logger.info("Fast hashing unavailable: falling back to Python hashlib")
|
|
||||||
|
|
||||||
return _FAST_FIND_BINARY, _FAST_HASH_BINARY
|
|
||||||
|
|
||||||
|
|
||||||
_FAST_FIND_BINARY, _FAST_HASH_BINARY = _init_fast_features()
|
_FAST_FIND_BINARY = _init_fast_features()
|
||||||
|
|
||||||
|
|
||||||
def _hash_file_batch_fast(
|
|
||||||
file_paths: List[str], binary: str
|
|
||||||
) -> Dict[str, Optional[str]]:
|
|
||||||
"""Hash a batch of files using a native SHA-256 binary.
|
|
||||||
|
|
||||||
Streams output line-by-line via subprocess.Popen for incremental progress.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
file_paths: Paths to hash.
|
|
||||||
binary: Path to sha256sum or shasum.
|
|
||||||
|
|
||||||
Returns a mapping of file_path -> hex_digest (or None on failure).
|
|
||||||
"""
|
|
||||||
results: Dict[str, Optional[str]] = {}
|
|
||||||
|
|
||||||
if not file_paths:
|
|
||||||
return results
|
|
||||||
|
|
||||||
# Build command: shasum needs -a 256 prefix, sha256sum doesn't
|
|
||||||
if binary.endswith("sha256sum"):
|
|
||||||
cmd = [binary, "--"] + file_paths
|
|
||||||
else:
|
|
||||||
# shasum
|
|
||||||
cmd = [binary, "-a", "256", "--"] + file_paths
|
|
||||||
|
|
||||||
try:
|
|
||||||
proc = subprocess.Popen(
|
|
||||||
cmd,
|
|
||||||
stdout=subprocess.PIPE,
|
|
||||||
stderr=subprocess.DEVNULL,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Stream output line-by-line for incremental progress
|
|
||||||
if proc.stdout is None:
|
|
||||||
return results
|
|
||||||
for line in iter(proc.stdout.readline, b""):
|
|
||||||
line = line.strip()
|
|
||||||
if not line:
|
|
||||||
continue
|
|
||||||
# Format: "<hash> <path>" or "<hash> *<path>"
|
|
||||||
parts = line.split(b" ", 1)
|
|
||||||
if len(parts) != 2:
|
|
||||||
# Try single space with binary marker: "<hash> *<path>"
|
|
||||||
parts = line.split(b" *", 1)
|
|
||||||
if len(parts) != 2:
|
|
||||||
continue
|
|
||||||
|
|
||||||
file_hash = parts[0].decode("ascii", errors="replace").lower()
|
|
||||||
raw_path = parts[1].decode("utf-8", errors="replace")
|
|
||||||
|
|
||||||
# sha256sum may escape backslashes in filenames; handle common case
|
|
||||||
clean_path = raw_path.replace("\\\\", "\\")
|
|
||||||
|
|
||||||
results[clean_path] = file_hash
|
|
||||||
|
|
||||||
proc.stdout.close()
|
|
||||||
proc.wait()
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Native hash batch failed: {e}")
|
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
|
|
||||||
def _discover_files_fast(
|
def _discover_files_fast(
|
||||||
@@ -751,10 +634,8 @@ class ScannerService:
|
|||||||
.count()
|
.count()
|
||||||
)
|
)
|
||||||
|
|
||||||
# Fast hash batch size: more files per batch reduces subprocess overhead
|
|
||||||
HASH_BATCH_SIZE = 100 if _FAST_HASH_BINARY else 100
|
|
||||||
# How many files to pull from DB per iteration
|
# How many files to pull from DB per iteration
|
||||||
FETCH_LIMIT = HASH_BATCH_SIZE * 4
|
FETCH_LIMIT = 400
|
||||||
|
|
||||||
while self.is_hashing:
|
while self.is_hashing:
|
||||||
# Find unindexed work (exclude deleted files - they cannot be hashed)
|
# Find unindexed work (exclude deleted files - they cannot be hashed)
|
||||||
@@ -780,126 +661,48 @@ class ScannerService:
|
|||||||
if JobManager.is_cancelled(hashing_job.id):
|
if JobManager.is_cancelled(hashing_job.id):
|
||||||
break
|
break
|
||||||
|
|
||||||
if _FAST_HASH_BINARY:
|
# Hash files using Python hashlib via thread pool
|
||||||
# Fast path: batch files to native sha256sum/shasum
|
max_workers = os.cpu_count() or 4
|
||||||
# Group into sub-batches of HASH_BATCH_SIZE for parallel processing
|
with concurrent.futures.ThreadPoolExecutor(
|
||||||
file_paths = [t.file_path for t in hashing_targets]
|
max_workers=max_workers
|
||||||
path_to_record = {t.file_path: t for t in hashing_targets}
|
) as hashing_executor:
|
||||||
|
future_to_file = {
|
||||||
|
hashing_executor.submit(
|
||||||
|
self.compute_sha256,
|
||||||
|
target.file_path,
|
||||||
|
hashing_job.id,
|
||||||
|
): target
|
||||||
|
for target in hashing_targets
|
||||||
|
}
|
||||||
|
|
||||||
sub_batches = [
|
for future in concurrent.futures.as_completed(future_to_file):
|
||||||
file_paths[i : i + HASH_BATCH_SIZE]
|
if not self.is_hashing:
|
||||||
for i in range(0, len(file_paths), HASH_BATCH_SIZE)
|
break
|
||||||
]
|
|
||||||
|
|
||||||
max_workers = min(os.cpu_count() or 4, len(sub_batches))
|
target_record = future_to_file[future]
|
||||||
with concurrent.futures.ThreadPoolExecutor(
|
try:
|
||||||
max_workers=max_workers
|
computed_hash = future.result()
|
||||||
) as hashing_executor:
|
except Exception:
|
||||||
future_to_batch = {
|
continue
|
||||||
hashing_executor.submit(
|
|
||||||
_hash_file_batch_fast,
|
|
||||||
batch,
|
|
||||||
_FAST_HASH_BINARY,
|
|
||||||
): batch
|
|
||||||
for batch in sub_batches
|
|
||||||
}
|
|
||||||
|
|
||||||
for future in concurrent.futures.as_completed(
|
if computed_hash:
|
||||||
future_to_batch
|
target_record.sha256_hash = computed_hash
|
||||||
):
|
self.files_hashed += 1
|
||||||
if not self.is_hashing:
|
elif not os.path.exists(target_record.file_path):
|
||||||
break
|
target_record.is_deleted = True
|
||||||
|
|
||||||
batch = future_to_batch[future]
|
|
||||||
try:
|
|
||||||
batch_results = future.result()
|
|
||||||
except Exception:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Apply hashes and detect missing files ONLY for this batch
|
|
||||||
for file_path in batch:
|
|
||||||
target_record = path_to_record.get(file_path)
|
|
||||||
if not target_record:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if file_path in batch_results:
|
|
||||||
target_record.sha256_hash = batch_results[
|
|
||||||
file_path
|
|
||||||
]
|
|
||||||
with self._metrics_lock:
|
|
||||||
self.bytes_hashed += target_record.size or 0
|
|
||||||
self.files_hashed += 1
|
|
||||||
# Report progress incrementally as files complete
|
|
||||||
if self.files_hashed % 5 == 0:
|
|
||||||
progress = min(
|
|
||||||
99.9,
|
|
||||||
(
|
|
||||||
self.files_hashed
|
|
||||||
/ max(total_pending, 1)
|
|
||||||
)
|
|
||||||
* 100,
|
|
||||||
)
|
|
||||||
JobManager.update_job(
|
|
||||||
hashing_job.id,
|
|
||||||
progress,
|
|
||||||
f"Hashed {self.files_hashed} files ({self._format_throughput()})...",
|
|
||||||
)
|
|
||||||
elif not os.path.exists(file_path):
|
|
||||||
target_record.is_deleted = True
|
|
||||||
with self._metrics_lock:
|
|
||||||
self.files_missing += 1
|
|
||||||
|
|
||||||
# Throttle between sub-batches if I/O pressure is high
|
|
||||||
with self._metrics_lock:
|
with self._metrics_lock:
|
||||||
should_throttle = self.is_throttled
|
self.files_missing += 1
|
||||||
if should_throttle:
|
|
||||||
time.sleep(0.5)
|
if self.files_hashed % 5 == 0:
|
||||||
else:
|
progress = min(
|
||||||
# Compatibility path: Python hashlib via thread pool
|
99.9,
|
||||||
max_workers = os.cpu_count() or 4
|
(self.files_hashed / max(total_pending, 1)) * 100,
|
||||||
with concurrent.futures.ThreadPoolExecutor(
|
)
|
||||||
max_workers=max_workers
|
JobManager.update_job(
|
||||||
) as hashing_executor:
|
|
||||||
future_to_file = {
|
|
||||||
hashing_executor.submit(
|
|
||||||
self.compute_sha256,
|
|
||||||
target.file_path,
|
|
||||||
hashing_job.id,
|
hashing_job.id,
|
||||||
): target
|
progress,
|
||||||
for target in hashing_targets
|
f"Hashed {self.files_hashed} files ({self._format_throughput()})...",
|
||||||
}
|
)
|
||||||
|
|
||||||
for future in concurrent.futures.as_completed(
|
|
||||||
future_to_file
|
|
||||||
):
|
|
||||||
if not self.is_hashing:
|
|
||||||
break
|
|
||||||
|
|
||||||
target_record = future_to_file[future]
|
|
||||||
try:
|
|
||||||
computed_hash = future.result()
|
|
||||||
except Exception:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if computed_hash:
|
|
||||||
target_record.sha256_hash = computed_hash
|
|
||||||
self.files_hashed += 1
|
|
||||||
elif not os.path.exists(target_record.file_path):
|
|
||||||
target_record.is_deleted = True
|
|
||||||
with self._metrics_lock:
|
|
||||||
self.files_missing += 1
|
|
||||||
|
|
||||||
if self.files_hashed % 5 == 0:
|
|
||||||
progress = min(
|
|
||||||
99.9,
|
|
||||||
(self.files_hashed / max(total_pending, 1))
|
|
||||||
* 100,
|
|
||||||
)
|
|
||||||
JobManager.update_job(
|
|
||||||
hashing_job.id,
|
|
||||||
progress,
|
|
||||||
f"Hashed {self.files_hashed} files ({self._format_throughput()})...",
|
|
||||||
)
|
|
||||||
|
|
||||||
# Commit batch
|
# Commit batch
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -1,12 +1,9 @@
|
|||||||
import hashlib
|
import hashlib
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
import pytest
|
|
||||||
from app.services.scanner import (
|
from app.services.scanner import (
|
||||||
ScannerService,
|
ScannerService,
|
||||||
JobManager,
|
JobManager,
|
||||||
_hash_file_batch_fast,
|
|
||||||
_FAST_HASH_BINARY,
|
|
||||||
)
|
)
|
||||||
from app.db import models
|
from app.db import models
|
||||||
|
|
||||||
@@ -145,46 +142,6 @@ def test_scan_sources_mocked(db_session, mocker):
|
|||||||
assert record.size == 500
|
assert record.size == 500
|
||||||
|
|
||||||
|
|
||||||
def test_hash_file_batch_fast(tmp_path):
|
|
||||||
"""Tests native sha256sum/shasum batch hashing if available."""
|
|
||||||
if _FAST_HASH_BINARY is None:
|
|
||||||
pytest.skip("No native hash binary available")
|
|
||||||
|
|
||||||
# Create test files
|
|
||||||
files = {}
|
|
||||||
for i in range(5):
|
|
||||||
content = f"test content {i}".encode()
|
|
||||||
f = tmp_path / f"file_{i}.txt"
|
|
||||||
f.write_bytes(content)
|
|
||||||
files[str(f)] = hashlib.sha256(content).hexdigest()
|
|
||||||
|
|
||||||
# Hash via native binary
|
|
||||||
results = _hash_file_batch_fast(list(files.keys()), _FAST_HASH_BINARY)
|
|
||||||
|
|
||||||
assert len(results) == 5
|
|
||||||
for path, expected_hash in files.items():
|
|
||||||
assert results[path] == expected_hash
|
|
||||||
|
|
||||||
|
|
||||||
def test_hash_file_batch_fast_empty():
|
|
||||||
"""Tests that empty batch returns empty results."""
|
|
||||||
if _FAST_HASH_BINARY is None:
|
|
||||||
pytest.skip("No native hash binary available")
|
|
||||||
|
|
||||||
results = _hash_file_batch_fast([], _FAST_HASH_BINARY)
|
|
||||||
assert results == {}
|
|
||||||
|
|
||||||
|
|
||||||
def test_hash_file_batch_fast_nonexistent():
|
|
||||||
"""Tests that non-existent files are silently skipped."""
|
|
||||||
if _FAST_HASH_BINARY is None:
|
|
||||||
pytest.skip("No native hash binary available")
|
|
||||||
|
|
||||||
results = _hash_file_batch_fast(["/nonexistent/path"], _FAST_HASH_BINARY)
|
|
||||||
# Non-existent files should not produce hash entries
|
|
||||||
assert results == {}
|
|
||||||
|
|
||||||
|
|
||||||
def test_missing_file_marked_deleted_at_end_of_scan(db_session, mocker):
|
def test_missing_file_marked_deleted_at_end_of_scan(db_session, mocker):
|
||||||
"""Tests that files not seen during a scan are marked as deleted."""
|
"""Tests that files not seen during a scan are marked as deleted."""
|
||||||
scanner = ScannerService()
|
scanner = ScannerService()
|
||||||
@@ -259,8 +216,6 @@ def test_missing_file_during_hashing_marked_deleted(db_session, mocker):
|
|||||||
"""Tests that files missing during hashing are marked as deleted."""
|
"""Tests that files missing during hashing are marked as deleted."""
|
||||||
scanner = ScannerService()
|
scanner = ScannerService()
|
||||||
|
|
||||||
mocker.patch("app.services.scanner._FAST_HASH_BINARY", None)
|
|
||||||
|
|
||||||
f = models.FilesystemState(
|
f = models.FilesystemState(
|
||||||
file_path="/data/vanished.bin", size=10, mtime=1, is_ignored=False
|
file_path="/data/vanished.bin", size=10, mtime=1, is_ignored=False
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user