__package__ = "archivebox.plugins"

import json
import re
from collections.abc import Iterable, Mapping
from pathlib import Path
from typing import Any

from django import forms
from django.utils.html import format_html

from archivebox.config import CONSTANTS_CONFIG
from archivebox.config.common import ArchiveBoxConfig, get_config
from archivebox.plugins.discovery import discover_plugin_configs, get_plugin_icon, get_plugins


# Prefix of every per-plugin config input name (see _plugin_config_input_name).
PLUGIN_CONFIG_FIELD_PREFIX = "plugin_config__"

# Each entry is unpacked by PluginConfigFormMixin.build_plugin_groups as:
#   (field_name, title, note, dom_id, select_all_group, plugin_names)
PLUGIN_GROUP_DEFINITIONS = (
    (
        "main_plugins",
        "Main",
        "",
        "",
        "",
        (
            "dom",
            "screenshot",
            "pdf",
            "singlefile",
            "wget",
            "archivedotorg",
            "chrome_mhtml",
            "archivewebpage",
        ),
    ),
    (
        "page_setup_plugins",
        "Page Setup",
        "",
        "",
        "",
        (
            "chrome",
            "infiniscroll",
            "modalcloser",
            "ublock",
            "istilldontcareaboutcookies",
            "twocaptcha",
            "claudechrome",
        ),
    ),
    (
        "media_plugins",
        "Media",
        "",
        "",
        "",
        (
            "staticfile",
            "responses",
            "chrome_screencast",
            "ytdlp",
            "gallerydl",
            "git",
        ),
    ),
    (
        "text_plugins",
        "Text",
        "",
        "",
        "",
        (
            "readability",
            "htmltotext",
            "defuddle",
            "forumdl",
            "mercury",
            "trafilatura",
            "liteparse",
            "opendataloader",
            "papersdl",
        ),
    ),
    (
        "metadata_plugins",
        "Metadata",
        "",
        "",
        "",
        (
            "title",
            "favicon",
            "headers",
            "redirects",
            "accessibility",
            "consolelog",
            "sslcerts",
            "dns",
            "seo",
            "hashes",
        ),
    ),
    (
        "postprocessing_plugins",
        "Postprocessing",
        "",
        "",
        "",
        (
            "parse_dom_outlinks",
            "parse_html_urls",
            "parse_jsonl_urls",
            "parse_netscape_urls",
            "parse_rss_urls",
            "parse_txt_urls",
            "claudecode",
            "claudecodecleanup",
            "claudecodeextract",
        ),
    ),
)

# Plugins that are never placed in the catch-all "other_plugins" group.
HIDDEN_PLUGIN_CONFIG_UI_PLUGINS = {
    "apt",
    "base",
    "bash",
    "brew",
    "cargo",
    "chromewebstore",
    "env",
    "media",
    "npm",
    "opencode",
    "pip",
    "puppeteer",
    "search_backend_ripgrep",
    "search_backend_sonic",
    "search_backend_sqlite",
    "ssl",
}

# Regex source: a bare non-negative integer, or a number followed by a time unit.
# Not referenced elsewhere in this module.
TIMEOUT_INPUT_PATTERN = r"(0|[1-9][0-9]*|[0-9]+(?:\.[0-9]+)?\s*(?:s|sec|secs|second|seconds|m|min|mins|minute|minutes|h|hr|hrs|hour|hours))"


def get_plugin_choices():
    """Get available extractor plugins from discovered hooks."""
    return [(name, name) for name in get_plugins()]


def get_plugin_choice_label(plugin_name: str, plugin_configs: dict[str, dict]) -> str:
    """Return the choice label for *plugin_name*.

    The bare plugin name is returned when the plugin's schema has no
    description; otherwise the result of ``format_html`` combining the plugin
    icon with the plugin name.
    """
    schema = plugin_configs.get(plugin_name, {})
    description = str(schema.get("description") or "").strip()
    if not description:
        return plugin_name
    icon_html = get_plugin_icon(plugin_name)
    return format_html(
        '{}{}',
        icon_html,
        plugin_name,
    )


def get_choice_field(form: forms.Form, name: str) -> forms.ChoiceField:
    """Return ``form.fields[name]``, raising ``TypeError`` unless it is a ChoiceField."""
    field = form.fields[name]
    if not isinstance(field, forms.ChoiceField):
        raise TypeError(f"{name} must be a ChoiceField")
    return field


def _plugin_config_input_name(plugin_name: str, config_key: str) -> str:
    """Build the form input name for one config key of one plugin."""
    return f"{PLUGIN_CONFIG_FIELD_PREFIX}{plugin_name}__{config_key}"


def _schema_types(schema: Mapping[str, Any]) -> list[str]:
    """Return the schema's ``type`` as a list of strings (default ``["string"]``)."""
    raw_type = schema.get("type") or "string"
    if isinstance(raw_type, list):
        return [str(item) for item in raw_type]
    return [str(raw_type)]


def _jsonish(value: Any) -> str:
    """Return strings unchanged and everything else as key-sorted JSON text."""
    if isinstance(value, str):
        return value
    return json.dumps(value, sort_keys=True, default=str)


def _same_config_value(left: Any, right: Any) -> bool:
    """Compare two values by their key-sorted JSON serialization."""
    return json.dumps(left, sort_keys=True, default=str) == json.dumps(right, sort_keys=True, default=str)


def _coerce_plugin_config_value(raw_value: Any, schema: Mapping[str, Any]) -> Any:
    """Convert a submitted value to the Python type described by *schema*.

    Schema types are checked in the order boolean, integer, number, array,
    object; anything else is treated as a string and validated against
    ``enum`` when one is given.

    Raises:
        forms.ValidationError: value is out of range, not in the enum, not a
            recognised boolean, or JSON of the wrong shape.
        ValueError: numeric or JSON parsing failed (``json.JSONDecodeError``
            is a ``ValueError`` subclass).
    """
    schema_types = _schema_types(schema)

    if "boolean" in schema_types:
        if isinstance(raw_value, bool):
            return raw_value
        value = str(raw_value).strip().lower()
        if value in {"true", "1", "yes", "on"}:
            return True
        if value in {"false", "0", "no", "off", ""}:
            return False
        raise forms.ValidationError("Must be true or false.")

    if "integer" in schema_types:
        value = int(str(raw_value).strip())
        minimum = schema.get("minimum")
        maximum = schema.get("maximum")
        if minimum is not None and value < int(minimum):
            raise forms.ValidationError(f"Must be at least {minimum}.")
        if maximum is not None and value > int(maximum):
            raise forms.ValidationError(f"Must be at most {maximum}.")
        return value

    if "number" in schema_types:
        value = float(str(raw_value).strip())
        minimum = schema.get("minimum")
        maximum = schema.get("maximum")
        if minimum is not None and value < float(minimum):
            raise forms.ValidationError(f"Must be at least {minimum}.")
        if maximum is not None and value > float(maximum):
            raise forms.ValidationError(f"Must be at most {maximum}.")
        return value

    if "array" in schema_types:
        if isinstance(raw_value, list):
            return raw_value
        value = str(raw_value).strip()
        if not value:
            return []
        if value.startswith("["):
            parsed = json.loads(value)
            if not isinstance(parsed, list):
                raise forms.ValidationError("Must be a JSON array.")
            return parsed
        # Plain text input: items separated by commas and/or newlines.
        return [item.strip() for item in value.replace(",", "\n").splitlines() if item.strip()]

    if "object" in schema_types:
        # Pass already-parsed mappings through, mirroring the list/bool
        # pass-through above; str() of a dict is a Python repr, not JSON.
        if isinstance(raw_value, dict):
            return raw_value
        value = str(raw_value).strip()
        if not value:
            return {}
        parsed = json.loads(value)
        if not isinstance(parsed, dict):
            raise forms.ValidationError("Must be a JSON object.")
        return parsed

    value = str(raw_value)
    enum = schema.get("enum")
    if isinstance(enum, list) and enum and value not in {str(item) for item in enum}:
        raise forms.ValidationError(f"Must be one of: {', '.join(str(item) for item in enum)}.")
    return value


class PluginConfigFormMixin:
    """Form mixin that builds plugin group/card data and cleans plugin config overrides.

    The methods read ``self.fields``, ``self.data``, ``self.is_bound`` and call
    ``self.add_error``, so the mixin is expected to be combined with a Django
    form class.
    """

    # Populated by build_plugin_groups().
    plugin_groups: list[dict[str, Any]]
    # When False, only keys whose ArchiveBoxConfig scope is "crawl_frozen" are
    # rendered and accepted.
    allow_crawl_execution_config_fields = True

    def build_plugin_groups(self, runtime_config: Mapping[str, Any] | None = None) -> None:
        """Set the plugin choice fields' choices and populate ``self.plugin_groups``.

        Args:
            runtime_config: Config used for current values; ``get_config()`` is
                used when this is ``None`` or empty.
        """
        all_plugins = get_plugins()
        plugin_configs = discover_plugin_configs()
        runtime_config = runtime_config or get_config()
        self.plugin_config_binary_urls = get_plugin_config_binary_urls(runtime_config)

        grouped_plugins = set().union(*(group[-1] for group in PLUGIN_GROUP_DEFINITIONS))
        # Everything discovered that is neither explicitly grouped nor hidden.
        other_plugins = tuple(sorted(set(all_plugins) - grouped_plugins - HIDDEN_PLUGIN_CONFIG_UI_PLUGINS))

        for field_name, *_rest, plugin_names in PLUGIN_GROUP_DEFINITIONS:
            if field_name in self.fields:
                get_choice_field(self, field_name).choices = [
                    (p, get_plugin_choice_label(p, plugin_configs)) for p in plugin_names if p in all_plugins
                ]
        if "other_plugins" in self.fields:
            get_choice_field(self, "other_plugins").choices = [
                (p, get_plugin_choice_label(p, plugin_configs)) for p in other_plugins
            ]

        group_specs = (
            *PLUGIN_GROUP_DEFINITIONS,
            ("other_plugins", "Other", "", "", "", other_plugins),
        )
        binary_url_lookup = _build_required_binary_url_lookup(plugin_configs, runtime_config)
        # Groups with no discovered plugin are omitted entirely.
        self.plugin_groups = [
            {
                "field_name": field_name,
                "title": title,
                "note": note,
                "dom_id": dom_id,
                "select_all_group": select_all_group,
                "show_selectors": field_name in self.fields,
                "plugins": self._build_plugin_cards(field_name, plugin_names, plugin_configs, runtime_config, binary_url_lookup),
            }
            for field_name, title, note, dom_id, select_all_group, plugin_names in group_specs
            if any(plugin in all_plugins for plugin in plugin_names)
        ]

    def _build_plugin_cards(
        self,
        field_name: str,
        plugin_names: Iterable[str],
        plugin_configs: dict[str, dict[str, Any]],
        runtime_config: Mapping[str, Any],
        binary_url_lookup: Mapping[str, str] | None = None,
    ) -> list[dict[str, Any]]:
        """Build one card dict per plugin in the group identified by *field_name*.

        When the form has a field named *field_name*, its choices and its
        submitted (bound) or initial (unbound) selection are used; otherwise
        choices are derived from *plugin_names* and nothing is selected.
        """
        if field_name in self.fields:
            choices = list(get_choice_field(self, field_name).choices)
            selected_values = set(self.data.getlist(field_name)) if self.is_bound else set(get_choice_field(self, field_name).initial or [])
        else:
            all_plugins = get_plugins()
            choices = [(p, get_plugin_choice_label(p, plugin_configs)) for p in plugin_names if p in all_plugins]
            selected_values = set()

        cards = []
        for index, (plugin_name, label) in enumerate(choices):
            schema = plugin_configs.get(str(plugin_name), {})
            properties = schema.get("properties") or {}

            # Only expose <PLUGIN>_ENABLED when the schema declares it as a boolean.
            enabled_config_key = f"{str(plugin_name).upper()}_ENABLED"
            enabled_prop_schema = properties.get(enabled_config_key)
            if not isinstance(enabled_prop_schema, dict) or "boolean" not in _schema_types(enabled_prop_schema):
                enabled_config_key = ""

            config_fields = [
                self._build_plugin_config_field(str(plugin_name), str(config_key), prop_schema, runtime_config)
                for config_key, prop_schema in properties.items()
                if (
                    isinstance(prop_schema, dict)
                    and str(config_key) not in CONSTANTS_CONFIG
                    and (self.allow_crawl_execution_config_fields or ArchiveBoxConfig.scope_for_key(str(config_key)) == "crawl_frozen")
                )
            ]
            cards.append(
                {
                    "name": str(plugin_name),
                    "label": label,
                    "checked": str(plugin_name) in selected_values,
                    "checkbox_id": f"id_{field_name}_{index}",
                    "enabled_config_key": enabled_config_key,
                    "description": str(schema.get("description") or "").strip(),
                    "source_url": f"https://github.com/ArchiveBox/abx-plugins/tree/main/abx_plugins/plugins/{plugin_name}",
                    "docs_url": f"https://archivebox.github.io/abx-plugins/#{plugin_name}",
                    "required_plugins": [str(item) for item in schema.get("required_plugins") or []],
                    "required_binary_links": _build_required_binary_links(
                        schema.get("required_binaries") or [],
                        runtime_config,
                        binary_url_lookup,
                    ),
                    "config_fields": config_fields,
                    "config_count": len(config_fields),
                },
            )
        return cards

    def _build_plugin_config_field(
        self,
        plugin_name: str,
        config_key: str,
        prop_schema: Mapping[str, Any],
        runtime_config: Mapping[str, Any],
    ) -> dict[str, Any]:
        """Build the rendering dict for a single plugin config input.

        The current value comes from *runtime_config* (falling back to the
        schema default) unless the form is bound and the input was submitted,
        in which case the submitted value is used, coerced when possible.
        Sensitive values are never echoed back as the input value.
        """
        schema_types = _schema_types(prop_schema)
        enum = prop_schema.get("enum")
        input_name = _plugin_config_input_name(plugin_name, config_key)

        current_value = runtime_config.get(config_key, prop_schema.get("default", ""))
        if self.is_bound and input_name in self.data:
            try:
                current_value = _coerce_plugin_config_value(self.data.get(input_name), prop_schema)
            except (TypeError, ValueError, json.JSONDecodeError, forms.ValidationError):
                # Redisplay the raw submitted text when it cannot be coerced.
                current_value = self.data.get(input_name)

        default_value = prop_schema.get("default", "")
        fallback_key = prop_schema.get("x-fallback")
        # A key with a fallback shows "{FALLBACK_KEY}" instead of a literal default.
        default_display = f"{{{fallback_key}}}" if fallback_key else default_value

        from archivebox.config.common import is_sensitive_config_key

        is_sensitive = bool(prop_schema.get("x-sensitive")) or is_sensitive_config_key(config_key)
        input_value = "" if is_sensitive else _jsonish(current_value)
        field_kind = "text"
        input_type = "text"
        options: list[dict[str, Any]] = []

        if "boolean" in schema_types:
            field_kind = "boolean"
            input_value = "true" if bool(current_value) else "false"
        elif isinstance(enum, list) and enum:
            field_kind = "select"
            options = [
                {
                    "value": str(option),
                    "label": str(option),
                    "selected": str(option) == str(current_value),
                }
                for option in enum
            ]
        elif "integer" in schema_types or "number" in schema_types:
            field_kind = "number"
            input_type = "number"
        elif "array" in schema_types or "object" in schema_types:
            field_kind = "json"
            input_value = "" if is_sensitive else json.dumps(current_value, indent=2, sort_keys=True, default=str)
        elif is_sensitive:
            input_type = "password"
        else:
            # is_sensitive is necessarily False here (handled by the branch above).
            input_value = str(current_value)

        return {
            "key": config_key,
            "input_name": input_name,
            "kind": field_kind,
            "input_type": input_type,
            "value": input_value,
            "checked": bool(current_value),
            "options": options,
            "description": str(prop_schema.get("description") or "").strip(),
            "default": _jsonish(default_display),
            "current": "configured" if is_sensitive and current_value else (str(current_value) if "string" in schema_types else _jsonish(current_value)),
            "current_url": self.plugin_config_binary_urls.get(config_key, "") if str(config_key).endswith("_BINARY") else "",
            "is_sensitive": is_sensitive,
            "minimum": prop_schema.get("minimum"),
            "maximum": prop_schema.get("maximum"),
            "pattern": prop_schema.get("pattern"),
            "type_label": " / ".join(schema_types),
        }

    def clean_plugin_config_overrides(self, effective_config: Mapping[str, Any] | None = None) -> dict[str, Any]:
        """Collect submitted plugin config values that differ from the effective config.

        Args:
            effective_config: Baseline to compare against; ``get_config()`` is
                used when this is ``None`` or empty.

        Returns:
            Mapping of config key to coerced value for every submitted key
            whose value differs from the baseline. Empty when the form is
            unbound. Coercion failures and conflicting values for the same key
            are reported through ``self.add_error("config", ...)`` and the
            offending submission is skipped.
        """
        if not self.is_bound:
            return {}

        # Imported once here rather than on every loop iteration.
        from archivebox.config.common import SENSITIVE_CONFIG_VALUE_REDACTED, is_sensitive_config_key

        effective_config = effective_config or get_config()
        overrides: dict[str, Any] = {}
        # Plugin under which each override was first seen, for conflict messages.
        sources: dict[str, str] = {}

        for plugin_name, schema in discover_plugin_configs().items():
            for config_key, prop_schema in (schema.get("properties") or {}).items():
                if not isinstance(prop_schema, dict):
                    continue
                input_name = _plugin_config_input_name(plugin_name, config_key)
                if input_name not in self.data:
                    continue
                if str(config_key) in CONSTANTS_CONFIG:
                    continue
                if not self.allow_crawl_execution_config_fields and ArchiveBoxConfig.scope_for_key(str(config_key)) != "crawl_frozen":
                    continue

                raw_value: Any = self.data.get(input_name)
                # Enum-constrained arrays are read as a multi-value submission.
                if "array" in _schema_types(prop_schema) and isinstance(prop_schema.get("enum"), list):
                    raw_value = self.data.getlist(input_name)

                # A blank or redacted sensitive value means "leave unchanged".
                if (prop_schema.get("x-sensitive") or is_sensitive_config_key(config_key)) and raw_value in (
                    "",
                    SENSITIVE_CONFIG_VALUE_REDACTED,
                ):
                    continue

                try:
                    coerced_value = _coerce_plugin_config_value(raw_value, prop_schema)
                except (TypeError, ValueError, json.JSONDecodeError) as err:
                    self.add_error("config", forms.ValidationError(f"{config_key}: {err}"))
                    continue
                except forms.ValidationError as err:
                    self.add_error("config", forms.ValidationError(f"{config_key}: {err.messages[0]}"))
                    continue

                base_value = effective_config.get(config_key, prop_schema.get("default", ""))
                if _same_config_value(coerced_value, base_value):
                    continue

                existing_value = overrides.get(config_key)
                if config_key in overrides and not _same_config_value(existing_value, coerced_value):
                    self.add_error(
                        "config",
                        forms.ValidationError(
                            f"{config_key} was set differently under {sources[config_key]} and {plugin_name}. Set it once in Custom config overrides.",
                        ),
                    )
                    continue

                overrides[config_key] = coerced_value
                sources[config_key] = plugin_name
        return overrides

    def plugin_config_keys(self) -> set[str]:
        """Return every config key declared with a dict schema by any discovered plugin."""
        return {
            str(config_key)
            for schema in discover_plugin_configs().values()
            for config_key, prop_schema in (schema.get("properties") or {}).items()
            if isinstance(prop_schema, dict)
        }


# Matches "{CONFIG_KEY}" placeholders inside required-binary name templates.
_BINARY_TEMPLATE_PATTERN = re.compile(r"\{([A-Z_][A-Z0-9_]*)\}")


def _resolve_required_binary_name(template_name: str, runtime_config: Mapping[str, Any]) -> str:
    """Substitute ``{KEY}`` placeholders in *template_name* from *runtime_config*.

    Placeholders whose value is missing or empty are left untouched. When the
    resolved text contains a "/", only its final path component is returned.
    """
    if "{" not in template_name:
        return template_name

    def _replace(match: re.Match[str]) -> str:
        key = match.group(1)
        try:
            value = runtime_config.get(key)
        except Exception:
            # Best effort: a failing config lookup leaves the placeholder as-is.
            value = None
        if value is None or value == "":
            return match.group(0)
        return str(value)

    resolved = _BINARY_TEMPLATE_PATTERN.sub(_replace, template_name).strip()
    if not resolved:
        return template_name
    return Path(resolved).name if "/" in resolved else resolved


def _iter_required_binary_names(
    required_binaries: Iterable[Any],
    runtime_config: Mapping[str, Any],
) -> Iterable[str]:
    """Yield the resolved name of each dict entry in *required_binaries* that has a name."""
    for item in required_binaries or []:
        if not isinstance(item, dict):
            continue
        raw_name = str(item.get("name") or "").strip()
        if not raw_name:
            continue
        resolved = _resolve_required_binary_name(raw_name, runtime_config)
        if resolved:
            yield resolved


def _build_required_binary_url_lookup(
    plugin_configs: Mapping[str, dict[str, Any]],
    runtime_config: Mapping[str, Any],
) -> dict[str, str]:
    """Resolve admin URLs for every required binary across all plugin schemas in a single DB query."""
    from archivebox.config.views import get_environment_binary_url, get_installed_binary_change_url
    from archivebox.machine.models import Binary, Machine

    resolved_names: set[str] = set()
    for schema in plugin_configs.values():
        for name in _iter_required_binary_names(schema.get("required_binaries") or [], runtime_config):
            resolved_names.add(name)
    if not resolved_names:
        return {}

    machine = Machine.current()
    name_to_binary: dict[str, Binary] = {}
    for binary in (
        Binary.objects.filter(machine=machine, name__in=resolved_names)
        .exclude(abspath="")
        .exclude(abspath__isnull=True)
        .order_by("-modified_at")
    ):
        # Rows arrive newest-first, so the first row seen per name wins.
        key = binary.name.lower()
        if key not in name_to_binary:
            name_to_binary[key] = binary

    return {
        name: (get_installed_binary_change_url(name, name_to_binary.get(name.lower())) or get_environment_binary_url(name))
        for name in resolved_names
    }


def _build_required_binary_links(
    required_binaries: list[dict[str, Any]],
    runtime_config: Mapping[str, Any],
    binary_url_lookup: Mapping[str, str] | None = None,
) -> list[dict[str, str]]:
    """Return de-duplicated ``{"name", "url"}`` dicts for a plugin's required binaries.

    URLs come from *binary_url_lookup* when present there, otherwise from
    ``get_environment_binary_url``.
    """
    from archivebox.config.views import get_environment_binary_url

    links: list[dict[str, str]] = []
    seen: set[str] = set()
    for resolved in _iter_required_binary_names(required_binaries, runtime_config):
        if resolved in seen:
            continue
        seen.add(resolved)
        url = (binary_url_lookup or {}).get(resolved) or get_environment_binary_url(resolved)
        links.append({"name": resolved, "url": url})
    return links


def get_plugin_config_binary_urls(runtime_config: Mapping[str, Any]) -> dict[str, str]:
    """Map each configured ``*_BINARY`` config key to a URL for its binary.

    Keys whose value in *runtime_config* is empty are omitted.
    """
    from archivebox.config.views import get_environment_binary_url, get_installed_binary_change_url
    from archivebox.machine.models import Binary, Machine

    binary_keys = {
        str(config_key)
        for schema in discover_plugin_configs().values()
        for config_key, prop_schema in (schema.get("properties") or {}).items()
        if isinstance(prop_schema, dict) and str(config_key).endswith("_BINARY")
    }

    urls: dict[str, str] = {}
    machine = Machine.current()
    for key in binary_keys:
        value = str(runtime_config.get(key) or "").strip()
        if not value:
            continue
        name = Path(value).name if "/" in value else value

        # Lookup order: configured value as given, then exact abspath match,
        # then the bare file name.
        binary = Binary.objects.get_valid_binary(value, machine=machine)
        if binary is None and "/" in value:
            binary = (
                Binary.objects.exclude(abspath="")
                .exclude(abspath__isnull=True)
                .filter(machine=machine, abspath=value)
                .order_by("-modified_at")
                .first()
            )
        if binary is None and name != value:
            binary = Binary.objects.get_valid_binary(name, machine=machine)

        binary_name = binary.name if binary is not None else name
        urls[key] = get_installed_binary_change_url(binary_name, binary) or get_environment_binary_url(name)
    return urls