format negotiation
@@ -37,28 +37,31 @@ This document (`GEMINI.md`) contains critical, contextual information about the
## 3. Core Architectural Rules

### Hardware & Media Lifecycle
* **Hardware Decoupling:** Hardware configuration (tape drive paths, mount roots) is global. Media objects represent only identity and capacity.
* **Identification:** Tapes use barcodes/IDs via `mtx`/SCSI. HDDs use **Filesystem UUIDs** as a hardware fingerprint to remain path-agnostic if mount points change.
* **S3-Compatible Targets:** Standardized on the `s3` media type using `boto3`. Ingestion collects the Endpoint URL, Bucket, and HMAC credentials (see the sketch after this list).
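A minimal sketch of how an S3-compatible target might be constructed from those ingested fields. The endpoint, bucket, and credential values here are placeholders, and the variable names are illustrative rather than the project's actual config keys:

```python
import boto3

# Hypothetical values captured by the ingestion form; the real keys live in
# the provider's config_schema and may differ.
endpoint_url = "https://s3.example.internal:9000"
bucket = "tapehoard-archive"
access_key = "EXAMPLE_HMAC_ACCESS_KEY"
secret_key = "EXAMPLE_HMAC_SECRET_KEY"

# boto3 treats any S3-compatible service the same way once the endpoint is overridden.
s3 = boto3.client(
    "s3",
    endpoint_url=endpoint_url,
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
)

# Quick reachability check during ingestion.
s3.head_bucket(Bucket=bucket)
```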
### Storage Providers & Media Lifecycle
* **Plugin Architecture:** All storage destinations are treated as plugins implementing `AbstractStorageProvider`. Avoid hardcoding hardware logic (`tape`, `hdd`) in the API or UI.
* **Dynamic UI:** The frontend dynamically renders registration and edit forms from a provider's `config_schema` (fetched from `GET /inventory/providers`); a schema sketch follows this list.
* **Standardized Telemetry:** Providers must implement `get_live_info(force: bool)` to return unified telemetry (e.g., drive status, capacity).
* **Sanitization:** Initializing media performs a full purge of existing TapeHoard data if the `force` flag is set.
* **Hardware Failure:** Marking media as "Failed" triggers an automatic, atomic purge of all associated `file_versions` so those files surface as "Pending" on the dashboard.
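A sketch of what a provider's `config_schema` could look like and the kind of payload `GET /inventory/providers` might return. The field names and response shape are assumptions for illustration, not the project's confirmed schema:

```python
# Hypothetical provider-side declaration; the UI renders one input per field.
CONFIG_SCHEMA = {
    "fields": [
        {"name": "endpoint_url", "label": "Endpoint URL", "type": "text", "required": True},
        {"name": "bucket", "label": "Bucket", "type": "text", "required": True},
        {"name": "access_key", "label": "HMAC Access Key", "type": "password", "required": True},
        {"name": "secret_key", "label": "HMAC Secret Key", "type": "password", "required": True},
    ]
}

def list_providers() -> list[dict]:
    # Assumed response shape: one entry per registered plugin.
    return [
        {
            "media_type": "s3",
            "display_name": "S3-Compatible Object Storage",
            "config_schema": CONFIG_SCHEMA,
        },
    ]
```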
### Database & Performance
* **High Concurrency:** SQLite must always run in **WAL (Write-Ahead Logging)** mode with a 30s busy timeout and a larger page cache (see the sketch after this list).
* **Archival Intent:** `is_ignored` in `filesystem_state` is the single source of truth. The scanner indexes all files but lazily marks excluded ones as `is_ignored = 1`. Explicit user tracking policies override global exclusions.
* **Aggregate Intelligence:** Use raw SQL aggregates for dashboard stats and directory protection status to avoid N+1 query patterns.
* **FTS5 Search:** Full-text search is managed via triggers. Ensure searches filter for `has_version = 1` when browsing the Archive Index.
* **FTS5 Search:** Full-text search is managed via triggers. Ensure searches filter for `has_version = 1` when browsing the Archive Index, regardless of the current `is_ignored` state on disk.
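A minimal sketch of those connection settings, assuming SQLAlchemy over a SQLite engine; the exact cache size is illustrative rather than the project's chosen value:

```python
from sqlalchemy import create_engine, event

engine = create_engine("sqlite:///tapehoard.db")

@event.listens_for(engine, "connect")
def _set_sqlite_pragmas(dbapi_conn, _record):
    cur = dbapi_conn.cursor()
    cur.execute("PRAGMA journal_mode=WAL;")    # concurrent readers while a writer is active
    cur.execute("PRAGMA busy_timeout=30000;")  # wait up to 30s instead of failing with SQLITE_BUSY
    cur.execute("PRAGMA cache_size=-65536;")   # ~64 MiB page cache (negative value means KiB)
    cur.close()
```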
### Scanning & Hashing Architecture
* **Concurrent Phasing:** Decoupled into `SCAN` (metadata, normal priority) and `HASH` (content, idle priority with dynamic `iowait` throttling).
* **Thread-Safe Metrics:** All counters (files processed, bytes hashed) must be protected by a `threading.Lock`; a sketch follows this list.
* **Hashing Progress:** Hashing jobs calculate progress against a dynamically updated snapshot of the total `is_indexed = 0 AND is_ignored = 0` files.
### Archival & Recovery
* **Bitstream Integrity:** `RangeFile` must guarantee exact byte counts. If a file is truncated on disk during a backup, it must be padded with null bytes to prevent corrupting the tar alignment.
* **Metadata Fidelity:** The restorer must preserve original **permissions (chmod)**, **timestamps (utime)**, and **ownership (chown)** when recovering files.
* **Seekable Restoration:** Non-tape media (HDD/S3) must use `mode="r:*"` (seekable) for robust partial restores, while tapes use `mode="r|*"` (pipe).
* **Path Normalization:** Aggressively strip leading slashes and `./` prefixes from both DB keys and tar members to ensure matches across different environments.
* **Independence:** Force all archive members to be **Regular Files** to break fragile hard-link dependencies. Symlinks are preserved as `SYMTYPE` with relative targets.
* **Format Negotiation:** The Archiver adapts formats to provider capabilities (`supports_random_access`); a dispatch sketch follows this list.
    * *Sequential (Tape):* Uses `.tar` streams to keep the drive streaming.
    * *Random Access (HDD/Cloud):* Uses native direct file copies/objects to enable instant, seekless restores without unpacking gigabytes of data.
* **Bitstream Integrity:** `RangeFile` must guarantee exact byte counts for tar alignment.
* **Metadata Fidelity:** The restorer must preserve original **permissions (chmod)**, **timestamps (utime)**, and **ownership (chown)** when recovering files natively or via tar.
* **Independence:** Force all tar archive members to be **Regular Files** to break fragile hard-link dependencies. Symlinks are preserved as `SYMTYPE` (or as `.symlink` stub objects in the native format).
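A sketch of the negotiation itself, assuming the provider interface shown later in this commit (`capabilities`, `write_file_direct`); the surrounding archiver plumbing is simplified and the location-id handling is illustrative:

```python
import tarfile
from typing import BinaryIO

def store(provider, media_id: str, relative_path: str,
          stream: BinaryIO, size: int, tar_bundle: tarfile.TarFile) -> str:
    """Pick the on-media format from the provider's declared capabilities."""
    if provider.capabilities.get("supports_random_access"):
        # HDD / Cloud: write the object directly so restores never unpack a tar.
        return provider.write_file_direct(media_id, relative_path, stream)
    # Tape: append into the sequential tar stream to keep the drive streaming.
    info = tarfile.TarInfo(name=relative_path)
    info.size = size
    tar_bundle.addfile(info, stream)
    return str(tar_bundle.name)
```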
### Deployment & Testing
* **Temporal Standard:** Backend uses **UTC**. Frontend uses `parseUTCDate` to convert to browser **Local Time**.
@@ -75,8 +78,9 @@ This document (`GEMINI.md`) contains critical, contextual information about the
* **Centralized Schemas:** Define shared Pydantic models in `app.api.schemas` to avoid circular dependencies when importing across different routers.
### Hardware Polling & Stability
* **Non-Intrusive Polling:** Hardware status checks (e.g., tape drive identity) must prioritize non-intrusive methods like reading the MAM (Media Auxiliary Memory) barcode (`sg_read_attr`). Intrusive operations (like `mt rewind`) should only be used as fallbacks and never during periodic status polling when the drive is busy.
* **Last Known Good (LKG) Caching:** Implement LKG caching in hardware providers to persist the last successful hardware read. If a status poll fails because a device is temporarily busy with an archival job, return the LKG state instead of empty data to prevent UI flickering.
* **Non-Intrusive Polling:** Hardware status checks must prioritize non-intrusive methods (e.g., reading MAM via `sg_read_attr`). Intrusive operations (`mt rewind`) are strictly fallbacks. Always verify device path existence (`os.path.exists`) before issuing SCSI/CLI commands to prevent log spam on disconnected drives.
* **Last Known Good (LKG) Caching:** Implement LKG caching in both backend hardware providers and frontend UI state. If a status poll fails or returns empty because a device is temporarily busy with an archival job, preserve and return the LKG state to prevent UI flickering (a sketch follows this list).
* **Forced Refreshes:** Hardware polling defaults to throttled intervals (e.g., 2 seconds). Use `force=True` on provider calls and `?refresh=true` on API endpoints to bypass throttling when the user explicitly requests a live update or on initial page load.
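A sketch of the LKG-plus-throttle pattern on the provider side; the attribute names and the 2-second window are illustrative, assuming only the `get_live_info(force)` contract described above:

```python
import time

class PollingCache:
    """Throttled hardware polling with a Last Known Good fallback."""

    def __init__(self, poll_fn, min_interval: float = 2.0) -> None:
        self._poll_fn = poll_fn          # the real (possibly slow or intrusive) hardware read
        self._min_interval = min_interval
        self._lkg: dict = {}
        self._last_poll = 0.0

    def get_live_info(self, force: bool = False) -> dict:
        now = time.monotonic()
        if not force and (now - self._last_poll) < self._min_interval:
            return self._lkg             # throttled: serve the cached state
        try:
            fresh = self._poll_fn()
            if fresh:                    # only overwrite LKG with a non-empty read
                self._lkg = fresh
        except Exception:
            pass                         # device busy (e.g., archival job): keep LKG
        self._last_poll = now
        return self._lkg
```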
### Frontend Reactivity
* **Svelte 5 State:** When mutating complex data structures like `Map` or `Set` in Svelte 5 `$state`, always explicitly reassign the variable (e.g., `myMap = new Map(myMap)`) after mutation to trigger the reactivity engine.
@@ -571,9 +571,14 @@ def browse_archive_index(path: str = "ROOT", db_session: Session = Depends(get_d
        stats = db_session.execute(
            prot_check, {"r": root, "prefix": f"{root}/%"}
        ).fetchone()
        total = stats[0] or 0
        protected = stats[1] or 0
        media_list = stats[2].split(",") if stats[2] else []

        total = 0
        protected = 0
        media_list = []
        if stats:
            total = stats[0] or 0
            protected = stats[1] or 0
            media_list = stats[2].split(",") if stats[2] else []

        if protected > 0:
            results.append(
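For context, a hedged sketch of the kind of aggregate `prot_check` is assumed to run (the real query text is not part of this commit): one grouped query returns the total, the protected count, and the media labels in a single round trip instead of one query per row.

```python
from sqlalchemy import text

# Hypothetical aggregate; table and column names follow the models referenced
# elsewhere in this commit, but the exact SQL is an assumption.
prot_check = text("""
    SELECT COUNT(*)                                          AS total,
           SUM(CASE WHEN fv.id IS NOT NULL THEN 1 ELSE 0 END) AS protected,
           GROUP_CONCAT(DISTINCT m.label)                     AS media_list
    FROM filesystem_state fs
    LEFT JOIN file_versions fv ON fv.filesystem_state_id = fs.id
    LEFT JOIN media m          ON m.id = fv.media_id
    WHERE fs.file_path = :r OR fs.file_path LIKE :prefix
""")
```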
@@ -77,6 +77,16 @@ class AbstractStorageProvider(ABC):
        """
        pass

    def write_file_direct(
        self, media_id: str, relative_path: str, stream: BinaryIO
    ) -> str:
        """
        Writes a single file directly to the media using its relative path.
        Only supported if capabilities['supports_random_access'] is True.
        Returns the location_id (e.g. the path itself or an object key).
        """
        raise NotImplementedError("This provider does not support random access.")

    @abstractmethod
    def finalize_media(self, media_id: str):
        """Finalizes the media (e.g., writing index, ejecting)"""
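A toy stand-in that follows the same contract, to show how the two new hooks interact; it is not one of the project's real plugins and skips the rest of the abstract interface:

```python
from typing import BinaryIO

class InMemoryProvider:
    """Illustrative example only: same write_file_direct/finalize_media shape."""

    capabilities = {"supports_random_access": True}

    def __init__(self) -> None:
        self.objects: dict[str, bytes] = {}

    def write_file_direct(self, media_id: str, relative_path: str, stream: BinaryIO) -> str:
        location_id = relative_path.lstrip("./")
        self.objects[location_id] = stream.read()
        return location_id

    def finalize_media(self, media_id: str) -> None:
        pass  # nothing to flush for an in-memory toy
```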
@@ -189,6 +189,53 @@ class CloudStorageProvider(AbstractStorageProvider):
            logger.error(f"Cloud upload failed: {e}")
            raise

    def write_file_direct(
        self, media_id: str, relative_path: str, stream: BinaryIO
    ) -> str:
        """
        Uploads a single file directly to the cloud, maintaining structure.
        """
        clean_path = relative_path.lstrip("./")
        object_key = f"objects/{clean_path}"

        if self.passphrase:
            logger.info(
                f"Uploading AES-256-GCM object to {self.bucket_name}/{object_key}"
            )

            # 1. Setup crypto artifacts
            salt = os.urandom(16)
            nonce = os.urandom(12)
            key = self._derive_key(salt)

            # 2. Encrypt
            cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
            data = stream.read()
            ciphertext, tag = cipher.encrypt_and_digest(data)

            # 3. Concatenate and upload
            payload = salt + nonce + tag + ciphertext

            try:
                self.s3.put_object(
                    Bucket=self.bucket_name,
                    Key=object_key,
                    Body=payload,
                    Metadata={"x-amz-meta-tapehoard-encrypted": "v2-gcm"},
                )
                return object_key
            except Exception as e:
                logger.error(f"GCM cloud object upload failed: {e}")
                raise
        else:
            logger.info(f"Uploading plain object to {self.bucket_name}/{object_key}")
            try:
                self.s3.upload_fileobj(stream, self.bucket_name, object_key)
                return object_key
            except Exception as e:
                logger.error(f"Cloud object upload failed: {e}")
                raise

    def read_archive(self, media_id: str, location_id: str) -> BinaryIO:
        """Retrieves and decrypts an AES-GCM archive"""
        response = self.s3.get_object(Bucket=self.bucket_name, Key=location_id)
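For reference, a sketch of the read side implied by that payload layout (16-byte salt, 12-byte nonce, 16-byte GCM tag, then ciphertext), assuming the same PyCryptodome `AES` import and a `_derive_key(salt)`-style helper as above:

```python
from Crypto.Cipher import AES

def decrypt_gcm_payload(payload: bytes, derive_key) -> bytes:
    # Split the concatenated artifacts written by write_file_direct.
    salt, nonce, tag = payload[:16], payload[16:28], payload[28:44]
    ciphertext = payload[44:]
    key = derive_key(salt)
    cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
    # decrypt_and_verify raises ValueError if the tag does not match.
    return cipher.decrypt_and_verify(ciphertext, tag)
```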
@@ -1,6 +1,6 @@
import os
import shutil
from typing import Optional, BinaryIO
from typing import Optional, BinaryIO, Dict, Any
from .base import AbstractStorageProvider
from loguru import logger
@@ -38,10 +38,10 @@ class OfflineHDDProvider(AbstractStorageProvider):
    def get_name(self) -> str:
        return self.name

    def get_live_info(self, force: bool = False) -> dict:
    def get_live_info(self, force: bool = False) -> Dict[str, Any]:
        import psutil

        info = {"online": self.check_online(force=force)}
        info: Dict[str, Any] = {"online": self.check_online(force=force)}
        if info["online"]:
            try:
                usage = psutil.disk_usage(self.mount_base)
@@ -158,28 +158,75 @@ class OfflineHDDProvider(AbstractStorageProvider):
        return location_id

    def read_archive(self, media_id: str, location_id: str) -> BinaryIO:
        """Locates and opens a numbered archive volume."""
        # Standardize on the 6-digit padded format
        file_name = f"{int(location_id):06d}.tar"
    def write_file_direct(
        self, media_id: str, relative_path: str, stream: BinaryIO
    ) -> str:
        """Writes a single file directly, maintaining its original directory structure under an 'objects/' folder."""
        # Sanitize path to prevent breakout
        clean_path = relative_path.lstrip("./")
        target_path = os.path.join(
            self.mount_base, "tapehoard_backups", "archives", file_name
            self.mount_base, "tapehoard_backups", "objects", clean_path
        )

        if not os.path.exists(target_path):
            # Fallback for older non-padded files if they exist
            legacy_path = os.path.join(
                self.mount_base, "tapehoard_backups", "archives", f"{location_id}.tar"
            )
            if os.path.exists(legacy_path):
                target_path = legacy_path
            else:
                raise FileNotFoundError(
                    f"Archive {location_id} not found on media {media_id} (Checked {target_path})"
                )
        # Ensure path is strictly within the objects directory
        base_objects_dir = os.path.join(self.mount_base, "tapehoard_backups", "objects")
        if not os.path.abspath(target_path).startswith(
            os.path.abspath(base_objects_dir)
        ):
            raise ValueError(f"Invalid relative path for direct write: {relative_path}")

        logger.info(f"Opening bitstream from HDD: {target_path}")
        return open(target_path, "rb")
        os.makedirs(os.path.dirname(target_path), exist_ok=True)

        logger.info(f"Direct copy to HDD: {target_path}")
        with open(target_path, "wb") as f:
            shutil.copyfileobj(stream, f)

        # Return the clean path as the location_id so it can be restored later
        return clean_path

    def read_archive(self, media_id: str, location_id: str) -> BinaryIO:
        """Locates and opens a numbered archive volume or a direct object."""
        # Try direct object path first (Format Negotiation)
        clean_path = location_id.lstrip("./")
        object_target_path = os.path.join(
            self.mount_base, "tapehoard_backups", "objects", clean_path
        )
        base_objects_dir = os.path.join(self.mount_base, "tapehoard_backups", "objects")

        if os.path.abspath(object_target_path).startswith(
            os.path.abspath(base_objects_dir)
        ) and os.path.exists(object_target_path):
            logger.info(f"Opening direct object from HDD: {object_target_path}")
            return open(object_target_path, "rb")

        # Fallback to sequential tar archive format
        try:
            file_name = f"{int(location_id):06d}.tar"
            target_path = os.path.join(
                self.mount_base, "tapehoard_backups", "archives", file_name
            )

            if not os.path.exists(target_path):
                # Fallback for older non-padded files if they exist
                legacy_path = os.path.join(
                    self.mount_base,
                    "tapehoard_backups",
                    "archives",
                    f"{location_id}.tar",
                )
                if os.path.exists(legacy_path):
                    target_path = legacy_path
                else:
                    raise FileNotFoundError(
                        f"Archive {location_id} not found on media {media_id} (Checked {target_path})"
                    )

            logger.info(f"Opening bitstream from HDD: {target_path}")
            return open(target_path, "rb")
        except ValueError:
            raise FileNotFoundError(
                f"Direct object not found and location ID '{location_id}' is not a valid archive number."
            )

    def finalize_media(self, media_id: str):
        """No special finalization needed for HDD."""
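One caution on the containment check above: a plain `startswith` over `os.path.abspath` can be satisfied by a sibling directory whose name merely shares the prefix (e.g. `objects_evil`). A tighter check, sketched here as an assumption rather than what the provider ships, compares whole path components:

```python
import os

def is_inside(base_dir: str, candidate: str) -> bool:
    """True only if candidate resolves to a path under base_dir."""
    base = os.path.realpath(base_dir)
    target = os.path.realpath(candidate)
    # commonpath compares whole components, so "objects_evil" never matches "objects".
    return os.path.commonpath([base, target]) == base
```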
@@ -313,7 +313,11 @@ class ArchiverService:
        # Packaging
        if remaining_to_write:
            with tarfile.open(staging_full_path, "w") as tar_bundle:
            batch_uuid = str(uuid.uuid4())

            if storage_provider.capabilities.get("supports_random_access"):
                import io

                for item in remaining_to_write:
                    if JobManager.is_cancelled(job_id):
                        break
@@ -328,43 +332,102 @@ class ArchiverService:
                    if item["is_split"]:
                        internal_name += f".part_{start}_{end}"

                    archive_location_id = None
                    if os.path.lexists(file_state.file_path):
                        # Manual TarInfo to ensure strict alignment and bitstream independence
                        tar_info = tar_bundle.gettarinfo(
                            file_state.file_path, arcname=internal_name
                        )

                        if os.path.islink(file_state.file_path):
                            # Preserve Symlinks with their relative targets
                            tar_info.type = tarfile.SYMTYPE
                            tar_info.linkname = os.readlink(file_state.file_path)
                            tar_bundle.addfile(
                                tar_info
                            )  # Links have no data payload
                            target_path = os.readlink(file_state.file_path)
                            with io.BytesIO(
                                target_path.encode("utf-8")
                            ) as link_stub:
                                archive_location_id = (
                                    storage_provider.write_file_direct(
                                        media_record.identifier,
                                        internal_name + ".symlink",
                                        link_stub,
                                    )
                                )
                        else:
                            # FORCE regular file for everything else (destroys hard-links for reliability)
                            tar_info.type = tarfile.REGTYPE
                            tar_info.linkname = ""
                            tar_info.size = chunk_size

                            with RangeFile(
                                file_state.file_path, start, chunk_size
                            ) as rh:
                                tar_bundle.addfile(tar_info, rh)
                                archive_location_id = (
                                    storage_provider.write_file_direct(
                                        media_record.identifier, internal_name, rh
                                    )
                                )

                    processed_bytes += chunk_size
                    JobManager.update_job(
                        job_id,
                        15.0 + (70.0 * (processed_bytes / safe_divisor)),
                        f"Archiving: {os.path.basename(file_state.file_path)}",
                    )
                    if archive_location_id:
                        processed_bytes += chunk_size
                        media_record.bytes_used += chunk_size
                        JobManager.update_job(
                            job_id,
                            15.0 + (70.0 * (processed_bytes / safe_divisor)),
                            f"Uploading natively: {os.path.basename(file_state.file_path)}",
                        )

                        db_session.add(
                            models.FileVersion(
                                filesystem_state_id=file_state.id,
                                media_id=media_record.id,
                                file_number=archive_location_id,
                                is_split=item["is_split"],
                                split_id=batch_uuid if item["is_split"] else None,
                                offset_start=item["offset_start"],
                                offset_end=item["offset_end"],
                            )
                        )
            else:
                # Sequential Media (Tape): Tar Stream
                with tarfile.open(staging_full_path, "w") as tar_bundle:
                    for item in remaining_to_write:
                        if JobManager.is_cancelled(job_id):
                            break

                        file_state, start, end = (
                            item["file_state"],
                            item["offset_start"],
                            item["offset_end"],
                        )
                        chunk_size = end - start
                        internal_name = self.normalize_path(file_state.file_path)
                        if item["is_split"]:
                            internal_name += f".part_{start}_{end}"

                        if os.path.lexists(file_state.file_path):
                            tar_info = tar_bundle.gettarinfo(
                                file_state.file_path, arcname=internal_name
                            )

                            if os.path.islink(file_state.file_path):
                                tar_info.type = tarfile.SYMTYPE
                                tar_info.linkname = os.readlink(
                                    file_state.file_path
                                )
                                tar_bundle.addfile(tar_info)
                            else:
                                tar_info.type = tarfile.REGTYPE
                                tar_info.linkname = ""
                                tar_info.size = chunk_size

                                with RangeFile(
                                    file_state.file_path, start, chunk_size
                                ) as rh:
                                    tar_bundle.addfile(tar_info, rh)

                        processed_bytes += chunk_size
                        JobManager.update_job(
                            job_id,
                            15.0 + (70.0 * (processed_bytes / safe_divisor)),
                            f"Archiving: {os.path.basename(file_state.file_path)}",
                        )

        if JobManager.is_cancelled(job_id):
            return

        # Finalize Staging
        if remaining_to_write:
            # Sync staging file to disk
        # Finalize Staging for Sequential
        if remaining_to_write and not storage_provider.capabilities.get(
            "supports_random_access"
        ):
            with open(staging_full_path, "a") as f:
                f.flush()
                os.fsync(f.fileno())
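`RangeFile` itself is not part of this diff. The sketch below shows the contract the archiver relies on (exactly `length` bytes, null-padded if the file shrank on disk), assuming a constructor of `(path, start, length)` as used above; the class name is deliberately different to mark it as illustrative:

```python
import io

class PaddedRangeFile(io.RawIOBase):
    """Illustrative stand-in: yields exactly `length` bytes of `path` from `start`,
    null-padding if the underlying file is shorter than expected."""

    def __init__(self, path: str, start: int, length: int) -> None:
        self._fh = open(path, "rb")
        self._fh.seek(start)
        self._remaining = length

    def read(self, size: int = -1) -> bytes:
        if self._remaining <= 0:
            return b""
        want = self._remaining if size < 0 else min(size, self._remaining)
        data = self._fh.read(want)
        if len(data) < want:
            data += b"\x00" * (want - len(data))  # keep tar alignment if the file was truncated
        self._remaining -= want
        return data

    def close(self) -> None:
        self._fh.close()
        super().close()
```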
@@ -378,7 +441,6 @@ class ArchiverService:
            )
            media_record.bytes_used += os.path.getsize(staging_full_path)

            batch_uuid = str(uuid.uuid4())
            for item in remaining_to_write:
                db_session.add(
                    models.FileVersion(
@@ -498,6 +560,62 @@ class ArchiverService:
            bitstream = provider.read_archive(
                media_record.identifier, archive_id
            )

            is_tar = (
                not provider.capabilities.get("supports_random_access")
                or str(archive_id).endswith(".tar")
                or str(archive_id).isdigit()
            )

            if not is_tar:
                # Format Negotiation: Direct file recovery
                for v in target_versions:
                    if JobManager.is_cancelled(job_id):
                        break
                    final_path = self._sanitize_recovery_path(
                        destination_root, v.file_state.file_path
                    )
                    os.makedirs(os.path.dirname(final_path), exist_ok=True)

                    # Handle symlink stubs
                    if str(archive_id).endswith(".symlink"):
                        target_link_path = bitstream.read().decode("utf-8")
                        if os.path.lexists(final_path):
                            os.remove(final_path)
                        os.symlink(target_link_path, final_path)
                    else:
                        mode = "r+b" if os.path.exists(final_path) else "wb"
                        with open(final_path, mode) as dst:
                            if v.is_split:
                                if mode == "wb":
                                    dst.truncate(v.file_state.size)
                                dst.seek(v.offset_start)
                            shutil.copyfileobj(bitstream, dst)

                    # Attempt to restore basic metadata (mtime) from index
                    try:
                        os.utime(
                            final_path,
                            (v.file_state.mtime, v.file_state.mtime),
                        )
                    except Exception:
                        pass

                    processed_bytes += v.offset_end - v.offset_start
                    JobManager.update_job(
                        job_id,
                        min(
                            99.0,
                            5.0
                            + (
                                90.0
                                * (processed_bytes / max(v.file_state.size, 1))
                            ),
                        ),
                        f"Restoring natively: {os.path.basename(v.file_state.file_path)}",
                    )
                continue

            tar_mode = "r|*" if media_record.media_type == "tape" else "r:*"

            with tarfile.open(fileobj=bitstream, mode=tar_mode) as tar_bundle:
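The native branch above restores only mtime from the index; full metadata fidelity (permissions, timestamps, ownership) lives on the tar path. A sketch of the per-member restore that path is assumed to perform, using only standard-library calls:

```python
import os
import tarfile

def restore_metadata(member: tarfile.TarInfo, final_path: str) -> None:
    """Re-apply permissions, timestamps and ownership after extracting a member."""
    os.chmod(final_path, member.mode)
    os.utime(final_path, (member.mtime, member.mtime))
    try:
        os.chown(final_path, member.uid, member.gid)  # needs root; skip otherwise
    except PermissionError:
        pass
```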
@@ -390,6 +390,16 @@ class ScannerService:
        self.bytes_hashed = 0
        self.files_hashed = 0

        # Count total work pending for progress reporting
        total_pending = (
            db_session.query(models.FilesystemState)
            .filter(
                models.FilesystemState.is_indexed.is_(False),
                models.FilesystemState.is_ignored.is_(False),
            )
            .count()
        )

        try:
            while True:
                # Find unindexed work
@@ -406,6 +416,16 @@ class ScannerService:
                if not hashing_targets:
                    if self.is_running:
                        time.sleep(2)
                        # Recount if more work appeared during sleep
                        total_pending = (
                            db_session.query(models.FilesystemState)
                            .filter(
                                models.FilesystemState.is_indexed.is_(False),
                                models.FilesystemState.is_ignored.is_(False),
                            )
                            .count()
                            + self.files_hashed
                        )
                        continue
                    break
@@ -433,10 +453,16 @@ class ScannerService:
                    self.files_hashed += 1

                    if self.files_hashed % 5 == 0:
                        status_msg = f"Hashing Fleet: {self.files_hashed} objects processed [{self._format_throughput()}]"
                        progress = min(
                            99.9,
                            (self.files_hashed / max(total_pending, 1)) * 100,
                        )
                        status_msg = f"Hashing Fleet: {self.files_hashed}/{total_pending} objects processed [{self._format_throughput()}]"
                        if self.is_throttled:
                            status_msg += " (THROTTLED)"
                        JobManager.update_job(hashing_job.id, 50.0, status_msg)
                        JobManager.update_job(
                            hashing_job.id, progress, status_msg
                        )

                db_session.commit()