Skip to main content
Glama

Scribe MCP Server

by paxocial
registry.py19.4 kB
"""Plugin registry system for Scribe. This module provides a secure plugin system that allows repositories to customize Scribe behavior without modifying the core codebase. Security features include: - Plugin signature verification with SHA-256 hash pinning - Strict allowlist enforcement - Manifest validation - Sandbox-enforced boundaries - Comprehensive error handling and logging """ from __future__ import annotations import hashlib import importlib.util import inspect import json import logging from abc import ABC, abstractmethod from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Type from scribe_mcp.config.repo_config import RepoConfig from scribe_mcp.security.sandbox import safe_file_operation from scribe_mcp.config.settings import settings # Setup secure logging for plugin operations plugin_logger = logging.getLogger(__name__) @dataclass class PluginManifest: """Plugin manifest with security metadata.""" name: str version: str description: str author: str min_scribe_version: str = "1.0.0" max_scribe_version: Optional[str] = None required_permissions: List[str] = None file_hash: Optional[str] = None # SHA-256 hash of the plugin file signature: Optional[str] = None # Optional signature for verification def __post_init__(self): if self.required_permissions is None: self.required_permissions = [] class ScribePlugin(ABC): """Base class for Scribe plugins.""" name: str = "" version: str = "1.0.0" description: str = "" author: str = "" manifest: Optional[PluginManifest] = None @abstractmethod def initialize(self, config: RepoConfig) -> None: """Initialize the plugin with repository configuration.""" pass def cleanup(self) -> None: """Cleanup resources when plugin is unloaded.""" pass class TemplatePlugin(ScribePlugin): """Plugin for custom document templates.""" @abstractmethod def get_template(self, template_type: str) -> Optional[str]: """Get a custom template by type.""" pass def list_templates(self) -> List[str]: """List available template types.""" return [] class PolicyPlugin(ScribePlugin): """Plugin for repository-specific policies and constraints.""" @abstractmethod def check_permission(self, operation: str, context: Dict[str, Any]) -> bool: """Check if an operation is allowed.""" pass def validate_entry(self, entry_data: Dict[str, Any]) -> Optional[str]: """Validate a log entry, return error message if invalid.""" return None class FormatterPlugin(ScribePlugin): """Plugin for custom log entry formatting.""" @abstractmethod def format_entry(self, entry_data: Dict[str, Any]) -> str: """Format a log entry for display.""" pass def parse_entry(self, line: str) -> Optional[Dict[str, Any]]: """Parse a formatted log entry back to structured data.""" return None class HookPlugin(ScribePlugin): """Plugin for custom hooks at various points in the workflow.""" def pre_append(self, entry_data: Dict[str, Any]) -> Dict[str, Any]: """Called before an entry is appended.""" return entry_data def post_append(self, entry_data: Dict[str, Any]) -> None: """Called after an entry is appended.""" pass def pre_rotate(self, project_name: str) -> None: """Called before log rotation.""" pass def post_rotate(self, project_name: str, archive_info: Dict[str, Any]) -> None: """Called after log rotation.""" pass class PluginRegistry: """Registry for managing Scribe plugins with security hardening.""" def __init__(self, repo_root: Optional[Path] = None): self.plugins: Dict[str, ScribePlugin] = {} self.template_plugins: List[TemplatePlugin] = [] self.policy_plugins: List[PolicyPlugin] = [] self.formatter_plugins: List[FormatterPlugin] = [] self.hook_plugins: List[HookPlugin] = [] self.repo_root = repo_root self._plugin_hashes: Dict[str, str] = {} # Cache of verified plugin hashes def _verify_plugin_hash(self, plugin_file: Path, expected_hash: Optional[str] = None) -> bool: """Verify SHA-256 hash of plugin file against expected value.""" try: with open(plugin_file, 'rb') as f: file_hash = hashlib.sha256(f.read()).hexdigest() if expected_hash: if file_hash != expected_hash: plugin_logger.error( f"Plugin hash mismatch for {plugin_file.name}: " f"expected {expected_hash}, got {file_hash}" ) return False plugin_logger.info(f"Plugin hash verified for {plugin_file.name}") # Cache the hash for audit purposes self._plugin_hashes[plugin_file.name] = file_hash return True except Exception as e: plugin_logger.error(f"Failed to verify plugin hash for {plugin_file}: {e}") return False def _load_plugin_manifest(self, plugin_file: Path) -> Optional[PluginManifest]: """Load and validate plugin manifest.""" manifest_file = plugin_file.with_suffix('.json') if not manifest_file.exists(): # No manifest file, create a basic one from plugin class attributes return PluginManifest( name=plugin_file.stem, version="1.0.0", description=f"Plugin from {plugin_file.name}", author="Unknown" ) try: with open(manifest_file, 'r') as f: manifest_data = json.load(f) # Validate manifest structure required_fields = ['name', 'version', 'description', 'author'] for field in required_fields: if field not in manifest_data: plugin_logger.error(f"Missing required field '{field}' in manifest {manifest_file}") return None manifest = PluginManifest(**manifest_data) # Verify plugin file hash if specified in manifest if manifest.file_hash: if not self._verify_plugin_hash(plugin_file, manifest.file_hash): plugin_logger.error(f"Plugin hash verification failed for {plugin_file}") return None return manifest except Exception as e: plugin_logger.error(f"Failed to load manifest for {plugin_file}: {e}") return None def _is_plugin_allowed(self, plugin_name: str, allowlist: set, blocklist: set) -> bool: """Check if plugin is allowed based on allowlist and blocklist.""" # Blocklist takes precedence if plugin_name in blocklist: plugin_logger.warning(f"Plugin {plugin_name} is explicitly blocked") return False # If allowlist is set, only allowlisted plugins are permitted if allowlist and plugin_name not in allowlist: plugin_logger.warning(f"Plugin {plugin_name} not in allowlist") return False return True def load_plugins(self, config: RepoConfig) -> None: """Load plugins from the repository's plugins directory with security hardening.""" plugin_settings = config.plugin_config or {} if not plugin_settings.get("enabled"): plugin_logger.info("Plugin loading disabled (plugin_config.enabled is false or missing)") return if not config.plugins_dir or not config.plugins_dir.exists(): plugin_logger.debug("No plugins directory found") return # Security settings allowlist = set(plugin_settings.get("allowlist", [])) blocklist = set(plugin_settings.get("blocklist", [])) require_manifest = plugin_settings.get("require_manifest", False) verify_hashes = plugin_settings.get("verify_hashes", True) # Load built-in plugins first self._load_builtin_plugins(config) # Load repository-specific plugins plugins_loaded = 0 for plugin_file in config.plugins_dir.glob("*.py"): if plugin_file.name.startswith("__"): continue plugin_name = plugin_file.stem # Security check 1: Enforce repo sandbox boundaries try: safe_file_operation( settings.project_root, plugin_file, operation="read", context={"component": "plugins", "plugin_name": plugin_name} ) except Exception as sandbox_error: plugin_logger.error(f"Sandbox violation for plugin {plugin_name}: {sandbox_error}") continue # Security check 2: Allowlist/blocklist enforcement if not self._is_plugin_allowed(plugin_name, allowlist, blocklist): continue # Security check 3: Load and validate manifest if required manifest = self._load_plugin_manifest(plugin_file) if not manifest: if require_manifest: plugin_logger.error(f"Plugin {plugin_name} missing required manifest") continue # Create basic manifest for compatibility manifest = PluginManifest( name=plugin_name, version="1.0.0", description=f"Plugin from {plugin_file.name}", author="Unknown" ) # Security check 4: Hash verification if verify_hashes and not self._verify_plugin_hash(plugin_file, manifest.file_hash): plugin_logger.error(f"Plugin {plugin_name} failed hash verification") continue # Load the plugin with comprehensive error handling try: plugin = self._load_plugin_file(plugin_file, config, manifest) if plugin: self._register_plugin(plugin) plugins_loaded += 1 plugin_logger.info( f"Successfully loaded plugin: {plugin.name} v{plugin.version} " f"(hash: {self._plugin_hashes.get(plugin_file.name, 'unknown')[:8]}...)" ) except Exception as e: plugin_logger.error(f"Failed to load plugin {plugin_file}: {e}", exc_info=True) continue plugin_logger.info(f"Plugin loading completed. Loaded {plugins_loaded} plugins.") def _load_builtin_plugins(self, config: RepoConfig) -> None: """Load built-in plugins.""" # Add any built-in plugins here pass def _load_plugin_file(self, plugin_file: Path, config: RepoConfig, manifest: Optional[PluginManifest] = None) -> Optional[ScribePlugin]: """Load a plugin from a Python file with security validation.""" try: spec = importlib.util.spec_from_file_location( plugin_file.stem, plugin_file ) if spec is None or spec.loader is None: plugin_logger.error(f"Could not load spec for {plugin_file}") return None module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # Look for plugin classes in the module plugin_classes = [] for name, obj in inspect.getmembers(module, inspect.isclass): if (issubclass(obj, ScribePlugin) and obj != ScribePlugin and not inspect.isabstract(obj)): plugin_classes.append((name, obj)) if not plugin_classes: plugin_logger.error(f"No valid plugin classes found in {plugin_file}") return None # Load the first valid plugin class found for class_name, plugin_class in plugin_classes: try: plugin_instance = plugin_class() # Set manifest if available if manifest: plugin_instance.manifest = manifest # Validate plugin metadata if not plugin_instance.name: plugin_instance.name = manifest.name if manifest else class_name if not plugin_instance.version: plugin_instance.version = manifest.version if manifest else "1.0.0" if not plugin_instance.author: plugin_instance.author = manifest.author if manifest else "Unknown" # Initialize plugin plugin_instance.initialize(config) # Log successful instantiation with hash for audit trail hash_info = self._plugin_hashes.get(plugin_file.name, "unknown")[:8] plugin_logger.info( f"Instantiated plugin {plugin_instance.name} v{plugin_instance.version} " f"by {plugin_instance.author} (hash: {hash_info}...)" ) return plugin_instance except Exception as e: plugin_logger.error(f"Failed to instantiate plugin {class_name}: {e}", exc_info=True) continue plugin_logger.error(f"All plugin classes in {plugin_file} failed to instantiate") return None except Exception as e: plugin_logger.error(f"Failed to load plugin module {plugin_file}: {e}", exc_info=True) return None def _register_plugin(self, plugin: ScribePlugin) -> None: """Register a plugin in the appropriate categories.""" self.plugins[plugin.name] = plugin if isinstance(plugin, TemplatePlugin): self.template_plugins.append(plugin) if isinstance(plugin, PolicyPlugin): self.policy_plugins.append(plugin) if isinstance(plugin, FormatterPlugin): self.formatter_plugins.append(plugin) if isinstance(plugin, HookPlugin): self.hook_plugins.append(plugin) def get_template(self, template_type: str) -> Optional[str]: """Get a custom template from registered template plugins.""" for plugin in self.template_plugins: template = plugin.get_template(template_type) if template: return template return None def check_permission(self, operation: str, context: Dict[str, Any]) -> bool: """Check permissions using registered policy plugins.""" for plugin in self.policy_plugins: if not plugin.check_permission(operation, context): return False return True def validate_entry(self, entry_data: Dict[str, Any]) -> Optional[str]: """Validate entry using registered policy plugins.""" for plugin in self.policy_plugins: error = plugin.validate_entry(entry_data) if error: return error return None def format_entry(self, entry_data: Dict[str, Any]) -> str: """Format entry using registered formatter plugins.""" for plugin in self.formatter_plugins: try: return plugin.format_entry(entry_data) except Exception: continue # Try next plugin return None def execute_hook_pre_append(self, entry_data: Dict[str, Any]) -> Dict[str, Any]: """Execute pre-append hooks.""" data = entry_data.copy() for plugin in self.hook_plugins: try: data = plugin.pre_append(data) except Exception as e: plugin_logger.warning(f"Pre-append hook failed in {plugin.name}: {e}") return data def execute_hook_post_append(self, entry_data: Dict[str, Any]) -> None: """Execute post-append hooks.""" for plugin in self.hook_plugins: try: plugin.post_append(entry_data) except Exception as e: plugin_logger.warning(f"Post-append hook failed in {plugin.name}: {e}") def execute_hook_pre_rotate(self, project_name: str) -> None: """Execute pre-rotation hooks.""" for plugin in self.hook_plugins: try: plugin.pre_rotate(project_name) except Exception as e: plugin_logger.warning(f"Pre-rotate hook failed in {plugin.name}: {e}") def execute_hook_post_rotate(self, project_name: str, archive_info: Dict[str, Any]) -> None: """Execute post-rotation hooks.""" for plugin in self.hook_plugins: try: plugin.post_rotate(project_name, archive_info) except Exception as e: plugin_logger.warning(f"Post-rotate hook failed in {plugin.name}: {e}") def cleanup(self) -> None: """Cleanup all registered plugins.""" for plugin in self.plugins.values(): try: plugin.cleanup() except Exception as e: plugin_logger.warning(f"Plugin cleanup failed for {plugin.name}: {e}") self.plugins.clear() self.template_plugins.clear() self.policy_plugins.clear() self.formatter_plugins.clear() self.hook_plugins.clear() # Global plugin registry instance _plugin_registry: Optional[PluginRegistry] = None def get_plugin_registry(repo_root: Optional[Path] = None) -> PluginRegistry: """Get the global plugin registry instance with repository context.""" global _plugin_registry if _plugin_registry is None: _plugin_registry = PluginRegistry(repo_root=repo_root) return _plugin_registry def initialize_plugins(config: RepoConfig) -> None: """Initialize plugins for a repository configuration.""" registry = get_plugin_registry(config.repo_root) registry.cleanup() # Cleanup any existing plugins registry.load_plugins(config) def get_plugin_security_info() -> Dict[str, Any]: """Get security information about loaded plugins for audit.""" registry = get_plugin_registry() return { "plugins_loaded": len(registry.plugins), "plugin_hashes": registry._plugin_hashes, "plugin_manifests": { name: plugin.manifest.__dict__ if plugin.manifest else None for name, plugin in registry.plugins.items() } } def get_template(template_type: str) -> Optional[str]: """Get a custom template.""" return get_plugin_registry().get_template(template_type) def check_permission(operation: str, context: Dict[str, Any]) -> bool: """Check if an operation is allowed.""" return get_plugin_registry().check_permission(operation, context) def validate_entry(entry_data: Dict[str, Any]) -> Optional[str]: """Validate a log entry.""" return get_plugin_registry().validate_entry(entry_data) def format_entry(entry_data: Dict[str, Any]) -> Optional[str]: """Format a log entry.""" return get_plugin_registry().format_entry(entry_data)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/paxocial/scribe_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server