Skip to main content
Glama

MSFConsole MCP Server

msf_extended_tools.py116 kB
#!/usr/bin/env python3 """ MSF Console MCP Extended Tools Implementation ============================================ Implements 15 additional MCP tools to achieve 95% MSFConsole coverage. Built on top of the existing MSFConsoleStableWrapper foundation. """ import asyncio import json import logging import time from typing import Dict, Any, List, Optional, Union from dataclasses import dataclass, field from enum import Enum import re # Import existing stable wrapper from msf_stable_integration import ( MSFConsoleStableWrapper, OperationStatus, OperationResult ) # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("msf_extended_tools") # Extended result for additional metadata @dataclass class ExtendedOperationResult(OperationResult): """Extended result with additional metadata""" metadata: Dict[str, Any] = field(default_factory=dict) suggestions: List[str] = field(default_factory=list) pagination: Dict[str, Any] = field(default_factory=dict) class ModuleAction(Enum): """Module manager actions""" USE = "use" INFO = "info" OPTIONS = "options" SET = "set" UNSET = "unset" CHECK = "check" RUN = "run" EXPLOIT = "exploit" BACK = "back" RELOAD = "reload_all" class SessionAction(Enum): """Session interaction actions""" LIST = "list" INTERACT = "interact" EXECUTE = "execute" UPGRADE = "upgrade" KILL = "kill" BACKGROUND = "background" DETACH = "detach" class DatabaseAction(Enum): """Database query actions""" LIST = "list" ADD = "add" UPDATE = "update" DELETE = "delete" SEARCH = "search" EXPORT = "export" class MSFExtendedTools(MSFConsoleStableWrapper): """Extended MSF tools implementation""" def __init__(self): super().__init__() self.module_context = None # Current module context self.session_context = {} # Active session contexts self.automated_workflows = {} # Automation workflows # ==================== TOOL 1: Module Manager ==================== async def msf_module_manager( self, action: str, module_path: str = None, options: Dict[str, str] = None, timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Comprehensive module management tool. Actions: use, info, options, set, unset, check, run, exploit, back, reload """ start_time = time.time() try: # Validate action try: module_action = ModuleAction(action.lower()) except ValueError: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Invalid action: {action}. Valid actions: {[a.value for a in ModuleAction]}" ) # Execute action if module_action == ModuleAction.USE: if not module_path: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Module path required for 'use' action" ) result = await self.execute_command(f"use {module_path}", timeout) if result.status == OperationStatus.SUCCESS: self.module_context = module_path return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"module": module_path, "loaded": True}, execution_time=result.execution_time, metadata={"context": self.module_context} ) elif module_action == ModuleAction.INFO: cmd = f"info {module_path}" if module_path else "info" result = await self.execute_command(cmd, timeout) if result.status == OperationStatus.SUCCESS: # Parse module info info = self._parse_module_info(result.data.get("stdout", "")) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data=info, execution_time=result.execution_time ) elif module_action == ModuleAction.OPTIONS: result = await self.execute_command("show options", timeout) if result.status == OperationStatus.SUCCESS: # Parse options options_data = self._parse_options(result.data.get("stdout", "")) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data=options_data, execution_time=result.execution_time ) elif module_action == ModuleAction.SET: if not options: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Options required for 'set' action" ) # Set multiple options success_count = 0 errors = [] for key, value in options.items(): result = await self.execute_command(f"set {key} {value}", timeout) if result.status == OperationStatus.SUCCESS: success_count += 1 else: errors.append(f"{key}: {result.error}") return ExtendedOperationResult( status=OperationStatus.SUCCESS if not errors else OperationStatus.PARTIAL, data={"set_count": success_count, "errors": errors}, execution_time=time.time() - start_time ) elif module_action in [ModuleAction.RUN, ModuleAction.EXPLOIT]: # Check if module is loaded if not self.module_context: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="No module loaded. Use 'use' action first" ) # Use longer timeout for exploitation exploit_timeout = timeout or 120.0 result = await self.execute_command(action, exploit_timeout) if result.status == OperationStatus.SUCCESS: # Check for session creation session_info = self._extract_session_info(result.data.get("stdout", "")) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"executed": True, "session": session_info}, execution_time=result.execution_time, metadata={"module": self.module_context} ) elif module_action == ModuleAction.CHECK: result = await self.execute_command("check", timeout) if result.status == OperationStatus.SUCCESS: # Parse check result check_result = self._parse_check_result(result.data.get("stdout", "")) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data=check_result, execution_time=result.execution_time ) elif module_action == ModuleAction.BACK: result = await self.execute_command("back", timeout) if result.status == OperationStatus.SUCCESS: self.module_context = None return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"context": "msf"}, execution_time=result.execution_time ) elif module_action == ModuleAction.RELOAD: result = await self.execute_command("reload_all", timeout or 60.0) return ExtendedOperationResult( status=result.status, data={"reloaded": result.status == OperationStatus.SUCCESS}, execution_time=result.execution_time ) # Fallback for unhandled actions return ExtendedOperationResult( status=result.status, data=result.data, execution_time=result.execution_time, error=result.error ) except Exception as e: logger.error(f"Module manager error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=str(e) ) # ==================== TOOL 2: Session Interact ==================== async def msf_session_interact( self, session_id: int = None, action: str = "list", command: str = None, timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Advanced session interaction tool. Actions: list, interact, execute, upgrade, kill, background, detach """ start_time = time.time() try: # Validate action try: session_action = SessionAction(action.lower()) except ValueError: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Invalid action: {action}. Valid actions: {[a.value for a in SessionAction]}" ) if session_action == SessionAction.LIST: result = await self.execute_command("sessions -l", timeout) if result.status == OperationStatus.SUCCESS: sessions = self._parse_sessions(result.data.get("stdout", "")) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"sessions": sessions, "count": len(sessions)}, execution_time=result.execution_time ) elif session_action == SessionAction.INTERACT: if session_id is None: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Session ID required for interact action" ) # Note: Interactive sessions require special handling # For now, we'll return session info result = await self.execute_command(f"sessions -i {session_id}", timeout) if result.status == OperationStatus.SUCCESS: self.session_context[session_id] = True return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"session_id": session_id, "interactive": True}, execution_time=result.execution_time, metadata={"note": "Use execute action to run commands"} ) elif session_action == SessionAction.EXECUTE: if session_id is None or command is None: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Session ID and command required for execute action" ) # Execute command in session cmd = f"sessions -C '{command}' -i {session_id}" result = await self.execute_command(cmd, timeout) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "session_id": session_id, "command": command, "output": result.data.get("stdout", "") }, execution_time=result.execution_time ) elif session_action == SessionAction.UPGRADE: if session_id is None: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Session ID required for upgrade action" ) # Upgrade shell to meterpreter result = await self.execute_command(f"sessions -u {session_id}", timeout or 60.0) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"session_id": session_id, "upgraded": True}, execution_time=result.execution_time ) elif session_action == SessionAction.KILL: if session_id is None: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Session ID required for kill action" ) result = await self.execute_command(f"sessions -k {session_id}", timeout) if result.status == OperationStatus.SUCCESS: if session_id in self.session_context: del self.session_context[session_id] return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"session_id": session_id, "killed": True}, execution_time=result.execution_time ) # Return result for other actions return ExtendedOperationResult( status=result.status, data=result.data, execution_time=result.execution_time, error=result.error ) except Exception as e: logger.error(f"Session interact error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=str(e) ) # ==================== TOOL 3: Database Query ==================== async def msf_database_query( self, table: str, action: str = "list", filters: Dict[str, Any] = None, data: Dict[str, Any] = None, timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Database operations for hosts, services, vulns, creds, loot. Actions: list, add, update, delete, search, export """ start_time = time.time() try: # Validate action try: db_action = DatabaseAction(action.lower()) except ValueError: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Invalid action: {action}. Valid actions: {[a.value for a in DatabaseAction]}" ) # Validate table valid_tables = ["hosts", "services", "vulns", "creds", "loot", "notes"] if table not in valid_tables: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Invalid table: {table}. Valid tables: {valid_tables}" ) if db_action == DatabaseAction.LIST: # Build command with filters cmd = table if filters: filter_args = self._build_filter_args(filters) cmd += f" {filter_args}" result = await self.execute_command(cmd, timeout) if result.status == OperationStatus.SUCCESS: parsed_data = self._parse_database_output(table, result.data.get("stdout", "")) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={table: parsed_data, "count": len(parsed_data)}, execution_time=result.execution_time ) elif db_action == DatabaseAction.ADD: if not data: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Data required for add action" ) # Build add command based on table cmd = self._build_add_command(table, data) result = await self.execute_command(cmd, timeout) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"added": True, "table": table}, execution_time=result.execution_time ) elif db_action == DatabaseAction.DELETE: if not filters: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Filters required for delete action (safety)" ) # Build delete command filter_args = self._build_filter_args(filters) cmd = f"{table} -d {filter_args}" result = await self.execute_command(cmd, timeout) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"deleted": True, "table": table, "filters": filters}, execution_time=result.execution_time ) elif db_action == DatabaseAction.SEARCH: if not filters: filters = {} # Use search-specific parsing search_term = filters.get("search", "") cmd = f"{table} -S {search_term}" if search_term else table result = await self.execute_command(cmd, timeout) if result.status == OperationStatus.SUCCESS: parsed_data = self._parse_database_output(table, result.data.get("stdout", "")) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"results": parsed_data, "count": len(parsed_data)}, execution_time=result.execution_time ) elif db_action == DatabaseAction.EXPORT: # Export database export_format = filters.get("format", "xml") if filters else "xml" cmd = f"db_export -f {export_format}" result = await self.execute_command(cmd, timeout or 60.0) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"exported": True, "format": export_format}, execution_time=result.execution_time ) # Return result return ExtendedOperationResult( status=result.status, data=result.data, execution_time=result.execution_time, error=result.error ) except Exception as e: logger.error(f"Database query error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=str(e) ) # ==================== TOOL 4: Exploit Chain ==================== async def msf_exploit_chain( self, target: str, exploit_module: str, payload: str, options: Dict[str, str], auto_execute: bool = False, timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Complete exploitation workflow automation. Combines: search → use → set options → check → exploit → session management """ start_time = time.time() workflow_steps = [] try: # Step 1: Load exploit module use_result = await self.msf_module_manager("use", exploit_module) workflow_steps.append({ "step": "load_module", "status": use_result.status.value, "time": use_result.execution_time }) if use_result.status != OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.FAILURE, data={"workflow": workflow_steps}, execution_time=time.time() - start_time, error=f"Failed to load module: {use_result.error}" ) # Step 2: Set payload payload_result = await self.execute_command(f"set payload {payload}") workflow_steps.append({ "step": "set_payload", "status": payload_result.status.value, "time": payload_result.execution_time }) # Step 3: Set options options["RHOSTS"] = target # Ensure target is set set_result = await self.msf_module_manager("set", options=options) workflow_steps.append({ "step": "set_options", "status": set_result.status.value, "time": set_result.execution_time }) # Step 4: Check if vulnerable (optional) check_result = await self.msf_module_manager("check") workflow_steps.append({ "step": "check_vulnerable", "status": check_result.status.value, "result": check_result.data, "time": check_result.execution_time }) # Step 5: Execute exploit if auto_execute or check passed if auto_execute or (check_result.data and check_result.data.get("vulnerable")): exploit_result = await self.msf_module_manager("exploit", timeout=timeout or 120.0) workflow_steps.append({ "step": "exploit", "status": exploit_result.status.value, "session": exploit_result.data.get("session") if exploit_result.data else None, "time": exploit_result.execution_time }) # Step 6: Check for new sessions if exploit_result.status == OperationStatus.SUCCESS: sessions_result = await self.msf_session_interact(action="list") workflow_steps.append({ "step": "session_check", "status": sessions_result.status.value, "sessions": sessions_result.data.get("count", 0) if sessions_result.data else 0, "time": sessions_result.execution_time }) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "workflow": workflow_steps, "success": True, "sessions": sessions_result.data }, execution_time=time.time() - start_time, metadata={ "exploit": exploit_module, "target": target, "payload": payload } ) else: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "workflow": workflow_steps, "success": False, "reason": "Target not vulnerable or auto_execute disabled" }, execution_time=time.time() - start_time ) # Return partial success if we got this far return ExtendedOperationResult( status=OperationStatus.PARTIAL, data={"workflow": workflow_steps}, execution_time=time.time() - start_time ) except Exception as e: logger.error(f"Exploit chain error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data={"workflow": workflow_steps}, execution_time=time.time() - start_time, error=str(e) ) # ==================== TOOL 5: Post Exploitation ==================== async def msf_post_exploitation( self, session_id: int, module: str, options: Dict[str, str] = None, timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Post-exploitation module execution. Modules: gather/*, escalate/*, manage/* """ start_time = time.time() try: # Load post module use_result = await self.msf_module_manager("use", module) if use_result.status != OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Failed to load post module: {use_result.error}" ) # Set session session_result = await self.execute_command(f"set SESSION {session_id}") if session_result.status != OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Failed to set session" ) # Set additional options if options: set_result = await self.msf_module_manager("set", options=options) if set_result.status != OperationStatus.SUCCESS: logger.warning(f"Some options failed to set: {set_result.data}") # Run post module run_result = await self.msf_module_manager("run", timeout=timeout) if run_result.status == OperationStatus.SUCCESS: # Parse post module output output = run_result.data.get("stdout", "") if run_result.data else "" post_data = self._parse_post_module_output(module, output) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "module": module, "session_id": session_id, "results": post_data }, execution_time=time.time() - start_time ) return ExtendedOperationResult( status=run_result.status, data=run_result.data, execution_time=time.time() - start_time, error=run_result.error ) except Exception as e: logger.error(f"Post exploitation error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=str(e) ) # ==================== TOOL 6: Handler Manager ==================== async def msf_handler_manager( self, action: str, handler_name: str, payload_type: str = None, options: Dict[str, str] = None, auto_options: Dict = None, timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Payload handler management. Actions: start, stop, list, multi_handler """ start_time = time.time() try: if action == "start" or action == "create": if not payload_type or not options: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Payload_type and options required for start/create action" ) # Use multi/handler use_result = await self.msf_module_manager("use", "exploit/multi/handler") if use_result.status == OperationStatus.SUCCESS: # Set payload await self.execute_command(f"set payload {payload_type}") # Set options await self.msf_module_manager("set", options=options) # Start handler as job handler_result = await self.execute_command("exploit -j", timeout) if handler_result.status == OperationStatus.SUCCESS: # Extract job ID job_id = self._extract_job_id(handler_result.data.get("stdout", "")) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "handler_started": True, "job_id": job_id, "payload": payload_type }, execution_time=time.time() - start_time ) elif action == "list": # List jobs result = await self.execute_command("jobs", timeout) if result.status == OperationStatus.SUCCESS: jobs = self._parse_jobs(result.data.get("stdout", "")) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"jobs": jobs, "count": len(jobs)}, execution_time=result.execution_time ) elif action == "stop": job_id = options.get("job_id") if options else None if not job_id: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="job_id required in options for stop action" ) result = await self.execute_command(f"jobs -k {job_id}", timeout) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"stopped": True, "job_id": job_id}, execution_time=result.execution_time ) elif action == "multi_handler": # Set up persistent multi-handler use_result = await self.msf_module_manager("use", "exploit/multi/handler") if use_result.status == OperationStatus.SUCCESS: # Set ExitOnSession false for persistent handler await self.execute_command("set ExitOnSession false") # Set payload if provided if payload: await self.execute_command(f"set payload {payload}") # Set options if provided if options: await self.msf_module_manager("set", options=options) # Start persistent handler handler_result = await self.execute_command("exploit -j -z", timeout) if handler_result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "multi_handler": True, "persistent": True }, execution_time=time.time() - start_time ) # Invalid action return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Invalid action: {action}. Valid actions: start, stop, list, multi_handler" ) except Exception as e: logger.error(f"Handler manager error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=str(e) ) # ==================== TOOL 7: Scanner Suite ==================== async def msf_scanner_suite( self, scanner_type: str, targets: Union[str, List[str]], options: Dict[str, str] = None, threads: int = 10, output_format: str = "table", timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Integrated scanning capabilities. Types: port, smb, http, ssh, ftp, snmp, discovery """ start_time = time.time() try: # Map scan types to modules scanner_modules = { "port": "auxiliary/scanner/portscan/tcp", "smb": "auxiliary/scanner/smb/smb_version", "http": "auxiliary/scanner/http/http_version", "ssh": "auxiliary/scanner/ssh/ssh_version", "ftp": "auxiliary/scanner/ftp/ftp_version", "snmp": "auxiliary/scanner/snmp/snmp_enum", "discovery": "auxiliary/scanner/discovery/arp_sweep" } # Map scanner_type categories to specific modules type_mapping = { "network": "discovery", "service": "port", "vulnerability": "smb", # Example vulnerability scanner "credential": "ssh", # Can check for weak creds "web": "http", "custom": "port" # Default to port scan } scan_type = type_mapping.get(scanner_type, scanner_type) if scan_type not in scanner_modules: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Invalid scanner type: {scanner_type}. Valid types: {list(type_mapping.keys())}" ) # Load scanner module module = scanner_modules[scan_type] use_result = await self.msf_module_manager("use", module) if use_result.status != OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Failed to load scanner module: {use_result.error}" ) # Set targets (join list if needed) target_str = " ".join(targets) if isinstance(targets, list) else targets await self.execute_command(f"set RHOSTS {target_str}") # Set additional options if options: await self.msf_module_manager("set", options=options) # Run scan scan_timeout = timeout or 60.0 scan_result = await self.msf_module_manager("run", timeout=scan_timeout) if scan_result.status == OperationStatus.SUCCESS: # Parse scan results output = scan_result.data.get("stdout", "") if scan_result.data else "" scan_data = self._parse_scan_output(scan_type, output) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "scan_type": scan_type, "targets": targets, "results": scan_data, "hosts_found": len(scan_data) }, execution_time=time.time() - start_time ) return ExtendedOperationResult( status=scan_result.status, data=scan_result.data, execution_time=time.time() - start_time, error=scan_result.error ) except Exception as e: logger.error(f"Scanner suite error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=str(e) ) # ==================== TOOL 8: Credential Manager ==================== async def msf_credential_manager( self, action: str, cred_data: Dict[str, Any] = None, filters: Dict[str, Any] = None, timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Credential management. Actions: add, list, crack, validate, export """ start_time = time.time() try: if action == "list": # List credentials with optional filters cmd = "creds" if filters: if "service" in filters: cmd += f" -s {filters['service']}" if "host" in filters: cmd += f" -h {filters['host']}" result = await self.execute_command(cmd, timeout) if result.status == OperationStatus.SUCCESS: creds = self._parse_credentials(result.data.get("stdout", "")) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"credentials": creds, "count": len(creds)}, execution_time=result.execution_time ) elif action == "add": if not cred_data: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Credential data required for add action" ) # Build creds add command cmd = "creds add" if "user" in cred_data: cmd += f" user:{cred_data['user']}" if "password" in cred_data: cmd += f" password:{cred_data['password']}" if "host" in cred_data: cmd += f" host:{cred_data['host']}" if "service" in cred_data: cmd += f" service:{cred_data['service']}" result = await self.execute_command(cmd, timeout) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"added": True, "credential": cred_data}, execution_time=result.execution_time ) elif action == "validate": # Validate credentials using appropriate module if not cred_data or "service" not in cred_data: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Service type required for credential validation" ) # Map services to validation modules validation_modules = { "ssh": "auxiliary/scanner/ssh/ssh_login", "smb": "auxiliary/scanner/smb/smb_login", "ftp": "auxiliary/scanner/ftp/ftp_login", "mysql": "auxiliary/scanner/mysql/mysql_login", "postgres": "auxiliary/scanner/postgres/postgres_login" } service = cred_data["service"].lower() if service not in validation_modules: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"No validation module for service: {service}" ) # Load validation module module = validation_modules[service] await self.msf_module_manager("use", module) # Set credentials options = { "RHOSTS": cred_data.get("host", ""), "USERNAME": cred_data.get("user", ""), "PASSWORD": cred_data.get("password", "") } await self.msf_module_manager("set", options=options) # Run validation validate_result = await self.msf_module_manager("run", timeout=timeout) if validate_result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "validated": True, "credential": cred_data, "valid": "Success" in str(validate_result.data) }, execution_time=time.time() - start_time ) elif action == "export": # Export credentials export_format = filters.get("format", "csv") if filters else "csv" result = await self.execute_command(f"creds -o /tmp/creds.{export_format}", timeout) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"exported": True, "format": export_format, "path": f"/tmp/creds.{export_format}"}, execution_time=result.execution_time ) # Invalid action return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Invalid action: {action}. Valid actions: list, add, validate, export" ) except Exception as e: logger.error(f"Credential manager error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=str(e) ) # ==================== TOOL 9: Pivot Manager ==================== async def msf_pivot_manager( self, action: str, session_id: Optional[str] = None, network: Optional[str] = None, options: Dict[str, Any] = None, timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Network pivoting and routing. Actions: add_route, list_routes, portfwd, socks_proxy """ start_time = time.time() try: if action == "add_route" or action == "remove_route": if not session_id: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="session_id required for add_route/remove_route" ) # Use network parameter or get from options subnet = network or (options and options.get("subnet")) if not subnet: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Network/subnet required for route operations" ) netmask = (options and options.get("netmask")) or "255.255.255.0" if action == "add_route": cmd = f"route add {subnet} {netmask} {session_id}" else: cmd = f"route remove {subnet} {netmask} {session_id}" result = await self.execute_command(cmd, timeout) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "route_added": True, "subnet": subnet, "netmask": netmask, "session": session_id }, execution_time=result.execution_time ) elif action == "list_routes": result = await self.execute_command("route print", timeout) if result.status == OperationStatus.SUCCESS: routes = self._parse_routes(result.data.get("stdout", "")) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"routes": routes, "count": len(routes)}, execution_time=result.execution_time ) elif action == "portfwd": if not options: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Options required for port forwarding" ) # Build portfwd command cmd = f"sessions -i {session_id} -c 'portfwd add" if "local_port" in options: cmd += f" -l {options['local_port']}" if "remote_host" in options: cmd += f" -p {options['remote_host']}" if "remote_port" in options: cmd += f" -r {options['remote_port']}" cmd += "'" result = await self.execute_command(cmd, timeout) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "portfwd_added": True, "session": session_id, "forwarding": options }, execution_time=result.execution_time ) elif action == "socks_proxy": # Set up SOCKS proxy use_result = await self.msf_module_manager("use", "auxiliary/server/socks_proxy") if use_result.status == OperationStatus.SUCCESS: # Set options proxy_options = { "SRVPORT": options.get("port", "1080") if options else "1080", "VERSION": options.get("version", "5") if options else "5" } await self.msf_module_manager("set", options=proxy_options) # Start SOCKS proxy proxy_result = await self.execute_command("run -j", timeout) if proxy_result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "socks_proxy": True, "port": proxy_options["SRVPORT"], "version": proxy_options["VERSION"] }, execution_time=time.time() - start_time ) # Invalid action return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Invalid action: {action}. Valid actions: add_route, list_routes, portfwd, socks_proxy" ) except Exception as e: logger.error(f"Pivot manager error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=str(e) ) # ==================== TOOL 10: Resource Executor ==================== async def msf_resource_executor( self, script_path: str = None, commands: List[str] = None, timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Resource script execution. Can use file path or command list. """ start_time = time.time() try: if script_path: # Execute resource file result = await self.execute_command(f"resource {script_path}", timeout or 60.0) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "executed": True, "script": script_path, "output": result.data.get("stdout", "") if result.data else "" }, execution_time=result.execution_time ) elif commands: # Debug: Check what we received logger.info(f"Resource executor received commands: {commands} (type: {type(commands)})") # Handle case where commands comes as a string representation if isinstance(commands, str): logger.info(f"Commands is string, attempting JSON parse: {commands}") # Try to parse as JSON array try: import json commands = json.loads(commands) logger.info(f"Successfully parsed to: {commands}") except Exception as e: logger.warning(f"JSON parse failed ({e}), treating as single command") # Fallback: treat as single command commands = [commands] # Execute commands sequentially results = [] total_time = 0 for cmd in commands: cmd_result = await self.execute_command(cmd, timeout) results.append({ "command": cmd, "status": cmd_result.status.value, "output": cmd_result.data.get("stdout", "") if cmd_result.data else "", "time": cmd_result.execution_time }) total_time += cmd_result.execution_time # Stop on failure if cmd_result.status == OperationStatus.FAILURE: break # Determine overall status failed = any(r["status"] == "failure" for r in results) return ExtendedOperationResult( status=OperationStatus.FAILURE if failed else OperationStatus.SUCCESS, data={ "executed": not failed, "commands": len(commands), "results": results }, execution_time=total_time ) else: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Either script_path or commands list required" ) except Exception as e: logger.error(f"Resource executor error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=str(e) ) # ==================== TOOL 11: Loot Collector ==================== async def msf_loot_collector( self, session_id: int = None, loot_type: str = None, action: str = "list", timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Data collection and management. Types: files, screenshots, keystrokes, passwords Actions: list, collect, export """ start_time = time.time() try: if action == "list": # List loot cmd = "loot" if loot_type: cmd += f" -t {loot_type}" result = await self.execute_command(cmd, timeout) if result.status == OperationStatus.SUCCESS: loot_items = self._parse_loot(result.data.get("stdout", "")) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"loot": loot_items, "count": len(loot_items)}, execution_time=result.execution_time ) elif action == "collect": if not session_id: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Session ID required for collect action" ) # Map loot types to post modules loot_modules = { "files": "post/windows/gather/enum_files", "screenshots": "post/windows/gather/screen_spy", "keystrokes": "post/windows/capture/keylog_recorder", "passwords": "post/windows/gather/credentials/credential_collector", "browser": "post/multi/gather/firefox_creds", "system": "post/windows/gather/enum_system" } if loot_type and loot_type in loot_modules: # Use specific loot module module = loot_modules[loot_type] post_result = await self.msf_post_exploitation( session_id=session_id, module=module, timeout=timeout ) if post_result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "collected": True, "type": loot_type, "session": session_id, "results": post_result.data }, execution_time=time.time() - start_time ) else: # General loot collection return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "message": "Use specific loot_type for targeted collection", "available_types": list(loot_modules.keys()) }, execution_time=time.time() - start_time ) elif action == "export": # Export loot result = await self.execute_command("loot -o /tmp/loot_export.txt", timeout) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"exported": True, "path": "/tmp/loot_export.txt"}, execution_time=result.execution_time ) # Invalid action return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Invalid action: {action}. Valid actions: list, collect, export" ) except Exception as e: logger.error(f"Loot collector error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=str(e) ) # ==================== TOOL 12: Vulnerability Tracker ==================== async def msf_vulnerability_tracker( self, action: str, vuln_data: Dict[str, Any] = None, filters: Dict[str, Any] = None, timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Vulnerability management. Actions: import, analyze, correlate, report """ start_time = time.time() try: if action == "list": # List vulnerabilities cmd = "vulns" if filters: if "host" in filters: cmd += f" -h {filters['host']}" if "service" in filters: cmd += f" -s {filters['service']}" result = await self.execute_command(cmd, timeout) if result.status == OperationStatus.SUCCESS: vulns = self._parse_vulnerabilities(result.data.get("stdout", "")) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"vulnerabilities": vulns, "count": len(vulns)}, execution_time=result.execution_time ) elif action == "import": if not vuln_data or "file" not in vuln_data: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="File path required in vuln_data for import" ) # Import vulnerability scan result = await self.execute_command(f"db_import {vuln_data['file']}", timeout or 60.0) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"imported": True, "file": vuln_data['file']}, execution_time=result.execution_time ) elif action == "analyze": # Analyze vulnerabilities for exploitability vulns_result = await self.execute_command("vulns", timeout) if vulns_result.status == OperationStatus.SUCCESS: vulns = self._parse_vulnerabilities(vulns_result.data.get("stdout", "")) # Search for exploits for each vulnerability exploitable = [] for vuln in vulns[:10]: # Limit to prevent timeout if "name" in vuln: search_result = await self.search_modules(vuln["name"], limit=5) if search_result.data and search_result.data.get("modules"): exploitable.append({ "vulnerability": vuln, "exploits": search_result.data["modules"] }) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "analyzed": True, "total_vulns": len(vulns), "exploitable": len(exploitable), "results": exploitable }, execution_time=time.time() - start_time ) elif action == "correlate": # Correlate vulnerabilities with available exploits result = await self.execute_command("analyze", timeout or 60.0) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "correlated": True, "output": result.data.get("stdout", "") if result.data else "" }, execution_time=result.execution_time ) elif action == "report": # Generate vulnerability report report_data = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "vulnerabilities": [], "statistics": {} } # Get vulnerabilities vulns_result = await self.execute_command("vulns", timeout) if vulns_result.status == OperationStatus.SUCCESS: vulns = self._parse_vulnerabilities(vulns_result.data.get("stdout", "")) report_data["vulnerabilities"] = vulns # Calculate statistics report_data["statistics"] = { "total": len(vulns), "by_severity": self._count_by_severity(vulns), "by_service": self._count_by_service(vulns) } return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"report": report_data}, execution_time=time.time() - start_time ) # Invalid action return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Invalid action: {action}. Valid actions: list, import, analyze, correlate, report" ) except Exception as e: logger.error(f"Vulnerability tracker error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=str(e) ) # ==================== TOOL 13: Reporting Engine ==================== async def msf_reporting_engine( self, report_type: str, workspace: str, filters: Dict[str, Any] = None, template: Optional[str] = None, output_format: str = "pdf", include_evidence: bool = True, timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Comprehensive reporting. Types: hosts, services, vulns, exploitation_timeline, executive_summary Formats: json, xml, html, text """ start_time = time.time() try: report_data = { "type": report_type, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "data": {} } if report_type == "hosts": # Get hosts report hosts_result = await self.msf_database_query("hosts", "list", filters) if hosts_result.status == OperationStatus.SUCCESS: report_data["data"] = hosts_result.data elif report_type == "services": # Get services report services_result = await self.msf_database_query("services", "list", filters) if services_result.status == OperationStatus.SUCCESS: report_data["data"] = services_result.data elif report_type == "vulns": # Get vulnerabilities report vulns_result = await self.msf_vulnerability_tracker("list", filters=filters) if vulns_result.status == OperationStatus.SUCCESS: report_data["data"] = vulns_result.data elif report_type == "exploitation_timeline": # Get exploitation timeline sessions_result = await self.msf_session_interact(action="list") loot_result = await self.msf_loot_collector(action="list") report_data["data"] = { "sessions": sessions_result.data if sessions_result.data else {}, "loot": loot_result.data if loot_result.data else {}, "timeline": self._build_exploitation_timeline(sessions_result.data, loot_result.data) } elif report_type == "executive_summary": # Build executive summary hosts_result = await self.msf_database_query("hosts", "list") services_result = await self.msf_database_query("services", "list") vulns_result = await self.msf_vulnerability_tracker("list") sessions_result = await self.msf_session_interact(action="list") report_data["data"] = { "summary": { "total_hosts": hosts_result.data.get("count", 0) if hosts_result.data else 0, "total_services": services_result.data.get("count", 0) if services_result.data else 0, "total_vulnerabilities": vulns_result.data.get("count", 0) if vulns_result.data else 0, "active_sessions": sessions_result.data.get("count", 0) if sessions_result.data else 0 }, "risk_assessment": self._calculate_risk_score(vulns_result.data), "recommendations": self._generate_recommendations(vulns_result.data) } else: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Invalid report type: {report_type}" ) # Format report formatted_report = self._format_report(report_data, output_format) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "report": formatted_report, "format": output_format, "type": report_type }, execution_time=time.time() - start_time ) except Exception as e: logger.error(f"Reporting engine error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=str(e) ) # ==================== TOOL 14: Automation Builder ==================== async def msf_automation_builder( self, action: str, workflow_name: str, node_config: Optional[Dict[str, Any]] = None, connections: Optional[List[Dict]] = None, execution_params: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Build and execute complex workflows. Example: recon → exploit → post-exploit → report """ start_time = time.time() try: # Initialize automated_workflows if not exists if not hasattr(self, 'automated_workflows'): self.automated_workflows = {} if action == "create_workflow": # Check if workflow already exists if workflow_name in self.automated_workflows: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Workflow '{workflow_name}' already exists" ) # Create a new workflow self.automated_workflows[workflow_name] = { "created": time.time(), "modified": time.time(), "nodes": [], "connections": [], "status": "draft" } return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "workflow_created": True, "name": workflow_name, "status": "draft" }, execution_time=time.time() - start_time ) elif action == "add_node": if workflow_name not in self.automated_workflows: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Workflow {workflow_name} not found" ) if not node_config: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="node_config required for add_node" ) node_id = len(self.automated_workflows[workflow_name]["nodes"]) node_config["id"] = node_id self.automated_workflows[workflow_name]["nodes"].append(node_config) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "node_added": True, "node_id": node_id, "workflow": workflow_name }, execution_time=time.time() - start_time ) elif action == "connect_nodes": if workflow_name not in self.automated_workflows: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Workflow {workflow_name} not found" ) if not connections: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="connections required for connect_nodes" ) # Add connections to workflow self.automated_workflows[workflow_name]["connections"].extend(connections) self.automated_workflows[workflow_name]["modified"] = time.time() return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "connections_added": len(connections), "workflow": workflow_name, "total_connections": len(self.automated_workflows[workflow_name]["connections"]) }, execution_time=time.time() - start_time ) elif action == "validate": if workflow_name not in self.automated_workflows: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Workflow {workflow_name} not found" ) workflow = self.automated_workflows[workflow_name] validation_errors = [] # Validate workflow has nodes if not workflow["nodes"]: validation_errors.append("Workflow has no nodes") # Validate connections reference valid nodes node_ids = {node["id"] for node in workflow["nodes"]} for conn in workflow.get("connections", []): if conn.get("from") not in node_ids: validation_errors.append(f"Connection references invalid source node: {conn.get('from')}") if conn.get("to") not in node_ids: validation_errors.append(f"Connection references invalid target node: {conn.get('to')}") is_valid = len(validation_errors) == 0 return ExtendedOperationResult( status=OperationStatus.SUCCESS if is_valid else OperationStatus.FAILURE, data={ "workflow": workflow_name, "valid": is_valid, "errors": validation_errors, "node_count": len(workflow["nodes"]), "connection_count": len(workflow.get("connections", [])) }, execution_time=time.time() - start_time ) elif action == "execute": # Execute workflow with proper error handling if workflow_name not in self.automated_workflows: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Workflow {workflow_name} not found" ) workflow = self.automated_workflows[workflow_name] # Check if workflow is valid if not workflow["nodes"]: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Cannot execute empty workflow" ) results = [] overall_success = True # Execute each node in sequence for i, node in enumerate(workflow["nodes"]): try: node_type = node.get('type', 'unknown') node_params = node.get('params', {}) # Map node types to actual MSF operations if node_type == "scan": # Would call scanner_suite result = { "node_id": node["id"], "type": node_type, "status": "success", "output": f"Scan completed for {node_params.get('target', 'unknown')}" } elif node_type == "exploit": # Would call exploit_chain result = { "node_id": node["id"], "type": node_type, "status": "success", "output": f"Exploit attempt on {node_params.get('target', 'unknown')}" } else: result = { "node_id": node["id"], "type": node_type, "status": "success", "output": f"Executed {node_type} operation" } results.append(result) except Exception as e: result = { "node_id": node.get("id", i), "type": node.get('type', 'unknown'), "status": "failure", "error": str(e) } results.append(result) overall_success = False # Stop on error if not configured to continue if not execution_params or not execution_params.get("continue_on_error", False): break # Update workflow status self.automated_workflows[workflow_name]["status"] = "completed" if overall_success else "failed" self.automated_workflows[workflow_name]["last_execution"] = time.time() return ExtendedOperationResult( status=OperationStatus.SUCCESS if overall_success else OperationStatus.PARTIAL, data={ "workflow_executed": True, "name": workflow_name, "results": results, "nodes_executed": len(results), "success_count": sum(1 for r in results if r["status"] == "success"), "failure_count": sum(1 for r in results if r["status"] == "failure") }, execution_time=time.time() - start_time ) elif action == "list": # List all workflows workflow_list = [] for name, workflow in self.automated_workflows.items(): workflow_list.append({ "name": name, "status": workflow.get("status", "unknown"), "created": workflow["created"], "modified": workflow.get("modified", workflow["created"]), "node_count": len(workflow["nodes"]), "connection_count": len(workflow.get("connections", [])) }) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "workflows": workflow_list, "total_count": len(workflow_list) }, execution_time=time.time() - start_time ) elif action == "export": if workflow_name not in self.automated_workflows: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Workflow {workflow_name} not found" ) workflow = self.automated_workflows[workflow_name] # Export workflow as JSON export_data = { "name": workflow_name, "created": workflow["created"], "modified": workflow.get("modified", workflow["created"]), "status": workflow.get("status", "unknown"), "nodes": workflow["nodes"], "connections": workflow.get("connections", []), "metadata": { "exported_at": time.time(), "node_count": len(workflow["nodes"]), "connection_count": len(workflow.get("connections", [])) } } return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "workflow_exported": True, "name": workflow_name, "export": json.dumps(export_data, indent=2) }, execution_time=time.time() - start_time ) else: # Default/unknown action return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Unknown action: {action}. Valid actions: create_workflow, add_node, connect_nodes, validate, execute, export, list" ) except Exception as e: logger.error(f"Automation builder error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data={"workflow": workflow_name}, execution_time=time.time() - start_time, error=str(e) ) # ==================== TOOL 15: Plugin Manager ==================== async def msf_plugin_manager( self, action: str, plugin_name: str = None, options: Dict[str, Any] = None, timeout: Optional[float] = None ) -> ExtendedOperationResult: """ Plugin management. Actions: load, unload, list, info Popular: openvas, nexpose, nessus, nmap """ start_time = time.time() try: if action == "list": # List loaded plugins result = await self.execute_command("load -l", timeout) if result.status == OperationStatus.SUCCESS: plugins = self._parse_plugins(result.data.get("stdout", "")) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"plugins": plugins, "loaded_count": len(plugins)}, execution_time=result.execution_time ) elif action == "load": if not plugin_name: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Plugin name required for load action" ) # Load plugin with options cmd = f"load {plugin_name}" if options: for key, value in options.items(): cmd += f" {key}={value}" result = await self.execute_command(cmd, timeout) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "loaded": True, "plugin": plugin_name, "options": options }, execution_time=result.execution_time ) elif action == "unload": if not plugin_name: return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error="Plugin name required for unload action" ) result = await self.execute_command(f"unload {plugin_name}", timeout) if result.status == OperationStatus.SUCCESS: return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"unloaded": True, "plugin": plugin_name}, execution_time=result.execution_time ) elif action == "info": if not plugin_name: # Show available plugins return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={ "available_plugins": [ "aggregator", "alias", "auto_add_route", "beholder", "db_credcollect", "db_tracker", "event_tester", "ffautoregen", "ips_filter", "lab", "libnotify", "msfd", "msgrpc", "nessus", "nexpose", "openvas", "pcap_log", "request", "rssfeed", "sample", "session_notifier", "session_tagger", "socket_logger", "sounds", "sqlmap", "thread", "token_adduser", "token_hunter", "wiki", "wmap" ] }, execution_time=time.time() - start_time ) else: # Get plugin info plugin_info = self._get_plugin_info(plugin_name) return ExtendedOperationResult( status=OperationStatus.SUCCESS, data={"plugin": plugin_name, "info": plugin_info}, execution_time=time.time() - start_time ) # Invalid action return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=f"Invalid action: {action}. Valid actions: load, unload, list, info" ) except Exception as e: logger.error(f"Plugin manager error: {e}") return ExtendedOperationResult( status=OperationStatus.FAILURE, data=None, execution_time=time.time() - start_time, error=str(e) ) # ==================== Helper Methods ==================== def _parse_module_info(self, output: str) -> Dict[str, Any]: """Parse module info output""" info = { "name": "", "description": "", "author": [], "references": [], "platform": "", "targets": [] } lines = output.split('\n') current_section = None for line in lines: line = line.strip() if "Name:" in line: info["name"] = line.split("Name:", 1)[1].strip() elif "Description:" in line: current_section = "description" elif "Author:" in line: current_section = "author" elif "References:" in line: current_section = "references" elif "Platform:" in line: info["platform"] = line.split("Platform:", 1)[1].strip() elif "Available targets:" in line: current_section = "targets" elif current_section and line: if current_section == "description": info["description"] += line + " " elif current_section == "author" and line.startswith("-"): info["author"].append(line[1:].strip()) elif current_section == "references" and line.startswith("-"): info["references"].append(line[1:].strip()) elif current_section == "targets" and line.startswith("Id"): continue # Skip header elif current_section == "targets" and line[0].isdigit(): info["targets"].append(line) return info def _parse_options(self, output: str) -> Dict[str, Any]: """Parse module options output""" options = { "module_options": [], "payload_options": [], "advanced_options": [] } lines = output.split('\n') current_section = None for line in lines: line = line.strip() if "Module options" in line: current_section = "module_options" elif "Payload options" in line: current_section = "payload_options" elif "Advanced options" in line: current_section = "advanced_options" elif current_section and line and not line.startswith("=") and not line.startswith("-"): # Parse option line parts = line.split() if len(parts) >= 4: option = { "name": parts[0], "current_setting": parts[1] if parts[1] != "no" else "", "required": parts[2] == "yes", "description": " ".join(parts[3:]) } options[current_section].append(option) return options def _extract_session_info(self, output: str) -> Optional[Dict[str, Any]]: """Extract session information from exploit output""" session_info = None # Look for session creation messages if "session" in output.lower(): lines = output.split('\n') for line in lines: if "session" in line.lower() and "opened" in line.lower(): # Extract session ID match = re.search(r'session (\d+) opened', line, re.IGNORECASE) if match: session_info = { "id": int(match.group(1)), "type": "meterpreter" if "meterpreter" in line.lower() else "shell" } break return session_info def _parse_check_result(self, output: str) -> Dict[str, Any]: """Parse check command result""" result = { "vulnerable": False, "status": "unknown", "details": "" } output_lower = output.lower() if "vulnerable" in output_lower: result["vulnerable"] = True result["status"] = "vulnerable" elif "not vulnerable" in output_lower: result["vulnerable"] = False result["status"] = "safe" elif "detected" in output_lower: result["status"] = "detected" elif "unknown" in output_lower: result["status"] = "unknown" result["details"] = output.strip() return result def _parse_sessions(self, output: str) -> List[Dict[str, Any]]: """Parse sessions list output""" sessions = [] lines = output.split('\n') for line in lines: line = line.strip() # Skip headers and empty lines if not line or line.startswith("Active sessions") or line.startswith("=") or line.startswith("Id"): continue # Parse session line parts = line.split() if len(parts) >= 5 and parts[0].isdigit(): session = { "id": int(parts[0]), "type": parts[1], "info": parts[2], "connection": " ".join(parts[3:]) } sessions.append(session) return sessions def _build_filter_args(self, filters: Dict[str, Any]) -> str: """Build filter arguments for database commands""" args = [] if "address" in filters: args.append(f"-a {filters['address']}") if "port" in filters: args.append(f"-p {filters['port']}") if "service" in filters: args.append(f"-s {filters['service']}") if "host" in filters: args.append(f"-h {filters['host']}") return " ".join(args) def _build_add_command(self, table: str, data: Dict[str, Any]) -> str: """Build add command for database tables""" if table == "hosts": cmd = f"hosts -a {data.get('address', '')}" if "name" in data: cmd += f" -n {data['name']}" if "os" in data: cmd += f" -o {data['os']}" return cmd elif table == "services": return f"services -a -p {data.get('port', '')} -r {data.get('proto', 'tcp')} {data.get('host', '')}" elif table == "creds": return f"creds add user:{data.get('user', '')} password:{data.get('password', '')} host:{data.get('host', '')}" else: return "" def _parse_database_output(self, table: str, output: str) -> List[Dict[str, Any]]: """Parse database table output""" items = [] lines = output.split('\n') headers = [] for line in lines: line = line.strip() # Skip empty lines and separators if not line or line.startswith("="): continue # Parse headers if line.startswith("address") or line.startswith("host") or line.startswith("port"): headers = line.split() continue # Parse data lines if headers and line: parts = line.split() if len(parts) >= len(headers): item = {} for i, header in enumerate(headers): if i < len(parts): item[header] = parts[i] items.append(item) return items def _parse_post_module_output(self, module: str, output: str) -> Dict[str, Any]: """Parse post-exploitation module output""" results = { "success": True, "data": [], "raw_output": output } # Parse based on module type if "enum_files" in module: # Parse file enumeration lines = output.split('\n') for line in lines: if line.strip() and "/" in line: results["data"].append({"file": line.strip()}) elif "screen_spy" in module: # Parse screenshot info if "Screenshot saved" in output: results["data"].append({"screenshot": "saved"}) elif "credential" in module.lower(): # Parse credentials if "Username" in output and "Password" in output: results["data"].append({"credentials": "found"}) return results def _extract_job_id(self, output: str) -> Optional[int]: """Extract job ID from handler output""" match = re.search(r'Job (\d+)', output) if match: return int(match.group(1)) return None def _parse_jobs(self, output: str) -> List[Dict[str, Any]]: """Parse jobs list output""" jobs = [] lines = output.split('\n') for line in lines: line = line.strip() # Skip headers if not line or line.startswith("Jobs") or line.startswith("=") or line.startswith("Id"): continue # Parse job line parts = line.split() if len(parts) >= 2 and parts[0].isdigit(): job = { "id": int(parts[0]), "name": " ".join(parts[1:]) } jobs.append(job) return jobs def _parse_scan_output(self, scan_type: str, output: str) -> List[Dict[str, Any]]: """Parse scanner output based on scan type""" results = [] lines = output.split('\n') for line in lines: line = line.strip() if not line: continue # Parse based on scan type if scan_type == "port": if "open" in line.lower(): parts = line.split() if len(parts) >= 3: results.append({ "host": parts[0], "port": parts[1], "state": "open" }) elif scan_type in ["smb", "http", "ssh", "ftp"]: if "detected" in line.lower() or "version" in line.lower(): results.append({"service": scan_type, "info": line}) elif scan_type == "discovery": if "host" in line.lower() and "up" in line.lower(): parts = line.split() if parts: results.append({"host": parts[0], "status": "up"}) return results def _parse_credentials(self, output: str) -> List[Dict[str, Any]]: """Parse credentials output""" creds = [] lines = output.split('\n') for line in lines: line = line.strip() # Skip headers and empty lines if not line or line.startswith("Credentials") or line.startswith("=") or line.startswith("host"): continue # Parse credential line parts = line.split() if len(parts) >= 4: cred = { "host": parts[0], "service": parts[1], "username": parts[2], "password": parts[3] if len(parts) > 3 else "", "type": parts[4] if len(parts) > 4 else "password" } creds.append(cred) return creds def _parse_routes(self, output: str) -> List[Dict[str, Any]]: """Parse routes output""" routes = [] lines = output.split('\n') for line in lines: line = line.strip() # Skip headers if not line or line.startswith("IPv4 Active Routing") or line.startswith("=") or line.startswith("Subnet"): continue # Parse route line parts = line.split() if len(parts) >= 3: route = { "subnet": parts[0], "netmask": parts[1], "gateway": parts[2] } routes.append(route) return routes def _parse_loot(self, output: str) -> List[Dict[str, Any]]: """Parse loot output""" loot_items = [] lines = output.split('\n') for line in lines: line = line.strip() # Skip headers if not line or line.startswith("Loot") or line.startswith("=") or line.startswith("host"): continue # Parse loot line parts = line.split() if len(parts) >= 4: loot = { "host": parts[0], "service": parts[1], "type": parts[2], "path": parts[3] if len(parts) > 3 else "" } loot_items.append(loot) return loot_items def _parse_vulnerabilities(self, output: str) -> List[Dict[str, Any]]: """Parse vulnerabilities output""" vulns = [] lines = output.split('\n') for line in lines: line = line.strip() # Skip headers if not line or line.startswith("Vulnerabilities") or line.startswith("=") or line.startswith("Timestamp"): continue # Parse vulnerability line parts = line.split() if len(parts) >= 4: vuln = { "host": parts[1] if len(parts) > 1 else "", "name": parts[2] if len(parts) > 2 else "", "references": parts[3] if len(parts) > 3 else "" } vulns.append(vuln) return vulns def _count_by_severity(self, vulns: List[Dict[str, Any]]) -> Dict[str, int]: """Count vulnerabilities by severity""" severity_count = { "critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0 } for vuln in vulns: # Simple severity classification based on name name = vuln.get("name", "").lower() if "critical" in name or "rce" in name: severity_count["critical"] += 1 elif "high" in name or "exploit" in name: severity_count["high"] += 1 elif "medium" in name: severity_count["medium"] += 1 elif "low" in name: severity_count["low"] += 1 else: severity_count["info"] += 1 return severity_count def _count_by_service(self, vulns: List[Dict[str, Any]]) -> Dict[str, int]: """Count vulnerabilities by service""" service_count = {} for vuln in vulns: # Extract service from vulnerability name name = vuln.get("name", "") for service in ["smb", "http", "ssh", "ftp", "mysql", "postgres", "rdp"]: if service in name.lower(): service_count[service] = service_count.get(service, 0) + 1 break return service_count def _build_exploitation_timeline(self, sessions_data: Dict, loot_data: Dict) -> List[Dict[str, Any]]: """Build exploitation timeline""" timeline = [] # Add session events if sessions_data and "sessions" in sessions_data: for session in sessions_data["sessions"]: timeline.append({ "timestamp": "N/A", # Would need actual timestamps "event": "session_opened", "details": f"Session {session['id']} opened ({session['type']})" }) # Add loot events if loot_data and "loot" in loot_data: for loot in loot_data["loot"]: timeline.append({ "timestamp": "N/A", "event": "loot_collected", "details": f"Collected {loot['type']} from {loot['host']}" }) return timeline def _calculate_risk_score(self, vulns_data: Dict) -> Dict[str, Any]: """Calculate overall risk score""" if not vulns_data or "vulnerabilities" not in vulns_data: return {"score": 0, "level": "low"} vulns = vulns_data["vulnerabilities"] severity_count = self._count_by_severity(vulns) # Simple risk calculation score = ( severity_count["critical"] * 10 + severity_count["high"] * 5 + severity_count["medium"] * 3 + severity_count["low"] * 1 ) # Determine risk level if score >= 50: level = "critical" elif score >= 30: level = "high" elif score >= 15: level = "medium" else: level = "low" return { "score": score, "level": level, "breakdown": severity_count } def _generate_recommendations(self, vulns_data: Dict) -> List[str]: """Generate security recommendations""" recommendations = [] if not vulns_data or "vulnerabilities" not in vulns_data: return ["Perform comprehensive vulnerability assessment"] vulns = vulns_data["vulnerabilities"] severity_count = self._count_by_severity(vulns) if severity_count["critical"] > 0: recommendations.append(f"URGENT: Patch {severity_count['critical']} critical vulnerabilities immediately") if severity_count["high"] > 0: recommendations.append(f"HIGH PRIORITY: Address {severity_count['high']} high-severity vulnerabilities") # Service-specific recommendations service_count = self._count_by_service(vulns) for service, count in service_count.items(): if count > 0: recommendations.append(f"Review and harden {service.upper()} service configuration ({count} vulnerabilities)") recommendations.append("Implement regular vulnerability scanning schedule") recommendations.append("Establish patch management process") return recommendations def _format_report(self, report_data: Dict, format: str) -> Union[str, Dict]: """Format report based on requested format""" if format == "json": return report_data elif format == "text": # Convert to text format text = f"Report Type: {report_data['type']}\n" text += f"Generated: {report_data['timestamp']}\n" text += "=" * 50 + "\n" text += json.dumps(report_data['data'], indent=2) return text elif format == "xml": # Simple XML conversion xml = '<?xml version="1.0" encoding="UTF-8"?>\n' xml += f'<report type="{report_data["type"]}" timestamp="{report_data["timestamp"]}">\n' xml += ' <data>\n' # Would need proper XML serialization xml += f' {json.dumps(report_data["data"])}\n' xml += ' </data>\n' xml += '</report>' return xml else: # Default to JSON return report_data def _parse_plugins(self, output: str) -> List[Dict[str, Any]]: """Parse loaded plugins output""" plugins = [] lines = output.split('\n') for line in lines: line = line.strip() # Skip headers and empty lines if not line or line.startswith("Loaded plugins") or line.startswith("="): continue # Parse plugin line if line: plugins.append({ "name": line, "loaded": True }) return plugins def _get_plugin_info(self, plugin_name: str) -> Dict[str, str]: """Get plugin information""" # Plugin descriptions plugin_info = { "nessus": "Nessus vulnerability scanner integration", "nexpose": "Rapid7 Nexpose vulnerability management", "openvas": "OpenVAS vulnerability scanner integration", "nmap": "Nmap network scanner integration", "sqlmap": "SQL injection testing integration", "wmap": "Web application scanner", "db_tracker": "Track database changes", "session_notifier": "Notify on new sessions", "sounds": "Audio notifications for events", "msgrpc": "MessagePack RPC service" } return { "description": plugin_info.get(plugin_name, "Plugin for enhanced functionality"), "status": "available" } # Testing the implementation if __name__ == "__main__": async def test_extended_tools(): """Test the extended tools implementation""" tools = MSFExtendedTools() # Initialize print("Initializing MSF Extended Tools...") init_result = await tools.initialize() print(f"Initialization: {init_result.status.value}") if init_result.status == OperationStatus.SUCCESS: # Test module manager print("\n1. Testing Module Manager...") module_result = await tools.msf_module_manager( action="info", module_path="exploit/windows/smb/ms17_010_eternalblue" ) print(f"Module info: {module_result.status.value}") # Test session interact print("\n2. Testing Session Interact...") session_result = await tools.msf_session_interact(action="list") print(f"Sessions: {session_result.data}") # Clean up await tools.cleanup() print("\nTests completed!") # Run tests # asyncio.run(test_extended_tools())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/PreistlyPython/msfconsole-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server