# Listing metadata from the original file viewer: 579 lines, 23 KiB, Python.
import os
import re
import struct
import subprocess
import tarfile
import tempfile
import time
from typing import Any, BinaryIO, ClassVar, Dict, List, Optional, cast

from loguru import logger

from .base import AbstractStorageProvider


class LTOProvider(AbstractStorageProvider):
    """Storage provider backed by a hardware Linear Tape-Open (LTO) drive.

    The drive is driven entirely through external command-line tools:
    ``mt`` (positioning and status), ``sg_inq`` (drive inquiry),
    ``sg_read_attr`` / ``sg_write_attr`` (Media Auxiliary Memory),
    ``stenc`` (hardware encryption), ``tar`` (label read-back) and ``dd``
    (bulk data transfer).

    Poll results are cached per device path in the class-level "Last Known
    Good" (LKG) store, so repeated status requests are throttled and a failed
    poll can fall back to the previous answer.
    """

    provider_id = "lto_tape"
    name = "LTO Tape Drive"
    description = "Hardware Linear Tape-Open (LTO) drives."
    capabilities = {
        "supports_random_access": False,
        "is_offline_capable": True,
        "supports_hardware_encryption": True,
    }
    config_schema = {
        "compression": {
            "type": "boolean",
            "title": "Hardware Compression",
            "description": "Enable LTO hardware-level compression (default: True).",
            "default": True,
        },
        "encryption_key_id": {
            "type": "string",
            "title": "Encryption Key ID",
            "description": "Reference to a key stored in the system keystore.",
        },
        "generation": {
            "type": "string",
            "title": "LTO Generation",
            "description": "Tape generation (LTO-5, LTO-6, LTO-7, LTO-8, LTO-9).",
            "enum": ["LTO-5", "LTO-6", "LTO-7", "LTO-8", "LTO-9"],
        },
        "worm": {
            "type": "boolean",
            "title": "WORM (Write Once Read Many)",
            "description": "Mark tape as Write Once Read Many.",
            "default": False,
        },
        "write_protected": {
            "type": "boolean",
            "title": "Write Protected",
            "description": "Physical write-protect switch status.",
            "default": False,
        },
        "cleaning_cartridge": {
            "type": "boolean",
            "title": "Cleaning Cartridge",
            "description": "Mark if this is a cleaning tape.",
            "default": False,
        },
    }

    # Class-level store for Last Known Good (LKG) hardware state, shared by
    # every provider instance that points at the same device:
    # device_path -> {"drive": {}, "mam": {}, "online": bool,
    #                 "last_check": float, "mam_last_check": float}
    # "last_check" throttles `mt status` polls, "mam_last_check" throttles
    # MAM reads. They are tracked separately on purpose: sharing one
    # timestamp made every online check suppress the MAM read after it.
    _lkg_state: ClassVar[Dict[str, Dict[str, Any]]] = {}

    # Minimum number of seconds between two unforced polls of the same kind.
    _POLL_INTERVAL_S = 2.0
    # How many times a MAM read is attempted before falling back to the LKG.
    _MAM_READ_ATTEMPTS = 3

    # Substrings looked for in `sg_inq` output -> key in the drive-info dict.
    _INQUIRY_FIELDS = {
        "Vendor identification:": "vendor",
        "Product identification:": "model",
        "Product revision level:": "firmware",
    }

    # Per-attribute header in the raw MAM dump: id (u16), one flag/format
    # byte (unused here), value length (u16), all big-endian.
    _MAM_HEADER = struct.Struct(">HBH")
    # MAM attribute id -> key in the parsed MAM dict.
    _MAM_ATTRIBUTES = {
        0x0000: "remaining_capacity_mib",
        0x0001: "max_capacity_mib",
        0x0002: "tape_alert_flags",
        0x0003: "load_count",
        0x0220: "lifetime_mib_written",
        0x0221: "lifetime_mib_read",
        0x0222: "session_mib_written",
        0x0223: "session_mib_read",
        0x0400: "manufacturer",
        0x0401: "serial",
        0x0405: "density",
        0x0406: "manufacture_date",
        0x0806: "barcode",
    }
    # Attributes decoded as big-endian unsigned integers.
    _MAM_NUMERIC_ATTRIBUTES = frozenset(
        {0x0000, 0x0001, 0x0002, 0x0003, 0x0220, 0x0221, 0x0222, 0x0223}
    )
    # Attribute reported as the hex string of its first byte.
    _MAM_DENSITY_ATTRIBUTE = 0x0405

    # Alert flag number -> message. Flag N is tested at bit (64 - N) of the
    # integer decoded from attribute 0x0002.
    _TAPE_ALERTS = {
        3: "Hard Error",
        4: "Media Error",
        5: "Read Failure",
        6: "Write Failure",
        20: "Clean Now",
        30: "Hardware Failure",
    }

    # (exclusive upper bound on max capacity in MiB, generation label), in
    # ascending order; anything at or above the last bound is "LTO-9".
    _GENERATION_LIMITS = (
        (150000, "LTO-1"),
        (300000, "LTO-2"),
        (600000, "LTO-3"),
        (1200000, "LTO-4"),
        (2000000, "LTO-5"),
        (4000000, "LTO-6"),
        (10000000, "LTO-7"),
        (15000000, "LTO-8"),
    )

    # Matches the tape file position in either spelling seen in `mt status`.
    _FILE_NUMBER_RE = re.compile(r"(?:File number=|file number )(\d+)")

    def __init__(self, config: Dict[str, Any]):
        """Read the provider settings and make sure an LKG entry exists.

        Args:
            config: Provider configuration. Reads ``device_path`` (default
                ``/dev/nst0``), ``compression`` (default ``True``) and
                ``encryption_key`` (default ``None``).
        """
        self.device_path = config.get("device_path", "/dev/nst0")
        self.compression = config.get("compression", True)
        self.encryption_key = config.get("encryption_key")

        # Initialize LKG entry if not exists
        self._state()

    def _state(self) -> Dict[str, Any]:
        """Return the LKG entry for this device, creating it on first use."""
        return LTOProvider._lkg_state.setdefault(
            self.device_path,
            {
                "drive": {},
                "mam": {},
                "online": False,
                "last_check": 0.0,
                "mam_last_check": 0.0,
            },
        )

    def _record_online(self, is_online: bool) -> None:
        """Store the online flag, dropping cached MAM data on online -> offline.

        A transition to offline most likely means the tape was ejected, so the
        cached MAM no longer describes the loaded medium. The MAM throttle is
        reset as well so the next tape is read without delay.
        """
        state = self._state()
        if state["online"] and not is_online:
            state["mam"] = {}
            state["mam_last_check"] = 0.0
        state["online"] = is_online

    def _log_command(self, cmd: List[str]):
        """Logs the exact command being sent to the hardware."""
        logger.debug(f"HARDWARE CMD: {' '.join(cmd)}")

    @classmethod
    def _parse_inquiry(cls, text: str) -> Dict[str, str]:
        """Extract vendor, model and firmware from ``sg_inq`` text output."""
        info: Dict[str, str] = {}
        for line in text.splitlines():
            for marker, key in cls._INQUIRY_FIELDS.items():
                if marker in line:
                    info[key] = line.split(":", 1)[1].strip()
                    break
        return info

    def get_drive_info(self) -> dict:
        """Retrieves vendor, model, and firmware version of the tape drive.

        Returns:
            ``{}`` when the device node does not exist; otherwise the cached
            drive info if already known, else the freshly parsed ``sg_inq``
            result (which may be empty if the inquiry failed).
        """
        if not os.path.exists(self.device_path):
            return {}

        state = self._state()
        # Return LKG if already populated (drive hardware never changes)
        if state["drive"]:
            return state["drive"]

        try:
            cmd = ["sg_inq", self.device_path]
            self._log_command(cmd)
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
            if result.returncode == 0:
                info = self._parse_inquiry(result.stdout)
                if info:
                    state["drive"] = info
        except Exception as e:
            # Best-effort poll: never let a flaky inquiry break the caller.
            logger.debug(f"Direct drive inquiry failed for {self.device_path}: {e}")

        return state["drive"]

    @classmethod
    def _decode_tape_alerts(cls, flags: int) -> List[str]:
        """Translate the tape-alert flag integer into readable messages."""
        return [
            message
            for bit, message in cls._TAPE_ALERTS.items()
            if (flags >> (64 - bit)) & 1
        ]

    @classmethod
    def _generation_for_capacity(cls, capacity_mib: int) -> str:
        """Map a cartridge's maximum capacity (MiB) to an LTO generation label."""
        for limit, label in cls._GENERATION_LIMITS:
            if capacity_mib < limit:
                return label
        return "LTO-9"

    @classmethod
    def _parse_mam(cls, data: bytes) -> Dict[str, Any]:
        """Decode a raw ``sg_read_attr --raw`` dump into a MAM dict.

        Args:
            data: Raw output, at least 4 bytes long. It starts with a
                big-endian u32 "available length" followed by a sequence of
                attributes, each a 5-byte header plus its value.

        Returns:
            The known attributes keyed by name, plus the derived entries
            ``alerts``, ``generation_label`` and a ``barcode`` fallback taken
            from ``serial`` when no barcode attribute is present.
        """
        available_len = struct.unpack(">I", data[:4])[0]
        pos = 4
        # Never trust the advertised length beyond what was actually received.
        end = min(pos + available_len, len(data))
        header_size = cls._MAM_HEADER.size

        mam: Dict[str, Any] = {}
        while pos + header_size <= end:
            attr_id, _flags, attr_len = cls._MAM_HEADER.unpack_from(data, pos)
            pos += header_size
            if pos + attr_len > end:
                break  # truncated attribute: stop rather than misparse
            raw = data[pos : pos + attr_len]
            pos += attr_len

            key = cls._MAM_ATTRIBUTES.get(attr_id)
            if key is None:
                continue
            if attr_id in cls._MAM_NUMERIC_ATTRIBUTES:
                mam[key] = int.from_bytes(raw, "big")
            elif attr_id == cls._MAM_DENSITY_ATTRIBUTE:
                mam[key] = hex(raw[0]) if raw else "0x00"
            else:
                # Text attribute: keep everything before the first NUL.
                text = raw.decode("ascii", errors="ignore").split("\x00")[0].strip()
                if text:
                    mam[key] = text

        if mam.get("tape_alert_flags"):
            mam["alerts"] = cls._decode_tape_alerts(mam["tape_alert_flags"])

        if "max_capacity_mib" in mam:
            mam["generation_label"] = cls._generation_for_capacity(
                mam["max_capacity_mib"]
            )

        # Barcode Fallback
        if not mam.get("barcode") and mam.get("serial"):
            mam["barcode"] = mam["serial"]

        return mam

    def get_mam_info(self, force: bool = False) -> dict:
        """Reads Media Auxiliary Memory (MAM) attributes using sg_read_attr --raw.

        Args:
            force: Bypass the throttle and read from the hardware now.

        Returns:
            ``{}`` when the device node does not exist; the parsed MAM on a
            successful read; otherwise the last known good MAM (possibly
            empty).
        """
        if not os.path.exists(self.device_path):
            return {}

        state = self._state()
        # Throttle MAM reads unless forced. This uses its own timestamp so a
        # preceding check_online() call cannot starve the MAM read.
        elapsed = time.time() - state.get("mam_last_check", 0.0)
        if not force and elapsed < self._POLL_INTERVAL_S:
            return state["mam"]

        cmd = ["sg_read_attr", "--raw", self.device_path]
        # Try a few times with a small backoff if the drive is busy
        for attempt in range(self._MAM_READ_ATTEMPTS):
            try:
                self._log_command(cmd)
                result = subprocess.run(cmd, capture_output=True, timeout=10)

                if result.returncode == 0 and result.stdout:
                    if len(result.stdout) < 4:
                        continue  # too short to even hold the length prefix

                    mam = self._parse_mam(result.stdout)
                    # SUCCESS! Update LKG MAM state
                    state["mam"] = mam
                    state["mam_last_check"] = time.time()
                    return mam

                # If we get "Device or resource busy", wait a bit and retry
                stderr_bytes = result.stderr or b""
                if result.returncode != 0 and b"busy" in stderr_bytes.lower():
                    time.sleep(0.2)
            except Exception as e:
                logger.debug(
                    f"MAM read attempt {attempt} failed for {self.device_path}: {e}"
                )
                time.sleep(0.1)

        # Throttle failed attempts too, so a drive without a readable MAM is
        # not queried several times on every single poll.
        state["mam_last_check"] = time.time()
        # Return LKG if direct read failed
        return state["mam"]

    def get_live_info(self, force: bool = False) -> Dict[str, Any]:
        """Performs a single-pass discovery of all hardware metrics to ensure consistency.

        Args:
            force: Passed through to the online check and the MAM read to
                bypass their throttles.

        Returns:
            A dict with ``online``, ``drive``, ``tape`` (the MAM dict),
            ``identity`` (barcode, else serial, else ``None``) and
            ``needs_registration``. The last is ``True`` only when the drive
            went from offline to online in this call, an identity was found
            and no barcode was cached beforehand.
        """
        state = self._state()
        prev_online = state["online"]
        prev_barcode = state["mam"].get("barcode")

        self.check_online(force=force)
        mam = self.get_mam_info(force=force)
        drive = self.get_drive_info()

        identity = mam.get("barcode") or mam.get("serial")
        is_online = state["online"]

        # Detection logic for state changes (Hardware Awareness)
        needs_registration = False
        if is_online and not prev_online and identity and not prev_barcode:
            logger.info(f"DETECTED TAPE INSERTION: {identity}")
            needs_registration = True

        return {
            "online": is_online,
            "drive": drive,
            "tape": mam,
            "identity": identity,
            "needs_registration": needs_registration,
        }

    def get_name(self) -> str:
        """Return the short display name of this provider."""
        return "LTO Tape"

    def check_online(self, force: bool = False) -> bool:
        """Checks if the tape drive is online. Throttled to 2 seconds.

        Args:
            force: Bypass the throttle and run ``mt status`` now.

        Returns:
            ``False`` when the device node is missing. Otherwise ``True`` when
            ``mt status`` reports "Device or resource busy", prints ``ONLINE``
            or ``READY``, or exits with status 0. If the command itself cannot
            be run, the last known value is returned.
        """
        state = self._state()
        if not os.path.exists(self.device_path):
            self._record_online(False)
            return False

        # Return LKG if we checked very recently
        now = time.time()
        if not force and now - state.get("last_check", 0) < self._POLL_INTERVAL_S:
            return state["online"]

        try:
            cmd = ["mt", "-f", self.device_path, "status"]
            self._log_command(cmd)
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
        except Exception as e:
            logger.debug(f"Online check failed for {self.device_path}: {e}")
            return state["online"]

        stderr = result.stderr or ""
        stdout = result.stdout or ""

        # "Device or resource busy" is a success for "is it online"
        busy = "Device or resource busy" in stderr or "Device or resource busy" in stdout
        is_online = (
            busy
            or "ONLINE" in stdout
            or "READY" in stdout
            or result.returncode == 0
        )

        # Clears the LKG MAM on online -> offline (tape was likely ejected).
        self._record_online(is_online)
        # Stamp the check in every case, including "busy", so a drive that is
        # in the middle of a transfer is not polled without any throttle.
        state["last_check"] = now
        return is_online

    def is_write_protected(self) -> bool:
        """Checks if the tape is write-protected (read-only)

        Returns:
            ``True`` when ``mt status`` output mentions ``WR_PROT``,
            ``READ-ONLY`` or ``WRITE PROTECT`` (case-insensitive); ``False``
            otherwise, including when the command cannot be run.
        """
        try:
            cmd = ["mt", "-f", self.device_path, "status"]
            self._log_command(cmd)
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
        except Exception as e:
            logger.debug(f"Write-protect check failed for {self.device_path}: {e}")
            return False

        output = result.stdout.upper()
        return any(
            marker in output for marker in ("WR_PROT", "READ-ONLY", "WRITE PROTECT")
        )

    def check_existing_data(self) -> bool:
        """Checks if the tape has data after the label (file mark 0)

        Rewinds, skips forward one file mark and reads the resulting file
        number from ``mt status``. The tape is left at that position.

        Returns:
            ``True`` when the reported file number is greater than zero;
            ``False`` when the drive is offline or any step fails.
        """
        if not self.check_online():
            return False
        try:
            self._run_mt("rewind")
            # Skip the label file (file 0)
            self._run_mt("fsf 1")
            cmd = ["mt", "-f", self.device_path, "status"]
            self._log_command(cmd)
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
            match = self._FILE_NUMBER_RE.search(result.stdout)
            return bool(match and int(match.group(1)) > 0)
        except Exception as e:
            logger.debug(f"Existing-data check failed for {self.device_path}: {e}")
            return False

    def _run_mt(self, command: str):
        """Run ``mt -f <device> <command>`` and raise if it fails.

        Args:
            command: The ``mt`` operation and its arguments as one
                whitespace-separated string, e.g. ``"fsf 1"``.

        Raises:
            subprocess.CalledProcessError: When ``mt`` exits non-zero; the
                captured stderr is logged first.
        """
        full_cmd = ["mt", "-f", self.device_path] + command.split()
        try:
            self._log_command(full_cmd)
            subprocess.run(full_cmd, check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            # Decode defensively so a logging problem never masks the failure.
            detail = (e.stderr or b"").decode(errors="replace")
            logger.error(f"Tape command 'mt {command}' failed: {detail}")
            raise

    def _setup_compression(self):
        """Configures hardware compression on the drive using mt

        Does nothing when the device node is missing. Failures are logged and
        not raised.
        """
        if not os.path.exists(self.device_path):
            return

        try:
            self._run_mt("compression 1" if self.compression else "compression 0")
            logger.info(f"LTO Hardware Compression set to: {self.compression}")
        except Exception as e:
            logger.error(f"Failed to set hardware compression: {e}")

    def _setup_encryption(self):
        """Configures hardware encryption on the drive using stenc

        Without a configured key, encryption is switched off on a best-effort
        basis. With a key, the key is imported and encryption switched on.

        Raises:
            RuntimeError: When the key import exits non-zero.
            Exception: Any other failure while enabling encryption is logged
                and re-raised.
        """
        if not os.path.exists(self.device_path):
            return

        if not self.encryption_key:
            try:
                cmd = ["stenc", "-f", self.device_path, "--off"]
                self._log_command(cmd)
                subprocess.run(cmd, capture_output=True)
            except Exception as e:
                # Deliberately non-fatal (e.g. stenc not installed).
                logger.debug(f"Could not disable hardware encryption: {e}")
            return

        try:
            logger.info(f"Setting LTO hardware encryption key for {self.device_path}")
            # stenc expects a 32-byte hex key (256-bit)
            # We use a pipe to avoid leaving the key in the process list
            cmd_import = ["stenc", "-f", self.device_path, "--import", "-k", "-"]
            self._log_command(cmd_import)
            imported = subprocess.run(
                cmd_import,
                input=self.encryption_key,
                stderr=subprocess.PIPE,
                text=True,
            )
            if imported.returncode != 0:
                raise RuntimeError(f"LTO Encryption Setup Failed: {imported.stderr}")

            cmd_on = ["stenc", "-f", self.device_path, "--on"]
            self._log_command(cmd_on)
            subprocess.run(cmd_on, check=True, capture_output=True)
            logger.info("LTO Hardware Encryption ENABLED and LOCKED")
        except Exception as e:
            logger.error(f"Hardware encryption error: {e}")
            raise

    def identify_media(self, allow_intrusive=True) -> Optional[str]:
        """Identifies the tape, prioritizing non-intrusive LKG MAM identity.

        Args:
            allow_intrusive: When no identity is available from the MAM, allow
                rewinding the tape and reading the ``.tapehoard_label`` file
                from it with ``tar``.

        Returns:
            The tape identity, or ``None`` when the drive is offline, the
            intrusive read is not allowed, or the label cannot be read or is
            empty.
        """
        state = self.get_live_info()
        if not state["online"]:
            return None
        if state["identity"]:
            return state["identity"]

        if not allow_intrusive:
            return None

        try:
            self._setup_encryption()
            self._setup_compression()
            self._run_mt("rewind")
            cmd = ["tar", "-xf", self.device_path, "-O", ".tapehoard_label"]
            self._log_command(cmd)
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=20)
            if result.returncode == 0:
                label_id = result.stdout.strip()
                # An empty label identifies nothing; do not cache or return it.
                if label_id:
                    # Update LKG with the new barcode so we don't rewind again
                    self._state()["mam"].setdefault("barcode", label_id)
                    return label_id
        except Exception as e:
            logger.debug(f"Physical identification failed for {self.device_path}: {e}")

        return None

    def initialize_media(self, media_id: str) -> bool:
        """Writes the identifier to File Mark 0 and MAM 0x0806

        The label is written as a tar archive holding one file,
        ``.tapehoard_label``, whose content is ``media_id``.

        Returns:
            ``True`` on success; ``False`` when the tape is write-protected or
            any tape command fails (the error is logged). A failed MAM barcode
            update is logged as a warning and does not fail initialization.
        """
        try:
            if self.is_write_protected():
                raise PermissionError("Tape is write-protected.")

            self._run_mt("rewind")
            self._run_mt("weof")
            self._run_mt("rewind")

            with tempfile.NamedTemporaryFile("w") as tmp_lbl:
                tmp_lbl.write(media_id)
                tmp_lbl.flush()
                with tempfile.NamedTemporaryFile("wb") as tmp_tar:
                    with tarfile.open(tmp_tar.name, "w") as tar:
                        tar.add(tmp_lbl.name, arcname=".tapehoard_label")

                    cmd_dd = ["dd", f"of={self.device_path}", "bs=256k"]
                    self._log_command(cmd_dd)
                    # Hand the file to dd as its stdin instead of copying the
                    # bytes through a pipe by hand.
                    with open(tmp_tar.name, "rb") as tar_file:
                        written = subprocess.run(
                            cmd_dd, stdin=tar_file, stderr=subprocess.PIPE
                        )
                    if written.returncode != 0:
                        detail = written.stderr.decode(errors="replace")
                        raise RuntimeError(f"dd failed: {detail}")

            self._run_mt("weof")
            self._run_mt("rewind")

            # Update MAM 0x0806 (Barcode)
            cmd_mam = ["sg_write_attr", "-w", f"0x0806={media_id}", self.device_path]
            self._log_command(cmd_mam)
            mam_result = subprocess.run(cmd_mam, capture_output=True)
            if mam_result.returncode != 0:
                detail = (mam_result.stderr or b"").decode(errors="replace").strip()
                logger.warning(f"MAM barcode update failed: {detail}")

            # Clear LKG so it re-polls fresh next time
            state = self._state()
            state["mam"] = {}
            state["mam_last_check"] = 0.0
            logger.info(f"Initialized LTO tape with label {media_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to initialize tape: {e}")
            return False

    def prepare_for_write(self, media_id: str) -> bool:
        """Fast-forwards to the end of the data to prepare for appending

        Returns:
            ``False`` when the loaded tape's identity differs from
            ``media_id``; ``True`` once the tape is positioned at end of data.

        Raises:
            Exception: Errors from the encryption setup or from ``mt eod``
                propagate to the caller.
        """
        current_id = self.identify_media()
        if current_id != media_id:
            logger.error(f"Tape mismatch. Expected {media_id}, found {current_id}")
            return False

        # Ensure encryption key is loaded before appending
        self._setup_encryption()
        self._setup_compression()
        self._run_mt("eod")
        return True

    def _get_current_file_number(self) -> str:
        """Parses 'mt status' to get the current tape file position

        Returns:
            The file number as a string, or ``"0"`` when the command fails or
            its output contains no file number.
        """
        try:
            cmd = ["mt", "-f", self.device_path, "status"]
            self._log_command(cmd)
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            match = self._FILE_NUMBER_RE.search(result.stdout)
            if match:
                return match.group(1)
        except Exception as e:
            logger.debug(f"Could not read file position of {self.device_path}: {e}")
        return "0"

    def write_archive(self, media_id: str, stream: BinaryIO) -> str:
        """Writes the stream to tape and returns the file number index

        Args:
            media_id: Unused by this implementation.
            stream: Binary source, read in 1 MiB chunks until exhausted.

        Returns:
            The tape file number recorded before the write started.

        Raises:
            RuntimeError: When ``dd`` exits non-zero, so that a failed write
                is never reported as a stored archive.
        """
        file_num = self._get_current_file_number()
        cmd_dd = ["dd", f"of={self.device_path}", "bs=256k"]
        self._log_command(cmd_dd)
        proc = subprocess.Popen(cmd_dd, stdin=subprocess.PIPE)
        if proc.stdin is None:
            proc.wait()
            raise RuntimeError("Failed to open dd pipe")

        try:
            while True:
                chunk = stream.read(1024 * 1024)
                if not chunk:
                    break
                proc.stdin.write(chunk)
        except BrokenPipeError:
            # dd went away mid-transfer; its exit status below says why.
            pass
        finally:
            # Always close the pipe and reap the child, even when reading the
            # source stream raised.
            try:
                proc.stdin.close()
            except OSError:
                pass
            returncode = proc.wait()

        if returncode != 0:
            raise RuntimeError(
                f"dd failed with exit code {returncode} "
                f"while writing to {self.device_path}"
            )
        return file_num

    def get_utilization(self) -> Optional[float]:
        """Calculates actual hardware utilization from MAM capacity attributes.

        Returns:
            ``(max - remaining) / max`` as a fraction, or ``None`` when either
            capacity attribute is missing or the maximum is not positive.
        """
        # Force a fresh MAM read to get the most accurate current state after a write
        mam = self.get_mam_info(force=True)
        max_cap = mam.get("max_capacity_mib")
        rem_cap = mam.get("remaining_capacity_mib")
        if max_cap is None or rem_cap is None or max_cap <= 0:
            return None
        return (max_cap - rem_cap) / max_cap

    def finalize_media(self, media_id: str):
        """Run ``mt offline`` on the drive; ``media_id`` is not used."""
        self._run_mt("offline")

    def read_archive(self, media_id: str, location_id: str) -> BinaryIO:
        """Position the tape at an archive and return a stream of its bytes.

        Args:
            media_id: Unused by this implementation.
            location_id: Tape file number as a decimal string. The tape is
                rewound and then skipped forward by that many file marks.

        Returns:
            The stdout pipe of a running ``dd`` reading from the device.

        Raises:
            RuntimeError: When the ``dd`` pipe could not be opened.

        NOTE(review): the ``dd`` child is not waited on here; presumably the
        caller reads to EOF and closes the stream — confirm against callers.
        """
        # Ensure encryption key is loaded before reading
        self._setup_encryption()
        self._setup_compression()
        self._run_mt("rewind")
        try:
            loc_int = int(location_id)
        except ValueError:
            # Kept as in the original: a non-numeric location reads from the
            # start of the tape. Now at least visible in the log.
            logger.warning(
                f"Invalid tape location {location_id!r}; reading from start of tape"
            )
        else:
            if loc_int > 0:
                self._run_mt(f"fsf {loc_int}")

        cmd_dd = ["dd", f"if={self.device_path}", "bs=256k"]
        self._log_command(cmd_dd)
        proc = subprocess.Popen(cmd_dd, stdout=subprocess.PIPE)
        if proc.stdout is None:
            raise RuntimeError("Failed to open dd pipe")
        return cast(BinaryIO, proc.stdout)