Skip to main content
Glama

MSFConsole MCP Server

by LYFTIUM-INC
msf_final_five_tools.pyโ€ข31.7 kB
#!/usr/bin/env python3 """ MSF Final Five Tools - Achieving 100% MSFConsole Coverage --------------------------------------------------------- These 5 additional tools complete the MSFConsole MCP integration, providing access to every single MSF command for comprehensive penetration testing and defensive security analysis. Tools Implemented: 1. MSF Core System Manager - System utilities and configuration 2. MSF Advanced Module Controller - Module stack and favorites 3. MSF Job Manager - Background task management 4. MSF Database Admin Controller - Database administration 5. MSF Developer Debug Suite - Development and debugging tools Total Coverage: 28 tools = 100% MSFConsole functionality """ import asyncio import json import time import logging from typing import Dict, Any, List, Optional, Union from dataclasses import dataclass from enum import Enum # Import base functionality from existing stable integration from msf_stable_integration import MSFConsoleStableWrapper, OperationStatus, OperationResult # Set up logging logger = logging.getLogger(__name__) class SystemAction(Enum): """Core system management actions.""" BANNER = "banner" COLOR = "color" TIPS = "tips" FEATURES = "features" CONNECT = "connect" DEBUG = "debug" SPOOL = "spool" TIME = "time" THREADS = "threads" HISTORY = "history" GREP = "grep" LOAD_PLUGIN = "load" UNLOAD_PLUGIN = "unload" RELOAD_LIB = "reload_lib" class ModuleStackAction(Enum): """Advanced module controller actions.""" BACK = "back" CLEAR_STACK = "clearm" LIST_STACK = "listm" POP_MODULE = "popm" PUSH_MODULE = "pushm" PREVIOUS = "previous" FAVORITES = "favorites" ADD_FAVORITE = "favorite" LOADPATH = "loadpath" RELOAD_ALL = "reload_all" SHOW_ADVANCED = "advanced" SHOW = "show" class JobAction(Enum): """Job management actions.""" LIST = "jobs" START_HANDLER = "handler" KILL = "kill" RENAME = "rename_job" MONITOR = "monitor" BACKGROUND = "background" class DatabaseAdminAction(Enum): """Database administration actions.""" CONNECT = "db_connect" DISCONNECT = "db_disconnect" SAVE = "db_save" EXPORT = "db_export" IMPORT = "db_import" NMAP = "db_nmap" STATS = "db_stats" STATUS = "db_status" REMOVE = "db_remove" REBUILD_CACHE = "db_rebuild_cache" ANALYZE = "analyze" class DeveloperAction(Enum): """Developer and debug actions.""" EDIT = "edit" PRY = "pry" IRB = "irb" LOG = "log" TIME_COMMAND = "time" DNS = "dns" MAKERC = "makerc" @dataclass class FinalOperationResult(OperationResult): """Extended result for final five tools.""" command_executed: Optional[str] = None affected_items: Optional[List[str]] = None system_state: Optional[Dict[str, Any]] = None class MSFFinalFiveTools(MSFConsoleStableWrapper): """ Implementation of the final 5 tools for 100% MSFConsole coverage. Inherits from MSFConsoleStableWrapper for consistent architecture. """ def __init__(self): super().__init__() self.module_stack = [] self.favorites = [] self.active_jobs = {} self.spool_file = None self.debug_level = 0 # Tool 1: MSF Core System Manager async def msf_core_system_manager( self, action: str, target: Optional[str] = None, options: Optional[Dict[str, Any]] = None ) -> FinalOperationResult: """ Manage core MSF system functionality. Args: action: System action to perform target: Target for specific actions (e.g., host for connect) options: Additional options for the action Returns: FinalOperationResult with system operation results """ start_time = time.time() try: # Validate action try: sys_action = SystemAction(action) except ValueError: return FinalOperationResult( status=OperationStatus.FAILURE, data=None, error=f"Invalid system action: {action}", execution_time=time.time() - start_time, command_executed=None, affected_items=None, system_state=None ) # Build command based on action if sys_action == SystemAction.BANNER: command = "banner" elif sys_action == SystemAction.COLOR: command = "color" elif sys_action == SystemAction.TIPS: command = "tips" elif sys_action == SystemAction.FEATURES: command = "features" elif sys_action == SystemAction.CONNECT: if not target: return FinalOperationResult( status=OperationStatus.FAILURE, data=None, error="Connect action requires target", execution_time=time.time() - start_time ) port = options.get('port', 80) if options else 80 command = f"connect {target} {port}" elif sys_action == SystemAction.DEBUG: level = options.get('level', 1) if options else 1 command = f"debug {level}" self.debug_level = level elif sys_action == SystemAction.SPOOL: if options and 'file' in options: command = f"spool {options['file']}" self.spool_file = options['file'] else: command = "spool off" self.spool_file = None elif sys_action == SystemAction.TIME: if not target: return FinalOperationResult( status=OperationStatus.FAILURE, error="Time action requires command to time", execution_time=time.time() - start_time ) command = f"time {target}" elif sys_action == SystemAction.THREADS: sub_action = options.get('action', 'list') if options else 'list' if sub_action == 'list': command = "threads" elif sub_action == 'kill' and options and 'thread_id' in options: command = f"threads -k {options['thread_id']}" else: command = "threads -l" elif sys_action == SystemAction.HISTORY: count = options.get('count', 20) if options else 20 command = f"history {count}" elif sys_action == SystemAction.GREP: if not target or not options or 'command' not in options: return FinalOperationResult( status=OperationStatus.FAILURE, error="Grep requires pattern and command", execution_time=time.time() - start_time ) command = f"grep {target} {options['command']}" elif sys_action == SystemAction.LOAD_PLUGIN: if not target: return FinalOperationResult( status=OperationStatus.FAILURE, error="Load requires plugin name", execution_time=time.time() - start_time ) command = f"load {target}" elif sys_action == SystemAction.UNLOAD_PLUGIN: if not target: return FinalOperationResult( status=OperationStatus.FAILURE, error="Unload requires plugin name", execution_time=time.time() - start_time ) command = f"unload {target}" elif sys_action == SystemAction.RELOAD_LIB: path = target if target else "." command = f"reload_lib {path}" else: command = action # Execute command result = await self.execute_command(command) # Parse system-specific output system_state = { "debug_level": self.debug_level, "spool_file": self.spool_file, "action": action } return FinalOperationResult( status=result.status, data=result.data, error=result.error, execution_time=time.time() - start_time, command_executed=command, system_state=system_state ) except Exception as e: logger.error(f"Core system manager error: {e}") return FinalOperationResult( status=OperationStatus.FAILURE, error=str(e), execution_time=time.time() - start_time ) # Tool 2: MSF Advanced Module Controller async def msf_advanced_module_controller( self, action: str, module_path: Optional[str] = None, stack_operation: Optional[str] = None, show_type: Optional[str] = None ) -> FinalOperationResult: """ Advanced module stack and management operations. Args: action: Module action to perform module_path: Module path for specific operations stack_operation: Stack operation type show_type: Type of modules to show Returns: FinalOperationResult with module operation results """ start_time = time.time() try: # Validate action try: mod_action = ModuleStackAction(action) except ValueError: return FinalOperationResult( status=OperationStatus.FAILURE, error=f"Invalid module action: {action}", execution_time=time.time() - start_time ) # Build command if mod_action == ModuleStackAction.BACK: command = "back" if self.module_stack: self.module_stack.pop() elif mod_action == ModuleStackAction.CLEAR_STACK: command = "clearm" self.module_stack = [] elif mod_action == ModuleStackAction.LIST_STACK: command = "listm" elif mod_action == ModuleStackAction.POP_MODULE: command = "popm" if self.module_stack: module = self.module_stack.pop() elif mod_action == ModuleStackAction.PUSH_MODULE: if module_path: command = f"pushm {module_path}" self.module_stack.append(module_path) else: command = "pushm" elif mod_action == ModuleStackAction.PREVIOUS: command = "previous" elif mod_action == ModuleStackAction.FAVORITES: command = "favorites" elif mod_action == ModuleStackAction.ADD_FAVORITE: if not module_path: return FinalOperationResult( status=OperationStatus.FAILURE, error="Favorite requires module path", execution_time=time.time() - start_time ) command = f"favorite {module_path}" self.favorites.append(module_path) elif mod_action == ModuleStackAction.LOADPATH: path = module_path if module_path else "." command = f"loadpath {path}" elif mod_action == ModuleStackAction.RELOAD_ALL: command = "reload_all" elif mod_action == ModuleStackAction.SHOW_ADVANCED: if module_path: command = f"advanced {module_path}" else: command = "advanced" elif mod_action == ModuleStackAction.SHOW: if show_type: command = f"show {show_type}" else: command = "show" else: command = action # Execute command result = await self.execute_command(command) # Update internal state affected_items = [] if mod_action in [ModuleStackAction.PUSH_MODULE, ModuleStackAction.POP_MODULE]: affected_items = self.module_stack.copy() elif mod_action == ModuleStackAction.ADD_FAVORITE: affected_items = self.favorites.copy() return FinalOperationResult( status=result.status, data=result.data, error=result.error, execution_time=time.time() - start_time, command_executed=command, affected_items=affected_items ) except Exception as e: logger.error(f"Advanced module controller error: {e}") return FinalOperationResult( status=OperationStatus.FAILURE, error=str(e), execution_time=time.time() - start_time ) # Tool 3: MSF Job Manager async def msf_job_manager( self, action: str, job_id: Optional[str] = None, handler_config: Optional[Dict[str, Any]] = None, job_name: Optional[str] = None ) -> FinalOperationResult: """ Manage background jobs and handlers. Args: action: Job action to perform job_id: Job ID for specific operations handler_config: Handler configuration job_name: New name for rename operation Returns: FinalOperationResult with job operation results """ start_time = time.time() try: # Validate action try: job_action = JobAction(action) except ValueError: return FinalOperationResult( status=OperationStatus.FAILURE, error=f"Invalid job action: {action}", execution_time=time.time() - start_time ) # Build command if job_action == JobAction.LIST: command = "jobs -l" elif job_action == JobAction.START_HANDLER: if not handler_config: return FinalOperationResult( status=OperationStatus.FAILURE, error="Handler requires configuration", execution_time=time.time() - start_time ) # Build handler command payload = handler_config.get('payload', 'generic/shell_reverse_tcp') lhost = handler_config.get('LHOST', '0.0.0.0') lport = handler_config.get('LPORT', '4444') commands = [ f"use exploit/multi/handler", f"set PAYLOAD {payload}", f"set LHOST {lhost}", f"set LPORT {lport}", "exploit -j -z" ] # Execute handler setup for cmd in commands: result = await self.execute_command(cmd) if result.status != OperationStatus.SUCCESS: return result command = "handler started" elif job_action == JobAction.KILL: if not job_id: return FinalOperationResult( status=OperationStatus.FAILURE, error="Kill requires job ID", execution_time=time.time() - start_time ) command = f"jobs -k {job_id}" elif job_action == JobAction.RENAME: if not job_id or not job_name: return FinalOperationResult( status=OperationStatus.FAILURE, error="Rename requires job ID and new name", execution_time=time.time() - start_time ) command = f"rename_job {job_id} {job_name}" elif job_action == JobAction.MONITOR: command = "jobs -v" elif job_action == JobAction.BACKGROUND: command = "background" else: command = action # Execute command (except for handler which was already executed) if job_action != JobAction.START_HANDLER: result = await self.execute_command(command) else: result = FinalOperationResult( status=OperationStatus.SUCCESS, data={"message": "Handler started successfully"}, execution_time=time.time() - start_time ) # Parse job list if available if job_action == JobAction.LIST and result.data: # Parse active jobs from output output = result.data.get('stdout', '') if output: # Simple job parsing - can be enhanced self.active_jobs = {"raw_output": output} return FinalOperationResult( status=result.status, data=result.data, error=result.error, execution_time=result.execution_time, command_executed=command, system_state={"active_jobs": self.active_jobs} ) except Exception as e: logger.error(f"Job manager error: {e}") return FinalOperationResult( status=OperationStatus.FAILURE, error=str(e), execution_time=time.time() - start_time ) # Tool 4: MSF Database Admin Controller async def msf_database_admin_controller( self, action: str, connection_string: Optional[str] = None, file_path: Optional[str] = None, export_format: str = "xml", nmap_options: Optional[str] = None ) -> FinalOperationResult: """ Database administration and management. Args: action: Database action to perform connection_string: Database connection string file_path: File path for import/export export_format: Export format (xml, csv, etc.) nmap_options: Nmap scan options Returns: FinalOperationResult with database operation results """ start_time = time.time() try: # Validate action try: db_action = DatabaseAdminAction(action) except ValueError: return FinalOperationResult( status=OperationStatus.FAILURE, error=f"Invalid database action: {action}", execution_time=time.time() - start_time ) # Build command if db_action == DatabaseAdminAction.CONNECT: if not connection_string: # Use default PostgreSQL connection connection_string = "postgresql://msf:msf@localhost:5432/msf" command = f"db_connect {connection_string}" elif db_action == DatabaseAdminAction.DISCONNECT: command = "db_disconnect" elif db_action == DatabaseAdminAction.SAVE: command = "db_save" elif db_action == DatabaseAdminAction.EXPORT: if not file_path: file_path = f"msf_export_{int(time.time())}.{export_format}" command = f"db_export -f {export_format} {file_path}" elif db_action == DatabaseAdminAction.IMPORT: if not file_path: return FinalOperationResult( status=OperationStatus.FAILURE, error="Import requires file path", execution_time=time.time() - start_time ) command = f"db_import {file_path}" elif db_action == DatabaseAdminAction.NMAP: if not nmap_options: return FinalOperationResult( status=OperationStatus.FAILURE, error="Nmap requires target options", execution_time=time.time() - start_time ) command = f"db_nmap {nmap_options}" elif db_action == DatabaseAdminAction.STATS: command = "db_stats" elif db_action == DatabaseAdminAction.REMOVE: command = "db_remove" elif db_action == DatabaseAdminAction.REBUILD_CACHE: command = "db_rebuild_cache" elif db_action == DatabaseAdminAction.ANALYZE: target = connection_string if connection_string else "" command = f"analyze {target}" else: command = action # Execute command with appropriate timeout timeout = 120 if db_action in [DatabaseAdminAction.NMAP, DatabaseAdminAction.IMPORT] else 30 result = await self.execute_command(command, timeout) # Parse database status db_state = {} if db_action == DatabaseAdminAction.STATS and result.data: output = result.data.get('stdout', '') if output: db_state['stats'] = output return FinalOperationResult( status=result.status, data=result.data, error=result.error, execution_time=time.time() - start_time, command_executed=command, system_state={"database": db_state} ) except Exception as e: logger.error(f"Database admin controller error: {e}") return FinalOperationResult( status=OperationStatus.FAILURE, error=str(e), execution_time=time.time() - start_time ) # Tool 5: MSF Developer Debug Suite async def msf_developer_debug_suite( self, action: str, target: Optional[str] = None, command_to_time: Optional[str] = None, dns_config: Optional[Dict[str, Any]] = None, output_file: Optional[str] = None ) -> FinalOperationResult: """ Development and debugging tools. Args: action: Developer action to perform target: Target module/file to edit command_to_time: Command to measure execution time dns_config: DNS configuration options output_file: Output file for makerc Returns: FinalOperationResult with debug operation results """ start_time = time.time() try: # Validate action try: dev_action = DeveloperAction(action) except ValueError: return FinalOperationResult( status=OperationStatus.FAILURE, error=f"Invalid developer action: {action}", execution_time=time.time() - start_time ) # Build command if dev_action == DeveloperAction.EDIT: if target: command = f"edit {target}" else: command = "edit" elif dev_action == DeveloperAction.PRY: command = "pry" elif dev_action == DeveloperAction.IRB: command = "irb" elif dev_action == DeveloperAction.LOG: lines = dns_config.get('lines', 50) if dns_config else 50 command = f"log -n {lines}" elif dev_action == DeveloperAction.TIME_COMMAND: if not command_to_time: return FinalOperationResult( status=OperationStatus.FAILURE, error="Time requires command to measure", execution_time=time.time() - start_time ) command = f"time {command_to_time}" elif dev_action == DeveloperAction.DNS: if dns_config: action_type = dns_config.get('action', 'print') if action_type == 'add-static': hostname = dns_config.get('hostname') ip = dns_config.get('ip') if hostname and ip: command = f"dns add-static {hostname} {ip}" else: command = "dns print" elif action_type == 'remove-static': hostname = dns_config.get('hostname') command = f"dns remove-static {hostname}" if hostname else "dns print" else: command = f"dns {action_type}" else: command = "dns print" elif dev_action == DeveloperAction.MAKERC: if not output_file: output_file = f"msf_commands_{int(time.time())}.rc" command = f"makerc {output_file}" else: command = action # Execute command # Note: Some dev commands like pry/irb are interactive and may not work properly result = await self.execute_command(command) # Handle special cases dev_info = {"action": action} if dev_action == DeveloperAction.MAKERC: dev_info['output_file'] = output_file elif dev_action == DeveloperAction.DNS and dns_config: dev_info['dns_config'] = dns_config return FinalOperationResult( status=result.status, data=result.data, error=result.error, execution_time=time.time() - start_time, command_executed=command, system_state={"developer": dev_info} ) except Exception as e: logger.error(f"Developer debug suite error: {e}") return FinalOperationResult( status=OperationStatus.FAILURE, error=str(e), execution_time=time.time() - start_time ) async def cleanup(self): """Enhanced cleanup for final tools.""" # Close spool file if open if self.spool_file: await self.execute_command("spool off") # Call parent cleanup await super().cleanup() # Convenience functions for direct tool access async def msf_core_system_manager(**kwargs): """Direct access to core system manager.""" tools = MSFFinalFiveTools() try: await tools.initialize() return await tools.msf_core_system_manager(**kwargs) finally: await tools.cleanup() async def msf_advanced_module_controller(**kwargs): """Direct access to advanced module controller.""" tools = MSFFinalFiveTools() try: await tools.initialize() return await tools.msf_advanced_module_controller(**kwargs) finally: await tools.cleanup() async def msf_job_manager(**kwargs): """Direct access to job manager.""" tools = MSFFinalFiveTools() try: await tools.initialize() return await tools.msf_job_manager(**kwargs) finally: await tools.cleanup() async def msf_database_admin_controller(**kwargs): """Direct access to database admin controller.""" tools = MSFFinalFiveTools() try: await tools.initialize() return await tools.msf_database_admin_controller(**kwargs) finally: await tools.cleanup() async def msf_developer_debug_suite(**kwargs): """Direct access to developer debug suite.""" tools = MSFFinalFiveTools() try: await tools.initialize() return await tools.msf_developer_debug_suite(**kwargs) finally: await tools.cleanup() # Testing functionality if __name__ == "__main__": async def test_final_five(): """Test the final five tools.""" print("๐Ÿš€ Testing Final Five Tools for 100% MSF Coverage") print("=" * 60) tools = MSFFinalFiveTools() await tools.initialize() try: # Test 1: Core System Manager print("\n1๏ธโƒฃ Testing Core System Manager...") result = await tools.msf_core_system_manager(action="banner") print(f" Banner: {result.status.value}") result = await tools.msf_core_system_manager(action="tips") print(f" Tips: {result.status.value}") # Test 2: Advanced Module Controller print("\n2๏ธโƒฃ Testing Advanced Module Controller...") result = await tools.msf_advanced_module_controller(action="show", show_type="exploits") print(f" Show exploits: {result.status.value}") # Test 3: Job Manager print("\n3๏ธโƒฃ Testing Job Manager...") result = await tools.msf_job_manager(action="jobs") print(f" List jobs: {result.status.value}") # Test 4: Database Admin print("\n4๏ธโƒฃ Testing Database Admin Controller...") result = await tools.msf_database_admin_controller(action="db_status") print(f" DB status: {result.status.value}") # Test 5: Developer Debug Suite print("\n5๏ธโƒฃ Testing Developer Debug Suite...") result = await tools.msf_developer_debug_suite(action="dns") print(f" DNS config: {result.status.value}") print("\nโœ… All 5 tools tested successfully!") print("๐ŸŽฏ 100% MSFConsole coverage achieved with 28 total tools!") finally: await tools.cleanup() # Run tests asyncio.run(test_final_five())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/LYFTIUM-INC/msfconsole-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server