Skip to main content
Glama

MSFConsole MCP Server

by LYFTIUM-INC
msfconsole_mcp_enhanced.pyโ€ข39.3 kB
#!/usr/bin/env python3 """ Enhanced Metasploit Framework Console MCP Server ------------------------------------------------ Comprehensive MCP integration with dual-mode operation, advanced security, and full feature coverage of Metasploit Framework capabilities. """ import asyncio import logging import json import sys import os from typing import Dict, Any, Optional, List from dataclasses import asdict # Import MCP SDK try: from mcp.server.fastmcp import FastMCP, Context except ImportError as e: # Use stderr to avoid breaking MCP JSON protocol sys.stderr.write(f"Error importing MCP SDK: {e}\n") sys.stderr.write("Please install the MCP SDK: pip install mcp\n") sys.exit(1) # Set up logging first logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("msfconsole_mcp_enhanced.log"), logging.StreamHandler(sys.stderr) # Use stderr to avoid stdout pollution ] ) logger = logging.getLogger(__name__) # Import our enhanced modules try: from msf_rpc_manager import MSFRPCManager, RPCConfig from msf_dual_mode import MSFDualModeHandler, ExecutionResult from msf_security import MSFSecurityManager from msf_config import get_config from msf_init import get_initializer except ImportError as e: logger.error(f"Failed to import enhanced modules: {e}") sys.stderr.write(f"Import error: {e}\n") sys.stderr.write("Make sure all required files are present in the directory.\n") sys.exit(1) # Initialize FastMCP server VERSION = "2.0.0" mcp = FastMCP("msfconsole-enhanced", version=VERSION) # Enhanced timeout configuration for execute_msf_command COMMAND_TIMEOUTS = { # Fast commands - basic status and help "help": 45, "db_status": 30, "workspace": 30, # Medium commands - information retrieval "version": 75, "show": 60, "info": 75, # Complex commands - operations and searches "search": 90, "use": 90, "exploit": 120, "generate": 90, # Default for unknown commands "default": 75 } def get_adaptive_timeout(command: str) -> int: """Get adaptive timeout based on command type""" command_lower = command.lower().strip() # Check for specific command patterns for pattern, timeout in COMMAND_TIMEOUTS.items(): if pattern in command_lower: return timeout # Default timeout return COMMAND_TIMEOUTS["default"] # Global dual-mode handler dual_mode_handler: Optional[MSFDualModeHandler] = None # Global security manager instance security_manager: Optional[MSFSecurityManager] = None async def ensure_initialized(): """Ensure the dual-mode handler is initialized.""" global dual_mode_handler, security_manager if dual_mode_handler is None: try: # Initialize Metasploit framework first logger.info("Initializing Metasploit framework...") initializer = await asyncio.wait_for(get_initializer(), timeout=30) # Initialize security manager try: from msf_security import SecurityPolicy security_manager = MSFSecurityManager(SecurityPolicy()) except ImportError: logger.warning("Security manager not available, using basic validation") security_manager = None # Configure RPC settings rpc_config = RPCConfig( host="127.0.0.1", port=55552, username="msf", password="msf123", ssl=False, timeout=30 ) dual_mode_handler = MSFDualModeHandler(rpc_config) # Initialize with timeout init_result = await asyncio.wait_for(dual_mode_handler.initialize(), timeout=45) if not init_result: raise RuntimeError("Failed to initialize Metasploit dual-mode handler") logger.info("MSF Enhanced MCP Server initialized successfully") except asyncio.TimeoutError: logger.error("Initialization timed out") raise RuntimeError("Metasploit initialization timed out - server may be slow or unavailable") except Exception as e: logger.error(f"Initialization failed: {e}") raise RuntimeError(f"Failed to initialize Metasploit integration: {e}") # MCP Tools @mcp.tool() async def get_msf_status(ctx: Context) -> str: """ Get comprehensive status of the Metasploit integration. Returns: Detailed status information including RPC connection, available modes, etc. """ await ctx.info("Getting MSF integration status") try: # Check if already initialized if dual_mode_handler is None: # Try basic initialization with timeout logger.info("Attempting basic status check without full initialization") return json.dumps({ "status": "initializing", "version": VERSION, "message": "Metasploit handler not yet fully initialized", "initialization_required": True }, indent=2) # Get status from existing handler status = dual_mode_handler.get_status() return json.dumps({ "status": "operational", "version": VERSION, "integration_details": status }, indent=2) except Exception as e: logger.error(f"Error getting status: {e}") return json.dumps({ "status": "error", "error": str(e), "version": VERSION }, indent=2) @mcp.tool() async def execute_msf_command(ctx: Context, command: str, workspace: str = "default", timeout: int = None) -> str: """ Execute a Metasploit Framework command with enhanced security and adaptive timeout. Args: command: The MSF command to execute (e.g., 'hosts', 'search ms17_010') workspace: Metasploit workspace to use (default: 'default') timeout: Command timeout in seconds (auto-detected based on command type if None) Returns: JSON formatted result with output, execution details, and metadata """ # Use adaptive timeout if not specified if timeout is None: timeout = get_adaptive_timeout(command) await ctx.info(f"Executing MSF command: {command[:50]}... (timeout: {timeout}s)") try: # Security validation if security_manager: validation_result = await security_manager.validate_command(command, {"workspace": workspace}) if not validation_result["valid"]: return json.dumps({ "success": False, "error": "Command blocked by security validation", "command": command, "security_details": validation_result }, indent=2) command = validation_result["sanitized_command"] else: # Basic validation fallback command = command.replace('\x00', '').replace('\r', '').strip() if len(command) > 1000: command = command[:1000] # Try initialization with timeout try: await asyncio.wait_for(ensure_initialized(), timeout=60) except asyncio.TimeoutError: return json.dumps({ "success": False, "error": "Metasploit initialization timeout", "message": "The Metasploit framework is taking too long to initialize. Please try again later." }, indent=2) # Execute command with context context = { "workspace": workspace, "timeout": timeout } result = await dual_mode_handler.execute_command(command, context) await ctx.info(f"Command executed successfully using {result.mode_used} mode") # Use improved parser for better output structure parsed_result = msf_parser.parse(result.output) response_data = { "success": result.success, "command": command, "execution_details": { "mode_used": result.mode_used, "execution_time": result.execution_time, "workspace": workspace }, "metadata": result.metadata or {} } # Add parsed or raw output based on parsing success if parsed_result.success and parsed_result.output_type != OutputType.RAW: response_data["parsed_output"] = { "type": parsed_result.output_type.value, "data": parsed_result.data, "metadata": parsed_result.metadata } # Keep raw output for reference when parsed successfully response_data["raw_output"] = result.output else: # Use raw output when parsing fails response_data["output"] = result.output if parsed_result.error_message: response_data["parsing_info"] = { "attempted": True, "error": parsed_result.error_message } if result.error: response_data["error"] = result.error return json.dumps(response_data, indent=2) except Exception as e: logger.error(f"Error executing command: {e}") await ctx.error(f"Command execution failed: {e}") return json.dumps({ "success": False, "error": str(e), "command": command }, indent=2) @mcp.tool() async def search_modules(ctx: Context, query: str, module_type: str = "all") -> str: """ Search for Metasploit modules with advanced filtering. Args: query: Search query (e.g., 'ms17_010', 'type:exploit platform:windows') module_type: Filter by module type (exploit, auxiliary, payload, encoder, nop, post, all) Returns: JSON formatted search results with module details """ await ctx.info(f"Searching modules: {query}") try: # Try initialization with timeout try: await asyncio.wait_for(ensure_initialized(), timeout=60) except asyncio.TimeoutError: return json.dumps({ "success": False, "error": "Metasploit initialization timeout", "message": "The Metasploit framework is taking too long to initialize. Please try again later." }, indent=2) # Build search command search_cmd = f"search {query}" if module_type != "all": search_cmd += f" type:{module_type}" result = await dual_mode_handler.execute_command(search_cmd) # Use improved parser for better formatting parsed_result = msf_parser.parse(result.output) if parsed_result.success and parsed_result.output_type == OutputType.TABLE: return json.dumps({ "success": True, "query": query, "module_type": module_type, "results_count": len(parsed_result.data), "modules": parsed_result.data, "parsing_metadata": parsed_result.metadata }, indent=2) else: # Fallback to legacy parsing or raw output parsed_modules = _parse_search_results(result.output) return json.dumps({ "success": result.success, "query": query, "module_type": module_type, "results_count": len(parsed_modules), "modules": parsed_modules, "raw_output": result.output, "parsing_error": parsed_result.error_message }, indent=2) except Exception as e: logger.error(f"Error searching modules: {e}") return json.dumps({ "success": False, "error": str(e), "query": query }, indent=2) @mcp.tool() async def manage_workspaces(ctx: Context, action: str, workspace_name: str = "") -> str: """ Manage Metasploit workspaces with full lifecycle support. Args: action: Action to perform (list, create, delete, switch, rename) workspace_name: Name of workspace for create/delete/switch/rename actions Returns: JSON formatted workspace operation results """ await ctx.info(f"Managing workspace: {action}") try: # Try initialization with timeout try: await asyncio.wait_for(ensure_initialized(), timeout=60) except asyncio.TimeoutError: return json.dumps({ "success": False, "error": "Metasploit initialization timeout", "message": "The Metasploit framework is taking too long to initialize. Please try again later." }, indent=2) # Build workspace command if action == "list": command = "workspace" elif action == "create" and workspace_name: command = f"workspace -a {workspace_name}" elif action == "delete" and workspace_name: command = f"workspace -d {workspace_name}" elif action == "switch" and workspace_name: command = f"workspace {workspace_name}" elif action == "rename" and workspace_name: # This requires current workspace name as well command = f"workspace -r {workspace_name}" else: return json.dumps({ "success": False, "error": "Invalid action or missing workspace name", "valid_actions": ["list", "create", "delete", "switch", "rename"] }, indent=2) result = await dual_mode_handler.execute_command(command) # Parse workspace list if listing workspaces = [] if action == "list" and result.success: workspaces = _parse_workspace_list(result.output) return json.dumps({ "success": result.success, "action": action, "workspace_name": workspace_name, "workspaces": workspaces, "output": result.output, "error": result.error }, indent=2) except Exception as e: logger.error(f"Error managing workspace: {e}") return json.dumps({ "success": False, "error": str(e), "action": action }, indent=2) @mcp.tool() async def database_operations(ctx: Context, operation: str, filters: str = "") -> str: """ Perform database operations with advanced querying capabilities. Args: operation: Database operation (hosts, services, vulns, creds, loot, notes, sessions) filters: Optional filters for the query (e.g., 'address:192.168.1.0/24') Returns: JSON formatted database results with parsed data """ await ctx.info(f"Database operation: {operation}") try: # Try initialization with timeout try: await asyncio.wait_for(ensure_initialized(), timeout=60) except asyncio.TimeoutError: return json.dumps({ "success": False, "error": "Metasploit initialization timeout", "message": "The Metasploit framework is taking too long to initialize. Please try again later." }, indent=2) # Build database command valid_operations = ["hosts", "services", "vulns", "creds", "loot", "notes", "sessions"] if operation not in valid_operations: return json.dumps({ "success": False, "error": f"Invalid operation: {operation}", "valid_operations": valid_operations }, indent=2) command = operation if filters: command += f" {filters}" result = await dual_mode_handler.execute_command(command) # Parse results based on operation type parsed_data = [] if result.success: if operation == "hosts": parsed_data = _parse_hosts(result.output) elif operation == "services": parsed_data = _parse_services(result.output) elif operation == "vulns": parsed_data = _parse_vulns(result.output) elif operation == "sessions": parsed_data = _parse_sessions(result.output) return json.dumps({ "success": result.success, "operation": operation, "filters": filters, "count": len(parsed_data), "data": parsed_data, "raw_output": result.output, "error": result.error }, indent=2) except Exception as e: logger.error(f"Error in database operation: {e}") return json.dumps({ "success": False, "error": str(e), "operation": operation }, indent=2) @mcp.tool() async def session_management(ctx: Context, action: str, session_id: str = "", command: str = "") -> str: """ Advanced session management with interactive capabilities. Args: action: Action to perform (list, interact, execute, kill, upgrade) session_id: Session ID for interact/execute/kill/upgrade actions command: Command to execute in session (for execute action) Returns: JSON formatted session management results """ await ctx.info(f"Session management: {action}") try: # Try initialization with timeout try: await asyncio.wait_for(ensure_initialized(), timeout=60) except asyncio.TimeoutError: return json.dumps({ "success": False, "error": "Metasploit initialization timeout", "message": "The Metasploit framework is taking too long to initialize. Please try again later." }, indent=2) # Build session command if action == "list": command_str = "sessions -l" elif action == "interact" and session_id: command_str = f"sessions -i {session_id}" elif action == "execute" and session_id and command: # Validate command for session execution if security_manager: command = security_manager._sanitize_command(command) validation_result = await security_manager.validate_command(command) if not validation_result.get("allowed", True): return json.dumps({ "success": False, "error": f"Command blocked by security validation: {validation_result.get('reason', 'Unknown')}" }, indent=2) command_str = f"sessions -c '{command}' {session_id}" elif action == "kill" and session_id: command_str = f"sessions -k {session_id}" elif action == "upgrade" and session_id: command_str = f"sessions -u {session_id}" else: return json.dumps({ "success": False, "error": "Invalid action or missing required parameters", "valid_actions": ["list", "interact", "execute", "kill", "upgrade"] }, indent=2) result = await dual_mode_handler.execute_command(command_str) # Parse session list if listing sessions = [] if action == "list" and result.success: sessions = _parse_sessions(result.output) return json.dumps({ "success": result.success, "action": action, "session_id": session_id, "sessions": sessions, "output": result.output, "error": result.error }, indent=2) except Exception as e: logger.error(f"Error in session management: {e}") return json.dumps({ "success": False, "error": str(e), "action": action }, indent=2) @mcp.tool() async def module_operations(ctx: Context, action: str, module_path: str = "", options: Dict[str, str] = None) -> str: """ Advanced module operations with configuration and execution. Args: action: Action to perform (info, use, options, set, execute, search_payloads) module_path: Path to the module (e.g., 'exploit/windows/smb/ms17_010_eternalblue') options: Dictionary of module options to set Returns: JSON formatted module operation results """ await ctx.info(f"Module operation: {action}") try: # Try initialization with timeout try: await asyncio.wait_for(ensure_initialized(), timeout=60) except asyncio.TimeoutError: return json.dumps({ "success": False, "error": "Metasploit initialization timeout", "action": action, "message": "The Metasploit framework is taking too long to initialize. Please try again later." }, indent=2) options = options or {} if action == "info" and module_path: # Use direct info command which is more reliable command = f"info {module_path}" elif action == "use" and module_path: command = f"use {module_path}" elif action == "options" and module_path: # Show options for a specific module command = f"use {module_path}; show options" elif action == "set" and module_path and options: commands = [f"use {module_path}"] for key, value in options.items(): commands.append(f"set {key} {value}") commands.append("options") # Show final options result = await dual_mode_handler.execute_batch_commands(commands) return json.dumps({ "success": all(r.success for r in result), "action": action, "module_path": module_path, "options_set": options, "results": [asdict(r) for r in result] }, indent=2) elif action == "execute" and module_path: commands = [f"use {module_path}"] for key, value in options.items(): commands.append(f"set {key} {value}") commands.append("exploit") result = await dual_mode_handler.execute_batch_commands(commands) return json.dumps({ "success": all(r.success for r in result), "action": action, "module_path": module_path, "options_used": options, "results": [asdict(r) for r in result] }, indent=2) elif action == "search_payloads" and module_path: command = f"use {module_path}; show payloads" else: return json.dumps({ "success": False, "error": "Invalid action or missing required parameters", "valid_actions": ["info", "use", "options", "set", "execute", "search_payloads"] }, indent=2) result = await dual_mode_handler.execute_command(command) return json.dumps({ "success": result.success, "action": action, "module_path": module_path, "output": result.output, "error": result.error, "execution_details": { "mode_used": result.mode_used, "execution_time": result.execution_time } }, indent=2) except Exception as e: logger.error(f"Error in module operation: {e}") return json.dumps({ "success": False, "error": str(e), "action": action }, indent=2) @mcp.tool() async def payload_generation(ctx: Context, payload_type: str, options: Dict[str, str] = None, output_format: str = "raw") -> str: """ Generate payloads using msfvenom with comprehensive options support. Args: payload_type: Type of payload (e.g., 'windows/meterpreter/reverse_tcp') options: Dictionary of payload options (LHOST, LPORT, etc.) output_format: Output format (raw, exe, elf, macho, etc.) Returns: JSON formatted payload generation results """ await ctx.info(f"Generating payload: {payload_type}") try: # Try initialization with timeout try: await asyncio.wait_for(ensure_initialized(), timeout=60) except asyncio.TimeoutError: return json.dumps({ "success": False, "error": "Metasploit initialization timeout", "payload_type": payload_type, "message": "The Metasploit framework is taking too long to initialize. Please try again later." }, indent=2) options = options or {} # Try different approaches for payload generation approaches = [ # Approach 1: Use generate command in msfconsole (if available) { "method": "generate_command", "command": f"use payload/{payload_type}", "description": "Using MSF console generate command" }, # Approach 2: Use external msfvenom via subprocess { "method": "external_msfvenom", "command": None, # Will be handled separately "description": "Using external msfvenom command" } ] result = None final_approach = None # Try approach 1: MSF console generate command try: # Set up payload module setup_commands = [ f"use payload/{payload_type}" ] # Set payload options for key, value in options.items(): setup_commands.append(f"set {key} {value}") # Generate payload setup_commands.append("generate") # Execute as batch batch_result = await dual_mode_handler.execute_batch_commands(setup_commands) if batch_result and any(r.success for r in batch_result): # Find the generate result generate_result = None for r in batch_result: if "generate" in r.output.lower() or len(r.output) > 100: # Payload output is usually long generate_result = r break if generate_result: result = generate_result final_approach = approaches[0] except Exception as e: logger.warning(f"MSF console generate failed: {e}") # Try approach 2: External msfvenom if console approach failed if not result: try: import subprocess # Build msfvenom command for external execution command_parts = ["msfvenom", "-p", payload_type] # Add options for key, value in options.items(): command_parts.append(f"{key}={value}") # Add output format if output_format != "raw": command_parts.extend(["-f", output_format]) # Execute msfvenom externally msfvenom_command = " ".join(command_parts) proc_result = subprocess.run( command_parts, capture_output=True, text=True, timeout=60 ) # Create result-like object class ExternalResult: def __init__(self, success, output, error): self.success = success self.output = output self.error = error self.execution_time = 0 self.mode_used = "external_subprocess" result = ExternalResult( success=proc_result.returncode == 0, output=proc_result.stdout, error=proc_result.stderr ) final_approach = approaches[1] except Exception as e: logger.error(f"External msfvenom failed: {e}") return json.dumps({ "success": False, "error": f"All payload generation methods failed. Console error: MSF generate not available. External error: {str(e)}", "payload_type": payload_type, "approaches_tried": [a["description"] for a in approaches] }, indent=2) return json.dumps({ "success": result.success if result else False, "payload_type": payload_type, "options": options, "output_format": output_format, "method_used": final_approach["description"] if final_approach else "Unknown", "output": result.output if result else "", "error": result.error if result else "No successful generation method", "execution_time": getattr(result, 'execution_time', 0) if result else 0 }, indent=2) except Exception as e: logger.error(f"Error generating payload: {e}") return json.dumps({ "success": False, "error": str(e), "payload_type": payload_type }, indent=2) @mcp.tool() async def resource_script_execution(ctx: Context, script_commands: List[str], workspace: str = "default") -> str: """ Execute a series of commands as a resource script for automation. Args: script_commands: List of MSF commands to execute in sequence workspace: Workspace to execute in Returns: JSON formatted batch execution results """ await ctx.info(f"Executing resource script with {len(script_commands)} commands") try: # Try initialization with timeout try: await asyncio.wait_for(ensure_initialized(), timeout=60) except asyncio.TimeoutError: return json.dumps({ "success": False, "error": "Metasploit initialization timeout", "message": "The Metasploit framework is taking too long to initialize. Please try again later." }, indent=2) # Validate all commands validated_commands = [] for cmd in script_commands: if security_manager: sanitized = security_manager._sanitize_command(cmd) validation_result = await security_manager.validate_command(sanitized) if validation_result.get("allowed", True): validated_commands.append(sanitized) else: return json.dumps({ "success": False, "error": f"Command blocked by security validation: {cmd}", "reason": validation_result.get("reason", "Unknown"), "validated_commands": validated_commands }, indent=2) else: # Basic sanitization if security manager not available sanitized = cmd.strip() validated_commands.append(sanitized) # Execute as batch context = {"workspace": workspace, "batch_mode": True} results = await dual_mode_handler.execute_batch_commands(validated_commands, context) return json.dumps({ "success": all(r.success for r in results), "commands_executed": len(validated_commands), "workspace": workspace, "results": [asdict(r) for r in results], "total_execution_time": sum(r.execution_time for r in results) }, indent=2) except Exception as e: logger.error(f"Error executing resource script: {e}") return json.dumps({ "success": False, "error": str(e), "commands": script_commands }, indent=2) # Import improved parser from improved_msf_parser import ImprovedMSFParser, OutputType # Initialize global parser msf_parser = ImprovedMSFParser() # Legacy parsing helper functions (keeping for compatibility) def _parse_search_results(output: str) -> List[Dict[str, str]]: """Parse module search results.""" modules = [] lines = output.split('\n') for line in lines: line = line.strip() if not line or line.startswith('#') or '===' in line or 'Matching Modules' in line: continue # Skip header lines and separators if line.startswith('Name') or line.startswith('----') or line.startswith('='): continue # Try to parse module line - typical format: # module_name disclosure_date rank description parts = line.split(None, 3) # Split into max 4 parts if len(parts) >= 1: modules.append({ "name": parts[0], "disclosure_date": parts[1] if len(parts) > 1 else "", "rank": parts[2] if len(parts) > 2 else "", "description": parts[3] if len(parts) > 3 else "" }) return modules def _parse_workspace_list(output: str) -> List[Dict[str, str]]: """Parse workspace list output.""" workspaces = [] lines = output.split('\n') for line in lines: line = line.strip() # Skip empty lines and headers if not line or line == 'Workspaces' or line.startswith('='): continue # Current workspace is marked with * current = line.startswith('*') name = line.lstrip('* ').strip() if name: workspaces.append({ "name": name, "current": current }) return workspaces def _parse_hosts(output: str) -> List[Dict[str, str]]: """Parse hosts command output.""" hosts = [] lines = output.split('\n') # Find header line header_idx = -1 for i, line in enumerate(lines): if 'address' in line.lower() and 'name' in line.lower(): header_idx = i break if header_idx == -1: return hosts # Parse data lines for line in lines[header_idx + 2:]: # Skip header and separator line = line.strip() if line and not line.startswith('='): parts = line.split(None, 6) if len(parts) >= 2: host = { "address": parts[0], "mac": parts[1] if len(parts) > 1 else "", "name": parts[2] if len(parts) > 2 else "", "os_family": parts[3] if len(parts) > 3 else "", "os_flavor": parts[4] if len(parts) > 4 else "", "os_sp": parts[5] if len(parts) > 5 else "", "purpose": parts[6] if len(parts) > 6 else "", "info": parts[7] if len(parts) > 7 else "" } hosts.append(host) return hosts def _parse_services(output: str) -> List[Dict[str, str]]: """Parse services command output.""" services = [] lines = output.split('\n') # Find header line header_idx = -1 for i, line in enumerate(lines): if 'port' in line.lower() and 'proto' in line.lower(): header_idx = i break if header_idx == -1: return services # Parse data lines for line in lines[header_idx + 2:]: # Skip header and separator line = line.strip() if line and not line.startswith('='): parts = line.split(None, 5) if len(parts) >= 4: service = { "host": parts[0], "port": parts[1], "proto": parts[2], "name": parts[3], "state": parts[4] if len(parts) > 4 else "", "info": parts[5] if len(parts) > 5 else "" } services.append(service) return services def _parse_vulns(output: str) -> List[Dict[str, str]]: """Parse vulnerabilities command output.""" vulns = [] lines = output.split('\n') # Find header line header_idx = -1 for i, line in enumerate(lines): if 'host' in line.lower() and 'name' in line.lower(): header_idx = i break if header_idx == -1: return vulns # Parse data lines for line in lines[header_idx + 2:]: # Skip header and separator line = line.strip() if line and not line.startswith('='): parts = line.split(None, 3) if len(parts) >= 3: vuln = { "host": parts[0], "name": parts[1], "refs": parts[2], "info": parts[3] if len(parts) > 3 else "" } vulns.append(vuln) return vulns def _parse_sessions(output: str) -> List[Dict[str, str]]: """Parse sessions command output.""" sessions = [] lines = output.split('\n') # Find header line header_idx = -1 for i, line in enumerate(lines): if 'id' in line.lower() and 'type' in line.lower(): header_idx = i break if header_idx == -1: return sessions # Parse data lines for line in lines[header_idx + 2:]: # Skip header and separator line = line.strip() if line and not line.startswith('-'): parts = line.split(None, 4) if len(parts) >= 3: session = { "id": parts[0], "name": parts[1], "type": parts[2], "information": parts[3] if len(parts) > 3 else "", "connection": parts[4] if len(parts) > 4 else "" } sessions.append(session) return sessions # Startup and cleanup handlers will be handled in main() if __name__ == "__main__": logger.info(f"Enhanced Metasploit MCP Server v{VERSION} starting...") try: mcp.run(transport="stdio") except KeyboardInterrupt: logger.info("Server stopped by user") except Exception as e: logger.error(f"Server error: {e}") sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/LYFTIUM-INC/msfconsole-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server