mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-06-22 03:20:45 -04:00
612 lines
22 KiB
Python
612 lines
22 KiB
Python
# Explicitly declare the package this module belongs to; Python consults
# ``__package__`` when resolving relative imports.
__package__ = "archivebox.plugins"
|
|
|
|
import json
|
|
import re
|
|
from collections.abc import Iterable, Mapping
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from django import forms
|
|
from django.utils.html import format_html
|
|
|
|
from archivebox.config import CONSTANTS_CONFIG
|
|
from archivebox.config.common import ArchiveBoxConfig, get_config
|
|
from archivebox.plugins.discovery import discover_plugin_configs, get_plugin_icon, get_plugins
|
|
|
|
|
|
# Prefix of the form input names used for per-plugin config overrides.
# Full input names look like "<prefix><plugin_name>__<CONFIG_KEY>".
PLUGIN_CONFIG_FIELD_PREFIX = "plugin_config__"

# Ordered plugin groups shown in the UI. Every entry is a 6-tuple:
#   (field_name, title, note, dom_id, select_all_group, plugin_names)
# The last element is always the tuple of plugin names belonging to the group.
PLUGIN_GROUP_DEFINITIONS = (
    (
        "main_plugins", "Main", "", "", "",
        (
            "dom", "screenshot", "pdf", "singlefile",
            "wget", "archivedotorg", "chrome_mhtml", "archivewebpage",
        ),
    ),
    (
        "page_setup_plugins", "Page Setup", "", "", "",
        (
            "chrome", "infiniscroll", "modalcloser", "ublock",
            "istilldontcareaboutcookies", "twocaptcha", "claudechrome",
        ),
    ),
    (
        "media_plugins", "Media", "", "", "",
        (
            "staticfile", "responses", "chrome_screencast",
            "ytdlp", "gallerydl", "git",
        ),
    ),
    (
        "text_plugins", "Text", "", "", "",
        (
            "readability", "htmltotext", "defuddle", "forumdl", "mercury",
            "trafilatura", "liteparse", "opendataloader", "papersdl",
        ),
    ),
    (
        "metadata_plugins", "Metadata", "", "", "",
        (
            "title", "favicon", "headers", "redirects", "accessibility",
            "consolelog", "sslcerts", "dns", "seo", "hashes",
        ),
    ),
    (
        "postprocessing_plugins", "Postprocessing", "", "", "",
        (
            "parse_dom_outlinks", "parse_html_urls", "parse_jsonl_urls",
            "parse_netscape_urls", "parse_rss_urls", "parse_txt_urls",
            "claudecode", "claudecodecleanup", "claudecodeextract",
        ),
    ),
)

# Plugins that are never listed in the catch-all "Other" group of the config UI.
HIDDEN_PLUGIN_CONFIG_UI_PLUGINS = {
    "apt", "base", "bash", "brew", "cargo", "chromewebstore", "env", "media",
    "npm", "opencode", "pip", "puppeteer",
    "search_backend_ripgrep", "search_backend_sonic", "search_backend_sqlite",
    "ssl",
}

# Regex for a timeout input: either a bare non-negative integer without leading
# zeros, or a (possibly decimal) number followed by a seconds/minutes/hours unit.
TIMEOUT_INPUT_PATTERN = r"(0|[1-9][0-9]*|[0-9]+(?:\.[0-9]+)?\s*(?:s|sec|secs|second|seconds|m|min|mins|minute|minutes|h|hr|hrs|hour|hours))"
|
|
|
|
|
|
def get_plugin_choices():
    """Return ``(value, label)`` choice pairs, one per plugin reported by ``get_plugins()``.

    The plugin name is used as both the value and the label.
    """
    plugin_names = get_plugins()
    return [(plugin, plugin) for plugin in plugin_names]
|
|
|
|
|
|
def get_plugin_choice_label(plugin_name: str, plugin_configs: dict[str, dict]) -> str:
    """Build the label shown for a plugin in a choice field.

    Plugins whose schema has no (non-blank) ``description`` get the bare plugin
    name. All others get an HTML label made of the plugin icon followed by the
    plugin name, assembled with ``format_html``.
    """
    plugin_schema = plugin_configs.get(plugin_name, {})
    has_description = bool(str(plugin_schema.get("description") or "").strip())
    if has_description:
        return format_html(
            '<span class="plugin-choice-icon">{}</span><span class="plugin-choice-name">{}</span>',
            get_plugin_icon(plugin_name),
            plugin_name,
        )
    return plugin_name
|
|
|
|
|
|
def get_choice_field(form: forms.Form, name: str) -> forms.ChoiceField:
    """Look up ``form.fields[name]`` and return it narrowed to ``forms.ChoiceField``.

    Raises:
        KeyError: if the form has no field called *name*.
        TypeError: if the field exists but is not a ``ChoiceField``.
    """
    candidate = form.fields[name]
    if isinstance(candidate, forms.ChoiceField):
        return candidate
    raise TypeError(f"{name} must be a ChoiceField")
|
|
|
|
|
|
def _plugin_config_input_name(plugin_name: str, config_key: str) -> str:
    """Return the form input name for one config key of one plugin.

    The result has the shape ``<PLUGIN_CONFIG_FIELD_PREFIX><plugin_name>__<config_key>``.
    """
    suffix = f"{plugin_name}__{config_key}"
    return PLUGIN_CONFIG_FIELD_PREFIX + suffix
|
|
|
|
|
|
def _schema_types(schema: Mapping[str, Any]) -> list[str]:
|
|
raw_type = schema.get("type") or "string"
|
|
if isinstance(raw_type, list):
|
|
return [str(item) for item in raw_type]
|
|
return [str(raw_type)]
|
|
|
|
|
|
def _jsonish(value: Any) -> str:
|
|
if isinstance(value, str):
|
|
return value
|
|
return json.dumps(value, sort_keys=True, default=str)
|
|
|
|
|
|
def _same_config_value(left: Any, right: Any) -> bool:
|
|
return json.dumps(left, sort_keys=True, default=str) == json.dumps(right, sort_keys=True, default=str)
|
|
|
|
|
|
def _check_numeric_bounds(value: Any, schema: Mapping[str, Any], cast: Any) -> None:
    """Raise ``forms.ValidationError`` if *value* violates the schema's ``minimum``/``maximum``.

    *cast* (``int`` or ``float``) converts the schema bounds before comparing.
    """
    minimum = schema.get("minimum")
    maximum = schema.get("maximum")
    if minimum is not None and value < cast(minimum):
        raise forms.ValidationError(f"Must be at least {minimum}.")
    if maximum is not None and value > cast(maximum):
        raise forms.ValidationError(f"Must be at most {maximum}.")


def _coerce_plugin_config_value(raw_value: Any, schema: Mapping[str, Any]) -> Any:
    """Convert a submitted config value to the Python type declared by *schema*.

    The schema types are checked in a fixed order and the first match wins:
    boolean, integer, number, array, object, and finally plain string.

    Args:
        raw_value: The submitted value (usually a string from form data; native
            bools, lists and dicts are passed through for their matching type).
        schema: The JSON-schema-like property definition (``type``, ``enum``,
            ``minimum``, ``maximum`` are consulted).

    Returns:
        The coerced value: ``bool``, ``int``, ``float``, ``list``, ``dict`` or ``str``.

    Raises:
        forms.ValidationError: for unrecognized booleans, out-of-range or
            non-finite numbers, JSON of the wrong container type, or strings
            that are not in the schema's ``enum``.
        ValueError: when ``int()``/``float()`` cannot parse the input, or when
            JSON decoding fails (``json.JSONDecodeError`` is a ``ValueError``).
    """
    schema_types = _schema_types(schema)

    if "boolean" in schema_types:
        if isinstance(raw_value, bool):
            return raw_value
        value = str(raw_value).strip().lower()
        if value in {"true", "1", "yes", "on"}:
            return True
        # An empty submission counts as False.
        if value in {"false", "0", "no", "off", ""}:
            return False
        raise forms.ValidationError("Must be true or false.")

    if "integer" in schema_types:
        value = int(str(raw_value).strip())
        _check_numeric_bounds(value, schema, int)
        return value

    if "number" in schema_types:
        import math

        value = float(str(raw_value).strip())
        # float() accepts "nan"/"inf"; NaN compares False against every bound, so
        # it would slip past minimum/maximum, and neither is representable in JSON.
        if not math.isfinite(value):
            raise forms.ValidationError("Must be a finite number.")
        _check_numeric_bounds(value, schema, float)
        return value

    if "array" in schema_types:
        if isinstance(raw_value, list):
            return raw_value
        value = str(raw_value).strip()
        if not value:
            return []
        if value.startswith("["):
            parsed = json.loads(value)
            if not isinstance(parsed, list):
                raise forms.ValidationError("Must be a JSON array.")
            return parsed
        # Otherwise accept a comma- and/or newline-separated list of items.
        return [item.strip() for item in value.replace(",", "\n").splitlines() if item.strip()]

    if "object" in schema_types:
        # Pass already-parsed objects through, like the array branch does for lists;
        # str(dict) is a Python repr, not JSON, and would fail to decode below.
        if isinstance(raw_value, dict):
            return raw_value
        value = str(raw_value).strip()
        if not value:
            return {}
        parsed = json.loads(value)
        if not isinstance(parsed, dict):
            raise forms.ValidationError("Must be a JSON object.")
        return parsed

    value = str(raw_value)
    enum = schema.get("enum")
    # Enum members are compared by their string form.
    if isinstance(enum, list) and enum and value not in {str(item) for item in enum}:
        raise forms.ValidationError(f"Must be one of: {', '.join(str(item) for item in enum)}.")
    return value
|
|
|
|
|
|
class PluginConfigFormMixin:
    """Form mixin that builds plugin selection cards and per-plugin config override inputs.

    The host class is used like a Django form: the methods read ``self.fields``,
    ``self.data`` and ``self.is_bound`` and report problems via ``self.add_error``.
    """

    # Populated by build_plugin_groups(): one dict per visible plugin group.
    plugin_groups: list[dict[str, Any]]
    # Populated by build_plugin_groups(): admin URL per ``*_BINARY`` config key.
    plugin_config_binary_urls: dict[str, str]
    # When False, only config keys whose ArchiveBoxConfig scope is "crawl_frozen"
    # are rendered and accepted as overrides.
    allow_crawl_execution_config_fields = True

    def build_plugin_groups(self, runtime_config: Mapping[str, Any] | None = None) -> None:
        """Populate choice fields and ``self.plugin_groups`` from the discovered plugins.

        Args:
            runtime_config: Effective config used for current values and binary
                links; when falsy, ``get_config()`` is used instead.
        """
        all_plugins = get_plugins()
        plugin_configs = discover_plugin_configs()
        runtime_config = runtime_config or get_config()
        self.plugin_config_binary_urls = get_plugin_config_binary_urls(runtime_config)
        grouped_plugins = set().union(*(group[-1] for group in PLUGIN_GROUP_DEFINITIONS))
        # Everything not explicitly grouped or hidden lands in the "Other" group.
        other_plugins = tuple(sorted(set(all_plugins) - grouped_plugins - HIDDEN_PLUGIN_CONFIG_UI_PLUGINS))

        for field_name, *_rest, plugin_names in PLUGIN_GROUP_DEFINITIONS:
            if field_name in self.fields:
                get_choice_field(self, field_name).choices = [
                    (p, get_plugin_choice_label(p, plugin_configs)) for p in plugin_names if p in all_plugins
                ]

        if "other_plugins" in self.fields:
            get_choice_field(self, "other_plugins").choices = [(p, get_plugin_choice_label(p, plugin_configs)) for p in other_plugins]

        group_specs = (
            *PLUGIN_GROUP_DEFINITIONS,
            ("other_plugins", "Other", "", "", "", other_plugins),
        )
        binary_url_lookup = _build_required_binary_url_lookup(plugin_configs, runtime_config)
        # Groups without any discovered plugin are omitted entirely.
        self.plugin_groups = [
            {
                "field_name": field_name,
                "title": title,
                "note": note,
                "dom_id": dom_id,
                "select_all_group": select_all_group,
                "show_selectors": field_name in self.fields,
                "plugins": self._build_plugin_cards(field_name, plugin_names, plugin_configs, runtime_config, binary_url_lookup),
            }
            for field_name, title, note, dom_id, select_all_group, plugin_names in group_specs
            if any(plugin in all_plugins for plugin in plugin_names)
        ]

    def _build_plugin_cards(
        self,
        field_name: str,
        plugin_names: Iterable[str],
        plugin_configs: dict[str, dict[str, Any]],
        runtime_config: Mapping[str, Any],
        binary_url_lookup: Mapping[str, str] | None = None,
    ) -> list[dict[str, Any]]:
        """Build one card dict per plugin in a group.

        When the form has a field named *field_name*, its choices and its
        submitted (or initial) values drive the cards; otherwise the cards are
        built from *plugin_names* filtered to the discovered plugins, with
        nothing selected.
        """
        if field_name in self.fields:
            choices = list(get_choice_field(self, field_name).choices)
            selected_values = set(self.data.getlist(field_name)) if self.is_bound else set(get_choice_field(self, field_name).initial or [])
        else:
            all_plugins = get_plugins()
            choices = [(p, get_plugin_choice_label(p, plugin_configs)) for p in plugin_names if p in all_plugins]
            selected_values = set()

        cards = []
        for index, (plugin_name, label) in enumerate(choices):
            schema = plugin_configs.get(str(plugin_name), {})
            properties = schema.get("properties") or {}
            # Only expose "<PLUGIN>_ENABLED" when the schema declares it as a boolean.
            enabled_config_key = f"{str(plugin_name).upper()}_ENABLED"
            enabled_prop_schema = properties.get(enabled_config_key)
            if not isinstance(enabled_prop_schema, dict) or "boolean" not in _schema_types(enabled_prop_schema):
                enabled_config_key = ""
            config_fields = [
                self._build_plugin_config_field(str(plugin_name), str(config_key), prop_schema, runtime_config)
                for config_key, prop_schema in properties.items()
                if (
                    isinstance(prop_schema, dict)
                    and str(config_key) not in CONSTANTS_CONFIG
                    and (self.allow_crawl_execution_config_fields or ArchiveBoxConfig.scope_for_key(str(config_key)) == "crawl_frozen")
                )
            ]
            cards.append(
                {
                    "name": str(plugin_name),
                    "label": label,
                    "checked": str(plugin_name) in selected_values,
                    "checkbox_id": f"id_{field_name}_{index}",
                    "enabled_config_key": enabled_config_key,
                    "description": str(schema.get("description") or "").strip(),
                    "source_url": f"https://github.com/ArchiveBox/abx-plugins/tree/main/abx_plugins/plugins/{plugin_name}",
                    "docs_url": f"https://archivebox.github.io/abx-plugins/#{plugin_name}",
                    "required_plugins": [str(item) for item in schema.get("required_plugins") or []],
                    "required_binary_links": _build_required_binary_links(
                        schema.get("required_binaries") or [],
                        runtime_config,
                        binary_url_lookup,
                    ),
                    "config_fields": config_fields,
                    "config_count": len(config_fields),
                },
            )
        return cards

    def _build_plugin_config_field(
        self,
        plugin_name: str,
        config_key: str,
        prop_schema: Mapping[str, Any],
        runtime_config: Mapping[str, Any],
    ) -> dict[str, Any]:
        """Describe one config input (name, widget kind, current/default values) for rendering.

        The current value comes from *runtime_config* (falling back to the schema
        default). On a bound form, a submitted value takes precedence; if it
        cannot be coerced, the raw submission is shown instead. Values of
        sensitive keys are never echoed back into the input.
        """
        from archivebox.config.common import is_sensitive_config_key

        schema_types = _schema_types(prop_schema)
        enum = prop_schema.get("enum")
        input_name = _plugin_config_input_name(plugin_name, config_key)
        current_value = runtime_config.get(config_key, prop_schema.get("default", ""))
        if self.is_bound and input_name in self.data:
            raw_value: Any = self.data.get(input_name)
            # Read enum-backed arrays the same way clean_plugin_config_overrides()
            # does; data.get() would keep only the last of the submitted values.
            if "array" in schema_types and isinstance(enum, list):
                raw_value = self.data.getlist(input_name)
            try:
                current_value = _coerce_plugin_config_value(raw_value, prop_schema)
            except (TypeError, ValueError, json.JSONDecodeError, forms.ValidationError):
                current_value = raw_value

        default_value = prop_schema.get("default", "")
        fallback_key = prop_schema.get("x-fallback")
        # A fallback key is displayed as "{KEY}" in place of a literal default.
        default_display = f"{{{fallback_key}}}" if fallback_key else default_value

        is_sensitive = bool(prop_schema.get("x-sensitive")) or is_sensitive_config_key(config_key)
        input_value = "" if is_sensitive else _jsonish(current_value)
        field_kind = "text"
        input_type = "text"
        options = []

        if "boolean" in schema_types:
            field_kind = "boolean"
            input_value = "true" if bool(current_value) else "false"
        elif isinstance(enum, list) and enum:
            field_kind = "select"
            # A list value (enum-backed array) selects every option it contains.
            if isinstance(current_value, (list, tuple, set)):
                selected_options = {str(item) for item in current_value}
            else:
                selected_options = {str(current_value)}
            options = [
                {
                    "value": str(option),
                    "label": str(option),
                    "selected": str(option) in selected_options,
                }
                for option in enum
            ]
        elif "integer" in schema_types or "number" in schema_types:
            field_kind = "number"
            input_type = "number"
        elif "array" in schema_types or "object" in schema_types:
            field_kind = "json"
            input_value = "" if is_sensitive else json.dumps(current_value, indent=2, sort_keys=True, default=str)
        elif is_sensitive:
            input_type = "password"
        else:
            # Non-sensitive plain text field.
            input_value = str(current_value)

        if is_sensitive and current_value:
            # Never reveal a sensitive value; only signal that one is set.
            current_display = "configured"
        elif "string" in schema_types:
            current_display = str(current_value)
        else:
            current_display = _jsonish(current_value)

        return {
            "key": config_key,
            "input_name": input_name,
            "kind": field_kind,
            "input_type": input_type,
            "value": input_value,
            "checked": bool(current_value),
            "options": options,
            "description": str(prop_schema.get("description") or "").strip(),
            "default": _jsonish(default_display),
            "current": current_display,
            "current_url": self.plugin_config_binary_urls.get(config_key, "") if str(config_key).endswith("_BINARY") else "",
            "is_sensitive": is_sensitive,
            "minimum": prop_schema.get("minimum"),
            "maximum": prop_schema.get("maximum"),
            "pattern": prop_schema.get("pattern"),
            "type_label": " / ".join(schema_types),
        }

    def clean_plugin_config_overrides(self, effective_config: Mapping[str, Any] | None = None) -> dict[str, Any]:
        """Collect submitted plugin config values that differ from the effective config.

        Args:
            effective_config: Baseline config to compare against; when falsy,
                ``get_config()`` is used.

        Returns:
            Mapping of config key to coerced override value. Unbound forms
            return an empty dict. Invalid or conflicting submissions are
            reported through ``self.add_error("config", ...)`` and skipped.
        """
        if not self.is_bound:
            return {}

        from archivebox.config.common import SENSITIVE_CONFIG_VALUE_REDACTED, is_sensitive_config_key

        effective_config = effective_config or get_config()
        overrides: dict[str, Any] = {}
        # Remembers which plugin supplied each override, for conflict messages.
        sources: dict[str, str] = {}

        for plugin_name, schema in discover_plugin_configs().items():
            for config_key, prop_schema in (schema.get("properties") or {}).items():
                if not isinstance(prop_schema, dict):
                    continue

                input_name = _plugin_config_input_name(plugin_name, config_key)
                if input_name not in self.data:
                    continue
                if str(config_key) in CONSTANTS_CONFIG:
                    continue
                if not self.allow_crawl_execution_config_fields and ArchiveBoxConfig.scope_for_key(str(config_key)) != "crawl_frozen":
                    continue

                raw_value: Any = self.data.get(input_name)
                # Enum-backed arrays are submitted as repeated values.
                if "array" in _schema_types(prop_schema) and isinstance(prop_schema.get("enum"), list):
                    raw_value = self.data.getlist(input_name)

                # A blank or redacted sensitive input means "keep the existing value".
                if (prop_schema.get("x-sensitive") or is_sensitive_config_key(config_key)) and raw_value in (
                    "",
                    SENSITIVE_CONFIG_VALUE_REDACTED,
                ):
                    continue

                try:
                    coerced_value = _coerce_plugin_config_value(raw_value, prop_schema)
                except (TypeError, ValueError, json.JSONDecodeError) as err:
                    self.add_error("config", forms.ValidationError(f"{config_key}: {err}"))
                    continue
                except forms.ValidationError as err:
                    self.add_error("config", forms.ValidationError(f"{config_key}: {err.messages[0]}"))
                    continue

                # Values equal to the baseline are not overrides.
                base_value = effective_config.get(config_key, prop_schema.get("default", ""))
                if _same_config_value(coerced_value, base_value):
                    continue

                # The same key can appear under several plugins; they must agree.
                existing_value = overrides.get(config_key)
                if config_key in overrides and not _same_config_value(existing_value, coerced_value):
                    self.add_error(
                        "config",
                        forms.ValidationError(
                            f"{config_key} was set differently under {sources[config_key]} and {plugin_name}. Set it once in Custom config overrides.",
                        ),
                    )
                    continue

                overrides[config_key] = coerced_value
                sources[config_key] = plugin_name

        return overrides

    def plugin_config_keys(self) -> set[str]:
        """Return every config key declared (with a dict schema) by any discovered plugin."""
        return {
            str(config_key)
            for schema in discover_plugin_configs().values()
            for config_key, prop_schema in (schema.get("properties") or {}).items()
            if isinstance(prop_schema, dict)
        }
|
|
|
|
|
|
_BINARY_TEMPLATE_PATTERN = re.compile(r"\{([A-Z_][A-Z0-9_]*)\}")
|
|
|
|
|
|
def _resolve_required_binary_name(template_name: str, runtime_config: Mapping[str, Any]) -> str:
|
|
if "{" not in template_name:
|
|
return template_name
|
|
|
|
def _replace(match: re.Match[str]) -> str:
|
|
key = match.group(1)
|
|
try:
|
|
value = runtime_config.get(key)
|
|
except Exception:
|
|
value = None
|
|
if value is None or value == "":
|
|
return match.group(0)
|
|
return str(value)
|
|
|
|
resolved = _BINARY_TEMPLATE_PATTERN.sub(_replace, template_name).strip()
|
|
if not resolved:
|
|
return template_name
|
|
return Path(resolved).name if "/" in resolved else resolved
|
|
|
|
|
|
def _iter_required_binary_names(
    required_binaries: Iterable[Any],
    runtime_config: Mapping[str, Any],
) -> Iterable[str]:
    """Yield the resolved binary name of each usable ``required_binaries`` entry.

    Entries that are not dicts, have a blank ``name``, or resolve to an empty
    string are skipped. Duplicates are not removed here.
    """
    entries = (item for item in required_binaries or [] if isinstance(item, dict))
    for entry in entries:
        template = str(entry.get("name") or "").strip()
        if template and (name := _resolve_required_binary_name(template, runtime_config)):
            yield name
|
|
|
|
|
|
def _build_required_binary_url_lookup(
    plugin_configs: Mapping[str, dict[str, Any]],
    runtime_config: Mapping[str, Any],
) -> dict[str, str]:
    """Map every required binary name across all plugin schemas to an admin URL.

    All names are fetched with a single ``Binary`` query for the current
    machine. For each name the most recently modified record with a non-empty
    ``abspath`` is used (records are matched to names case-insensitively); names
    without an installed-binary URL fall back to the environment binary URL.
    """
    from archivebox.config.views import get_environment_binary_url, get_installed_binary_change_url
    from archivebox.machine.models import Binary, Machine

    resolved_names: set[str] = {
        name
        for schema in plugin_configs.values()
        for name in _iter_required_binary_names(schema.get("required_binaries") or [], runtime_config)
    }
    if not resolved_names:
        return {}

    installed = (
        Binary.objects.filter(machine=Machine.current(), name__in=resolved_names)
        .exclude(abspath="")
        .exclude(abspath__isnull=True)
        .order_by("-modified_at")
    )
    # Rows arrive newest first, so the first record seen per name wins.
    newest_by_name: dict[str, Binary] = {}
    for binary in installed:
        newest_by_name.setdefault(binary.name.lower(), binary)

    lookup: dict[str, str] = {}
    for name in resolved_names:
        installed_url = get_installed_binary_change_url(name, newest_by_name.get(name.lower()))
        lookup[name] = installed_url or get_environment_binary_url(name)
    return lookup
|
|
|
|
|
|
def _build_required_binary_links(
    required_binaries: list[dict[str, Any]],
    runtime_config: Mapping[str, Any],
    binary_url_lookup: Mapping[str, str] | None = None,
) -> list[dict[str, str]]:
    """Return ``{"name", "url"}`` link dicts for a plugin's required binaries.

    Names are resolved against *runtime_config* and de-duplicated, keeping the
    order of first appearance. URLs come from *binary_url_lookup* when it has a
    non-empty entry, otherwise from ``get_environment_binary_url``.
    """
    from archivebox.config.views import get_environment_binary_url

    known_urls = binary_url_lookup or {}
    # dict.fromkeys() drops repeats while preserving first-seen order.
    unique_names = dict.fromkeys(_iter_required_binary_names(required_binaries, runtime_config))
    return [
        {"name": name, "url": known_urls.get(name) or get_environment_binary_url(name)}
        for name in unique_names
    ]
|
|
|
|
|
|
def get_plugin_config_binary_urls(runtime_config: Mapping[str, Any]) -> dict[str, str]:
    """Map each configured ``*_BINARY`` plugin config key to an admin URL.

    Keys with a blank value in *runtime_config* are left out. For the others a
    ``Binary`` record on the current machine is looked up, in order: by the
    configured value, by exact ``abspath`` (when the value contains a ``/``),
    then by the path's final component. Values without an installed-binary URL
    fall back to the environment binary URL.
    """
    from archivebox.config.views import get_environment_binary_url, get_installed_binary_change_url
    from archivebox.machine.models import Binary, Machine

    binary_keys: set[str] = set()
    for schema in discover_plugin_configs().values():
        for config_key, prop_schema in (schema.get("properties") or {}).items():
            if isinstance(prop_schema, dict) and str(config_key).endswith("_BINARY"):
                binary_keys.add(str(config_key))

    urls: dict[str, str] = {}
    machine = Machine.current()
    for key in binary_keys:
        configured = str(runtime_config.get(key) or "").strip()
        if not configured:
            continue

        is_path = "/" in configured
        basename = Path(configured).name if is_path else configured

        binary = Binary.objects.get_valid_binary(configured, machine=machine)
        if binary is None and is_path:
            # Fall back to the newest record whose abspath matches exactly.
            candidates = (
                Binary.objects.exclude(abspath="")
                .exclude(abspath__isnull=True)
                .filter(machine=machine, abspath=configured)
                .order_by("-modified_at")
            )
            binary = candidates.first()
        if binary is None and basename != configured:
            binary = Binary.objects.get_valid_binary(basename, machine=machine)

        display_name = basename if binary is None else binary.name
        urls[key] = get_installed_binary_change_url(display_name, binary) or get_environment_binary_url(basename)
    return urls
|