# Copyright 2025 Softwell S.r.l.
# Licensed under the Apache License, Version 2.0
"""Plugin Configuration UI - web interface for viewing and editing plugin config.

Provides a Swagger-like interface to navigate the routing tree and
configure plugins at router-level (_all_) and entry-level.

Mountable as sys_app via config:

    sys_apps:
      plugin_config:
        module: "genro_asgi.sys_applications.plugin_config.plugin_config_app:Application"

Endpoints:
    /: Index page with plugin configuration UI.
    /tree: JSON routing tree with plugin_info.
    /plugins: Available plugins with configure() signatures.
    /config: Read plugin configuration for a node.
    /apply: Apply a config change at runtime (no persistence).
    /save: Persist current runtime config to plugin_config.json.
    /revert: Reload config from plugin_config.json, discarding runtime changes.
    /logs: Read captured log entries.
    /clear_logs: Clear the captured log entries.
    /static: Static resource serving (JS, CSS).
"""
from __future__ import annotations
import inspect
import logging
import time
from collections import deque
from pathlib import Path
from typing import Any, ClassVar
from genro_routes import Router, route # type: ignore[import-untyped]
from genro_asgi import AsgiApplication
# Public API of this module: only the mountable application class is exported.
__all__ = ["Application"]
# Directory next to this module that holds the UI resources (index.html, JS, CSS).
RESOURCES_DIR = Path(__file__).parent / "resources"
# Default capacity (number of entries) of the in-memory log ring buffer.
LOG_BUFFER_SIZE = 500
class LogEntry:
    """Single captured log record.

    Attributes:
        timestamp: When the log was recorded (epoch seconds).
        entry_name: Parsed handler/entry name from the log message.
        message: Full log message text.
        level: Log level name (INFO, WARNING, etc.).
    """

    # Fixed attribute set: no per-instance __dict__ is created.
    __slots__ = ("timestamp", "entry_name", "message", "level")

    def __init__(self, timestamp: float, entry_name: str, message: str, level: str) -> None:
        """Initialize log entry.

        Args:
            timestamp: When the log was recorded (epoch seconds).
            entry_name: Parsed handler/entry name.
            message: Full log message text.
            level: Log level name.
        """
        self.timestamp = timestamp
        self.entry_name = entry_name
        self.message = message
        self.level = level

    def __repr__(self) -> str:
        """Return a debug representation listing every field in slot order."""
        fields = ", ".join(f"{name}={getattr(self, name)!r}" for name in self.__slots__)
        return f"{type(self).__name__}({fields})"

    def as_dict(self) -> dict[str, Any]:
        """Serialize to JSON-safe dict.

        Returns:
            Dict with the keys "timestamp", "entry_name", "message" and "level".
        """
        return {
            "timestamp": self.timestamp,
            "entry_name": self.entry_name,
            "message": self.message,
            "level": self.level,
        }
class LogCaptureHandler(logging.Handler):
    """Logging handler that captures records into a ring buffer.

    Attaches to the genro_routes logger to capture LoggingPlugin output.
    Records are stored with parsed entry names for per-endpoint filtering.
    """

    # logging.Handler instances already carry a __dict__, so this declaration
    # saves no memory; it is kept so the class attribute stays available.
    __slots__ = ("_buffer",)

    def __init__(self, maxlen: int = LOG_BUFFER_SIZE) -> None:
        """Initialize handler with ring buffer.

        Args:
            maxlen: Maximum log entries to retain; older entries are dropped
                once the buffer is full.
        """
        super().__init__()
        self._buffer: deque[LogEntry] = deque(maxlen=maxlen)

    def emit(self, record: logging.LogRecord) -> None:
        """Capture a log record into the ring buffer.

        Failures while formatting or storing the record are routed through
        ``handleError`` (the standard logging convention) so that a malformed
        log call never raises inside the code that issued it.

        Args:
            record: The log record being handled.
        """
        try:
            message = self.format(record)
            self._buffer.append(
                LogEntry(
                    timestamp=record.created,
                    entry_name=self._parse_entry_name(message),
                    message=message,
                    level=record.levelname,
                )
            )
        except RecursionError:
            # Same policy as the stdlib handlers: never swallow recursion errors.
            raise
        except Exception:
            self.handleError(record)

    def get_logs(
        self,
        entry: str = "",
        since: float = 0.0,
        limit: int = 100,
    ) -> list[dict[str, Any]]:
        """Return captured log entries, optionally filtered.

        Args:
            entry: Filter by entry name (empty = all entries).
            since: Only return entries after this timestamp.
            limit: Maximum number of entries to return. A value of zero or
                less yields an empty list.

        Returns:
            The newest matching entries as dicts, ordered oldest to newest.
        """
        if limit <= 0:
            return []

        # Snapshot under the handler lock: emit() runs under the same lock, and
        # iterating a deque while another thread appends raises RuntimeError.
        self.acquire()
        try:
            snapshot = list(self._buffer)
        finally:
            self.release()

        result: list[dict[str, Any]] = []
        # Walk newest-to-oldest so the scan can stop at the first entry that is
        # not newer than `since` (entries are appended in arrival order).
        for log_entry in reversed(snapshot):
            if log_entry.timestamp <= since:
                break
            if entry and log_entry.entry_name != entry:
                continue
            result.append(log_entry.as_dict())
            if len(result) >= limit:
                break
        result.reverse()
        return result

    def clear(self) -> None:
        """Clear the log buffer."""
        # Hold the handler lock so clearing cannot interleave with emit().
        self.acquire()
        try:
            self._buffer.clear()
        finally:
            self.release()

    def _parse_entry_name(self, message: str) -> str:
        """Extract entry name from LoggingPlugin message format.

        Args:
            message: Log message in format "{entry_name} start" or
                "{entry_name} end (X ms)".

        Returns:
            Entry name string, or empty string if not parseable.
        """
        for suffix in (" start", " end "):
            idx = message.find(suffix)
            # idx == 0 would mean an empty entry name, so it is not accepted.
            if idx > 0:
                return message[:idx]
        return ""
class Application(AsgiApplication):
    """Plugin configuration web UI. Mountable as sys_app at /_sys/plugin_config/.

    Persistence logic (load/save/apply plugin_config.json) lives on AsgiServer.
    This app is a thin GUI layer that delegates to server methods.
    """

    openapi_info: ClassVar[dict[str, Any]] = {
        "title": "Plugin Configuration",
        "version": "1.0.0",
        "description": "Web UI for viewing and editing plugin configuration at runtime.",
    }

    def on_init(self, **kwargs: Any) -> None:
        """Initialize log capture handler.

        Args:
            **kwargs: Parameters from config.yaml app definition (not used here).
        """
        self._log_handler = LogCaptureHandler(maxlen=LOG_BUFFER_SIZE)
        self._attach_log_handler()

    def _attach_log_handler(self) -> None:
        """Attach the log capture handler to genro_routes logger."""
        logger = logging.getLogger("genro_routes")
        logger.addHandler(self._log_handler)
        # Make sure INFO records reach the handler: lower the threshold when the
        # logger is stricter than INFO, and set it explicitly when it is NOTSET
        # (in which case the effective level is inherited from a parent logger).
        if logger.level > logging.INFO or logger.level == logging.NOTSET:
            logger.setLevel(logging.INFO)

    # -- Page and static resources --

    @route(meta_mime_type="text/html")
    def index(self) -> str:
        """Serve the plugin configuration page.

        Returns:
            Content of resources/index.html.
        """
        html_path = RESOURCES_DIR / "index.html"
        # Explicit encoding so the result does not depend on the host locale.
        # NOTE(review): assumes the bundled index.html is UTF-8 - confirm.
        return html_path.read_text(encoding="utf-8")

    @route()
    def static(self, file: str = "") -> Path:
        """Serve static resources from resources folder.

        The requested name is resolved and must stay inside the resources
        folder; names that escape it (e.g. via ".." or an absolute path) are
        reported as not found.

        Args:
            file: Filename to serve (e.g., "config_panel.js").

        Returns:
            Resolved path to the resource file.

        Raises:
            ValueError: If file parameter is empty.
            FileNotFoundError: If resource file does not exist or lies outside
                the resources folder.
        """
        if not file:
            raise ValueError("File parameter required")
        base_dir = RESOURCES_DIR.resolve()
        try:
            file_path = (base_dir / file).resolve()
            # relative_to() raises ValueError when file_path is not under base_dir.
            file_path.relative_to(base_dir)
        except ValueError:
            raise FileNotFoundError(f"Resource not found: {file}") from None
        # is_file() is False for missing paths too, so one check covers both cases.
        if not file_path.is_file():
            raise FileNotFoundError(f"Resource not found: {file}")
        return file_path

    # -- Read endpoints --

    @route()
    def tree(self) -> dict[str, Any]:
        """Return the routing tree with plugin_info for each router.

        Returns:
            Serialized routing tree dict with entries, routers, plugin_info.
        """
        assert self.server is not None
        raw = self.server.router.nodes()
        return self._serialize_tree(raw)

    @route()
    def plugins(self) -> dict[str, Any]:
        """Return available plugins with their configure() signatures.

        Returns:
            Dict with "plugins" key mapping plugin names to description and params.
        """
        available = Router.available_plugins()
        result: dict[str, dict[str, Any]] = {}
        for name, plugin_class in available.items():
            result[name] = {
                "description": getattr(plugin_class, "plugin_description", ""),
                "configure_params": self._get_configure_params(plugin_class),
            }
        return {"plugins": result}

    # -- Delegates to server --

    @route()
    def config(self, router_path: str = "", entry: str = "") -> dict[str, Any]:
        """Read plugin configuration for a specific router/entry.

        Args:
            router_path: Dotted path to the router (empty = root).
            entry: Specific entry/handler name to filter (empty = all).

        Returns:
            Dict with router_path, plugin_info. Error dict if router not found.
        """
        assert self.server is not None
        return self.server.get_plugin_config(router_path=router_path, entry=entry)

    @route()
    def apply(
        self,
        router_path: str = "",
        plugin_name: str = "",
        target: str = "_all_",
        **config_values: Any,
    ) -> dict[str, Any]:
        """Apply a plugin config change at runtime without persisting.

        Args:
            router_path: Dotted path to the router (empty = root).
            plugin_name: Name of the plugin to configure.
            target: Target slot ("_all_" or specific entry name).
            **config_values: Plugin configuration key-value pairs.

        Returns:
            Dict with "ok": True on success, or "error" key on failure.
        """
        assert self.server is not None
        return self.server.set_plugin_config(
            router_path=router_path,
            plugin_name=plugin_name,
            target=target,
            **config_values,
        )

    @route()
    def save(self) -> dict[str, Any]:
        """Persist current runtime plugin config to plugin_config.json.

        Returns:
            Dict with "ok": True and "file" path on success.
        """
        assert self.server is not None
        return self.server.save_plugin_config()

    @route()
    def revert(self) -> dict[str, Any]:
        """Revert runtime config to what is saved in plugin_config.json.

        Returns:
            Dict with "ok": True. Includes "message" if no saved config exists.
        """
        assert self.server is not None
        return self.server.revert_plugin_config()

    # -- Log viewer --

    @route()
    def logs(self, entry: str = "", since: str = "0", limit: int = 100) -> dict[str, Any]:
        """Return captured log entries, optionally filtered by entry name.

        Args:
            entry: Filter by entry/handler name (empty = all).
            since: Timestamp filter - only logs after this time. Passed as a
                string and converted with float().
            limit: Max entries to return.

        Returns:
            Dict with "logs" list and "server_time" timestamp.

        Raises:
            ValueError: If since cannot be converted to a float.
        """
        since_f = float(since)
        entries = self._log_handler.get_logs(entry=entry, since=since_f, limit=limit)
        return {"logs": entries, "server_time": time.time()}

    @route()
    def clear_logs(self) -> dict[str, Any]:
        """Clear the log buffer.

        Returns:
            Dict with "ok": True.
        """
        self._log_handler.clear()
        return {"ok": True}

    # -- Serialization helpers --

    def _serialize_tree(self, node: dict[str, Any]) -> dict[str, Any]:
        """Recursively serialize a router.nodes() result to JSON-safe dict.

        Args:
            node: Router node dict; the keys read are "name", "description",
                "owner_doc", "plugin_info", "entries" and "routers".

        Returns:
            Dict with the node's descriptive fields, plus "entries" and
            "routers" keys only when they are non-empty. Empty dict for an
            empty node.
        """
        if not node:
            return {}
        result: dict[str, Any] = {
            "name": node.get("name", ""),
            "description": node.get("description", ""),
            "owner_doc": node.get("owner_doc", ""),
            "plugin_info": node.get("plugin_info", {}),
        }

        serialized_entries: dict[str, Any] = {}
        for ename, edata in node.get("entries", {}).items():
            serialized_entries[ename] = {
                "name": edata.get("name", ename),
                "doc": edata.get("doc", ""),
                "metadata": self._clean_metadata(edata.get("metadata", {})),
            }
        if serialized_entries:
            result["entries"] = serialized_entries

        serialized_routers: dict[str, Any] = {}
        for rname, rdata in node.get("routers", {}).items():
            # Child routers that are not plain dicts are skipped.
            if isinstance(rdata, dict):
                serialized_routers[rname] = self._serialize_tree(rdata)
        if serialized_routers:
            result["routers"] = serialized_routers
        return result

    def _clean_metadata(self, metadata: dict[str, Any]) -> dict[str, Any]:
        """Replace non-serializable values in metadata with JSON-safe ones.

        Args:
            metadata: Mapping whose values may contain arbitrary objects.

        Returns:
            New dict with the same keys and every value made JSON-safe,
            including values nested inside lists, tuples and dicts.
        """
        return {key: self._clean_value(value) for key, value in metadata.items()}

    def _clean_value(self, value: Any) -> Any:
        """Convert a single value to a JSON-safe equivalent.

        Primitives are returned unchanged, lists/tuples become lists with each
        item cleaned, dicts are cleaned recursively, and anything else is
        replaced by its str() representation.
        """
        if isinstance(value, (str, int, float, bool, type(None))):
            return value
        if isinstance(value, (list, tuple)):
            return [self._clean_value(item) for item in value]
        if isinstance(value, dict):
            return self._clean_metadata(value)
        return str(value)

    def _get_configure_params(self, plugin_class: type) -> list[dict[str, Any]]:
        """Extract configure() parameters from a plugin class.

        Only a configure() defined directly on plugin_class is inspected;
        inherited definitions are ignored.

        Args:
            plugin_class: Plugin class to inspect.

        Returns:
            One dict per parameter with "name" and, when present, "type" and
            "default" (made JSON-safe). Empty list if the class defines no
            configure() or its signature cannot be inspected.
        """
        configure_method = plugin_class.__dict__.get("configure")
        if configure_method is None:
            return []
        # Look through a decorator wrapper to the original function, if any.
        original = getattr(configure_method, "__wrapped__", configure_method)
        try:
            sig = inspect.signature(original)
        except (ValueError, TypeError):
            return []
        params: list[dict[str, Any]] = []
        for name, param in sig.parameters.items():
            # These parameters are not exposed as configurable values.
            if name in ("self", "_target", "flags"):
                continue
            pinfo: dict[str, Any] = {"name": name}
            if param.annotation is not inspect.Parameter.empty:
                ann = param.annotation
                pinfo["type"] = ann.__name__ if isinstance(ann, type) else str(ann)
            if param.default is not inspect.Parameter.empty:
                # Defaults can be arbitrary objects; keep the response JSON-safe.
                pinfo["default"] = self._clean_value(param.default)
            params.append(pinfo)
        return params
if __name__ == "__main__":
    # No-op: running this module directly as a script does nothing.
    pass