Skip to main content
Glama

MSFConsole MCP Server

msf_plugin_system.py • 16 kB
""" MSF Plugin System v5.0 - Dynamic Plugin Framework Provides extensible plugin architecture for MSF Console MCP Server """ import asyncio import importlib import inspect import logging import os from abc import ABC, abstractmethod from dataclasses import dataclass, field from enum import Enum from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Set, Type, Union from msf_stable_integration import MSFConsoleStableWrapper, OperationResult, OperationStatus logger = logging.getLogger(__name__) class PluginCategory(Enum): """Plugin categories matching MSF architecture""" SCANNER = "scanner" EXPLOIT = "exploit" POST = "post" AUXILIARY = "auxiliary" INTEGRATION = "integration" SESSION = "session" NOTIFICATION = "notification" AUTOMATION = "automation" REPORTING = "reporting" UTILITY = "utility" @dataclass class PluginMetadata: """Plugin metadata and registration information""" name: str description: str category: PluginCategory version: str = "1.0.0" author: str = "MSF MCP" dependencies: List[str] = field(default_factory=list) commands: Dict[str, str] = field(default_factory=dict) capabilities: Set[str] = field(default_factory=set) auto_load: bool = True priority: int = 50 # 0-100, higher = load first @dataclass class PluginContext: """Context passed to plugin operations""" msf: MSFConsoleStableWrapper framework_data: Dict[str, Any] session_data: Dict[str, Any] = field(default_factory=dict) workspace: str = "default" user_data: Dict[str, Any] = field(default_factory=dict) class PluginInterface(ABC): """Abstract base class for all MSF plugins""" def __init__(self, context: PluginContext): self.context = context self.msf = context.msf self._initialized = False self._hooks: Dict[str, List[Callable]] = {} @property @abstractmethod def metadata(self) -> PluginMetadata: """Return plugin metadata""" pass @abstractmethod async def initialize(self) -> OperationResult: """Initialize plugin resources""" pass @abstractmethod async def cleanup(self) -> 
OperationResult: """Cleanup plugin resources""" pass async def execute_command(self, command: str, args: Dict[str, Any]) -> OperationResult: """Execute a plugin command""" if command not in self.metadata.commands: return OperationResult( OperationStatus.FAILURE, None, 0.0, f"Unknown command: {command}" ) method_name = f"cmd_{command}" if hasattr(self, method_name): method = getattr(self, method_name) return await method(**args) else: return OperationResult( OperationStatus.FAILURE, None, 0.0, f"Command not implemented: {command}" ) def register_hook(self, event: str, callback: Callable) -> None: """Register event hook""" if event not in self._hooks: self._hooks[event] = [] self._hooks[event].append(callback) async def emit_event(self, event: str, data: Any) -> None: """Emit event to registered hooks""" if event in self._hooks: for callback in self._hooks[event]: try: if asyncio.iscoroutinefunction(callback): await callback(data) else: callback(data) except Exception as e: logger.error(f"Hook error in {self.metadata.name}: {e}") class PluginRegistry: """Central plugin registry with dynamic discovery and management""" def __init__(self, msf: MSFConsoleStableWrapper): self.msf = msf self._plugins: Dict[str, PluginInterface] = {} self._plugin_classes: Dict[str, Type[PluginInterface]] = {} self._categories: Dict[PluginCategory, List[str]] = {cat: [] for cat in PluginCategory} self._load_order: List[str] = [] self._event_subscribers: Dict[str, List[str]] = {} def register_plugin_class(self, plugin_class: Type[PluginInterface]) -> None: """Register a plugin class for instantiation""" # Create temporary instance to get metadata temp_context = PluginContext(self.msf, {}) temp_instance = plugin_class(temp_context) metadata = temp_instance.metadata self._plugin_classes[metadata.name] = plugin_class logger.info(f"Registered plugin class: {metadata.name}") async def load_plugin(self, name: str, context: Optional[PluginContext] = None) -> OperationResult: """Load and initialize 
a plugin""" if name in self._plugins: return OperationResult( OperationStatus.FAILURE, None, 0.0, f"Plugin already loaded: {name}" ) if name not in self._plugin_classes: return OperationResult( OperationStatus.FAILURE, None, 0.0, f"Unknown plugin: {name}" ) try: # Create plugin context if not provided if context is None: context = PluginContext(self.msf, {}) # Instantiate plugin plugin_class = self._plugin_classes[name] plugin = plugin_class(context) # Initialize plugin result = await plugin.initialize() if result.status != OperationStatus.SUCCESS: return result # Register plugin self._plugins[name] = plugin self._categories[plugin.metadata.category].append(name) self._load_order.append(name) logger.info(f"Loaded plugin: {name} v{plugin.metadata.version}") return OperationResult( OperationStatus.SUCCESS, {"plugin": name, "metadata": plugin.metadata}, 0.0 ) except Exception as e: logger.error(f"Failed to load plugin {name}: {e}") return OperationResult( OperationStatus.FAILURE, None, 0.0, str(e) ) async def unload_plugin(self, name: str) -> OperationResult: """Unload and cleanup a plugin""" if name not in self._plugins: return OperationResult( OperationStatus.FAILURE, None, 0.0, f"Plugin not loaded: {name}" ) try: plugin = self._plugins[name] # Cleanup plugin result = await plugin.cleanup() if result.status != OperationStatus.SUCCESS: logger.warning(f"Plugin cleanup failed: {name}") # Remove from registry del self._plugins[name] self._categories[plugin.metadata.category].remove(name) self._load_order.remove(name) logger.info(f"Unloaded plugin: {name}") return OperationResult( OperationStatus.SUCCESS, {"plugin": name}, 0.0 ) except Exception as e: logger.error(f"Failed to unload plugin {name}: {e}") return OperationResult( OperationStatus.FAILURE, None, 0.0, str(e) ) async def reload_plugin(self, name: str) -> OperationResult: """Reload a plugin (unload and load)""" # Get current context if plugin is loaded context = None if name in self._plugins: context = 
self._plugins[name].context # Unload if loaded if name in self._plugins: unload_result = await self.unload_plugin(name) if unload_result.status != OperationStatus.SUCCESS: return unload_result # Load plugin return await self.load_plugin(name, context) def get_plugin(self, name: str) -> Optional[PluginInterface]: """Get loaded plugin instance""" return self._plugins.get(name) def list_plugins(self, category: Optional[PluginCategory] = None, loaded_only: bool = False) -> List[Dict[str, Any]]: """List available or loaded plugins""" plugins = [] if loaded_only: # List only loaded plugins for name, plugin in self._plugins.items(): if category is None or plugin.metadata.category == category: plugins.append({ "name": name, "metadata": plugin.metadata, "loaded": True }) else: # List all registered plugins for name, plugin_class in self._plugin_classes.items(): temp_context = PluginContext(self.msf, {}) temp_instance = plugin_class(temp_context) metadata = temp_instance.metadata if category is None or metadata.category == category: plugins.append({ "name": name, "metadata": metadata, "loaded": name in self._plugins }) return sorted(plugins, key=lambda p: (p["metadata"].priority, p["name"]), reverse=True) async def execute_plugin_command(self, plugin_name: str, command: str, args: Dict[str, Any]) -> OperationResult: """Execute command on a loaded plugin""" plugin = self.get_plugin(plugin_name) if not plugin: return OperationResult( OperationStatus.FAILURE, None, 0.0, f"Plugin not loaded: {plugin_name}" ) return await plugin.execute_command(command, args) async def broadcast_event(self, event: str, data: Any) -> None: """Broadcast event to all loaded plugins""" for plugin in self._plugins.values(): try: await plugin.emit_event(event, data) except Exception as e: logger.error(f"Event broadcast error for {plugin.metadata.name}: {e}") async def discover_plugins(self, plugin_dir: Union[str, Path]) -> List[str]: """Discover plugins in directory""" plugin_dir = Path(plugin_dir) 
discovered = [] if not plugin_dir.exists(): logger.warning(f"Plugin directory not found: {plugin_dir}") return discovered for file_path in plugin_dir.glob("*.py"): if file_path.name.startswith("_"): continue try: # Import module spec = importlib.util.spec_from_file_location( file_path.stem, file_path ) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # Find plugin classes for name, obj in inspect.getmembers(module): if (inspect.isclass(obj) and issubclass(obj, PluginInterface) and obj != PluginInterface): self.register_plugin_class(obj) discovered.append(obj.__name__) except Exception as e: logger.error(f"Failed to discover plugins in {file_path}: {e}") logger.info(f"Discovered {len(discovered)} plugins in {plugin_dir}") return discovered async def load_auto_plugins(self) -> List[str]: """Load all plugins marked for auto-loading""" loaded = [] for name, plugin_class in self._plugin_classes.items(): temp_context = PluginContext(self.msf, {}) temp_instance = plugin_class(temp_context) if temp_instance.metadata.auto_load: result = await self.load_plugin(name) if result.status == OperationStatus.SUCCESS: loaded.append(name) return loaded class PluginManager: """High-level plugin management interface""" def __init__(self, msf: MSFConsoleStableWrapper): self.msf = msf self.registry = PluginRegistry(msf) self._plugin_dirs: List[Path] = [] async def initialize(self, plugin_dirs: Optional[List[Union[str, Path]]] = None) -> OperationResult: """Initialize plugin manager""" try: # Set default plugin directories if plugin_dirs: self._plugin_dirs = [Path(d) for d in plugin_dirs] else: # Default to plugins subdirectory self._plugin_dirs = [Path(__file__).parent / "plugins"] # Discover plugins in all directories discovered = [] for plugin_dir in self._plugin_dirs: discovered.extend(await self.registry.discover_plugins(plugin_dir)) # Load auto-load plugins loaded = await self.registry.load_auto_plugins() return OperationResult( OperationStatus.SUCCESS, 
{ "discovered": discovered, "loaded": loaded, "plugin_dirs": [str(d) for d in self._plugin_dirs] }, 0.0 ) except Exception as e: logger.error(f"Failed to initialize plugin manager: {e}") return OperationResult( OperationStatus.FAILURE, None, 0.0, str(e) ) async def execute_command(self, plugin_name: str, command: str, args: Optional[Dict[str, Any]] = None) -> OperationResult: """Execute plugin command with validation""" if args is None: args = {} return await self.registry.execute_plugin_command(plugin_name, command, args) def list_plugins(self, **kwargs) -> List[Dict[str, Any]]: """List plugins with filtering options""" return self.registry.list_plugins(**kwargs) async def load_plugin(self, name: str) -> OperationResult: """Load a specific plugin""" return await self.registry.load_plugin(name) async def unload_plugin(self, name: str) -> OperationResult: """Unload a specific plugin""" return await self.registry.unload_plugin(name) async def reload_plugin(self, name: str) -> OperationResult: """Reload a specific plugin""" return await self.registry.reload_plugin(name) def get_plugin_info(self, name: str) -> Optional[Dict[str, Any]]: """Get detailed plugin information""" plugin = self.registry.get_plugin(name) if plugin: return { "name": name, "metadata": plugin.metadata, "loaded": True, "commands": list(plugin.metadata.commands.keys()), "capabilities": list(plugin.metadata.capabilities) } return None

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/PreistlyPython/msfconsole-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.