Skip to main content
Glama

Windows Operations MCP

powershell_tools.py•23.5 kB
""" PowerShell and CMD execution tools for Windows Operations MCP. QUICK FIX VERSION - Addresses core stdout issues and over-restrictive security. FIXES APPLIED: 1. Relaxed security filters (removed 'format' block, allow common cmdlets) 2. Fixed encoding handling (use native console encoding) 3. Removed forced Out-String wrapping 4. Simplified command execution """ from typing import Dict, Any, Optional import subprocess import time import os import logging import ctypes from ..logging_config import get_logger from ..decorators import tool logger = get_logger(__name__) class CMDExecutor: """ CMD execution with reliable output capture. """ def __init__(self): self.encoding = self._get_console_encoding() logger.info(f"CMD executor initialized with encoding: {self.encoding}") def _get_console_encoding(self) -> str: """Get the native console encoding.""" try: # Try to get the console code page kernel32 = ctypes.windll.kernel32 cp = kernel32.GetConsoleCP() if cp: return f"cp{cp}" except: pass return "cp850" # Default Windows console encoding def execute(self, command: str, working_directory: Optional[str] = None, timeout: int = 30) -> Dict[str, Any]: """Execute CMD command with reliable output capture.""" start_time = time.time() try: # Use cmd.exe /c to execute the command cmd_args = ["cmd.exe", "/c", command] # Set working directory cwd = working_directory or os.getcwd() # Execute command result = subprocess.run( cmd_args, cwd=cwd, capture_output=True, text=True, encoding=self.encoding, timeout=timeout, errors='replace' ) execution_time = time.time() - start_time return { "success": result.returncode == 0, "stdout": result.stdout, "stderr": result.stderr, "exit_code": result.returncode, "execution_time": execution_time } except subprocess.TimeoutExpired: execution_time = time.time() - start_time return { "success": False, "stdout": "", "stderr": f"Command timed out after {timeout} seconds", "exit_code": -1, "execution_time": execution_time } except Exception as e: execution_time = time.time() - start_time return { "success": False, "stdout": "", "stderr": f"Execution error: {str(e)}", "exit_code": -1, "execution_time": execution_time } class PowerShellExecutor: """ Fixed PowerShell execution that solves the "no stdout" problem. Key fixes: 1. Relaxed security filters - allow common PowerShell cmdlets 2. Native encoding detection instead of forcing UTF-8 3. Clean command execution without unnecessary wrapping """ def __init__(self): self.console_encoding = self._get_console_encoding() logger.info(f"PowerShell executor initialized with encoding: {self.console_encoding}") def _get_console_encoding(self): """Get Windows console encoding reliably.""" try: # Get the actual console codepage console_cp = ctypes.windll.kernel32.GetConsoleOutputCP() if console_cp == 65001: # UTF-8 return 'utf-8' elif console_cp == 1252: # Windows-1252 return 'cp1252' else: return f'cp{console_cp}' except Exception as e: logger.warning(f"Could not get console encoding: {e}") return 'utf-8' # Safe fallback def execute(self, command: str, working_dir: Optional[str] = None, timeout: int = 60) -> Dict[str, Any]: """ Execute PowerShell command with reliable output capture. Args: command: PowerShell command to execute working_dir: Working directory (optional) timeout: Command timeout in seconds Returns: dict: { 'success': bool, 'stdout': str, 'stderr': str, 'exit_code': int, 'execution_time': float, 'encoding_used': str } """ start_time = time.time() try: # Build clean PowerShell command - NO forced encoding, NO Out-String wrapping cmd = [ 'powershell.exe', '-NoProfile', # Skip user profile loading '-NonInteractive', # No interactive prompts '-ExecutionPolicy', 'Bypass', # Override execution policy '-Command', command # Execute command as-is, no manipulation ] logger.debug(f"Executing PowerShell: {command[:100]}...") # Execute with native encoding result = subprocess.run( cmd, capture_output=True, text=True, encoding=self.console_encoding, # Use detected console encoding errors='replace', # Never crash on encoding errors timeout=timeout, cwd=working_dir ) execution_time = time.time() - start_time response = { 'success': result.returncode == 0, 'stdout': result.stdout, # No .strip() - preserve formatting 'stderr': result.stderr, 'exit_code': result.returncode, 'execution_time': execution_time, 'encoding_used': self.console_encoding } if response['success']: logger.debug(f"PowerShell command succeeded in {execution_time:.2f}s") else: logger.warning(f"PowerShell command failed with exit code {result.returncode}") return response except subprocess.TimeoutExpired: execution_time = time.time() - start_time logger.error(f"PowerShell command timed out after {timeout} seconds") return { 'success': False, 'stdout': '', 'stderr': f'Command timed out after {timeout} seconds', 'exit_code': -1, 'execution_time': execution_time, 'encoding_used': 'timeout' } except Exception as e: execution_time = time.time() - start_time logger.error(f"PowerShell execution error: {e}") return { 'success': False, 'stdout': '', 'stderr': f'Execution error: {str(e)}', 'exit_code': -1, 'execution_time': execution_time, 'encoding_used': 'error' } # Global executor instance _powershell_executor = PowerShellExecutor() def register_powershell_tools(mcp): """Register FIXED PowerShell and CMD tools with FastMCP.""" # Register the PowerShell and CMD tools with MCP mcp.tool(run_powershell_tool) mcp.tool(run_cmd_tool) @tool( name="run_powershell_tool", description="Execute PowerShell commands with reliable output capture and security checks", parameters={ "command": { "type": "string", "description": "PowerShell command to execute" }, "working_directory": { "type": "string", "description": "Working directory for command execution" }, "timeout_seconds": { "type": "integer", "description": "Command timeout in seconds", "default": 60 }, "capture_output": { "type": "boolean", "description": "Whether to capture command output", "default": True }, "max_output_size": { "type": "integer", "description": "Maximum output size to capture", "default": 102400 }, "output_encoding": { "type": "string", "description": "Output encoding", "default": "utf-8" }, "as_admin": { "type": "boolean", "description": "Run as administrator", "default": False } }, required=["command"], returns={ "type": "object", "properties": { "success": {"type": "boolean"}, "output": {"type": "string"}, "error": {"type": "string"}, "return_code": {"type": "integer"}, "execution_time": {"type": "number"} } } ) def run_powershell_tool( command: str, working_directory: Optional[str] = None, timeout_seconds: int = 60, capture_output: bool = True, max_output_size: int = 102400, output_encoding: str = "utf-8", as_admin: bool = False ) -> Dict[str, Any]: """ Execute PowerShell commands with reliable output capture and security checks. FIXED VERSION that solves Windows PowerShell subprocess issues: - Removed overly aggressive security filters - Uses native console encoding instead of forcing UTF-8 - No command wrapping that breaks output - Allows common cmdlets like Format-Table Args: command: PowerShell command to execute working_directory: Directory to run command in timeout_seconds: Command timeout (1-300 seconds) capture_output: Whether to capture stdout/stderr max_output_size: Maximum output size in bytes output_encoding: Output encoding (ignored - uses native) as_admin: Whether to run as administrator (not implemented for security) Returns: Dict with success, stdout, stderr, exit_code, execution_time """ # Input validation if not command or not command.strip(): return { "success": False, "stdout": "", "stderr": "Error: Command cannot be empty", "exit_code": -1, "execution_time": 0.0 } if timeout_seconds < 1 or timeout_seconds > 300: return { "success": False, "stdout": "", "stderr": "Error: Timeout must be between 1 and 300 seconds", "exit_code": -1, "execution_time": 0.0 } if as_admin: return { "success": False, "stdout": "", "stderr": "Error: Administrator execution not supported for security reasons", "exit_code": -1, "execution_time": 0.0 } # RELAXED Security check - only block truly dangerous operations dangerous_patterns = [ # Keep only the really dangerous ones 'invoke-expression', 'iex', # Code injection 'invoke-webrequest', 'iwr', 'curl', # Network operations 'start-process', # Process spawning 'remove-item.*-recurse.*-force', # Recursive forced deletion 'del.*\\s', # CMD recursive delete 'rmdir.*\\s', # CMD recursive directory removal ] command_lower = command.lower() for pattern in dangerous_patterns: if pattern.replace('.*', ' ') in command_lower: # Simple contains check logger.warning(f"Blocked potentially dangerous command: {pattern}") return { "success": False, "stdout": "", "stderr": f"Error: Command blocked for security (contains '{pattern.replace('.*', '')}')", "exit_code": -1, "execution_time": 0.0 } # Validate working directory if working_directory: if not os.path.exists(working_directory): return { "success": False, "stdout": "", "stderr": f"Error: Working directory does not exist: {working_directory}", "exit_code": -1, "execution_time": 0.0 } if not os.path.isdir(working_directory): return { "success": False, "stdout": "", "stderr": f"Error: Working directory is not a directory: {working_directory}", "exit_code": -1, "execution_time": 0.0 } # Execute using fixed executor result = _powershell_executor.execute( command=command, working_dir=working_directory, timeout=timeout_seconds ) # Truncate output if too large (but preserve structure) if len(result['stdout']) > max_output_size: result['stdout'] = result['stdout'][:max_output_size] + '\n[OUTPUT TRUNCATED]' if len(result['stderr']) > max_output_size: result['stderr'] = result['stderr'][:max_output_size] + '\n[ERROR TRUNCATED]' # Log the result for debugging if result['success']: logger.info(f"PowerShell command executed successfully in {result['execution_time']:.2f}s") else: logger.warning(f"PowerShell command failed: {result['stderr']}") return { "success": result['success'], "stdout": result['stdout'], "stderr": result['stderr'], "exit_code": result['exit_code'], "execution_time": result['execution_time'], "command": command, "working_directory": working_directory or os.getcwd() } @tool( name="run_cmd_tool", description="Execute CMD commands with reliable output capture and security checks", parameters={ "command": { "type": "string", "description": "CMD command to execute" }, "working_directory": { "type": "string", "description": "Working directory for command execution" }, "timeout_seconds": { "type": "integer", "description": "Command timeout in seconds", "default": 60 }, "capture_output": { "type": "boolean", "description": "Whether to capture command output", "default": True }, "max_output_size": { "type": "integer", "description": "Maximum output size to capture", "default": 102400 }, "output_encoding": { "type": "string", "description": "Output encoding", "default": "cp1252" } }, required=["command"], returns={ "type": "object", "properties": { "success": {"type": "boolean"}, "stdout": {"type": "string"}, "stderr": {"type": "string"}, "exit_code": {"type": "integer"}, "execution_time": {"type": "number"}, "command": {"type": "string"}, "working_directory": {"type": "string"} } } ) def run_cmd_tool( command: str, working_directory: Optional[str] = None, timeout_seconds: int = 60, capture_output: bool = True, max_output_size: int = 102400, output_encoding: str = "cp1252" ) -> Dict[str, Any]: """ Execute CMD commands with reliable output capture and security checks. FIXED VERSION - Uses proper Windows console encoding and simplified execution. """ start_time = time.time() try: # Input validation if not command or not command.strip(): return { "success": False, "stdout": "", "stderr": "Error: Command cannot be empty", "exit_code": -1, "execution_time": 0.0 } if timeout_seconds < 1 or timeout_seconds > 300: return { "success": False, "stdout": "", "stderr": "Error: Timeout must be between 1 and 300 seconds", "exit_code": -1, "execution_time": 0.0 } # Get console encoding for CMD try: console_cp = ctypes.windll.kernel32.GetConsoleOutputCP() encoding = f'cp{console_cp}' if console_cp != 65001 else 'utf-8' except: encoding = 'cp1252' # Windows CMD fallback # Execute command directly result = subprocess.run( command, shell=True, cwd=working_directory, timeout=timeout_seconds, capture_output=True, text=True, encoding=encoding, errors='replace' # Never crash on encoding errors ) execution_time = time.time() - start_time # Truncate output if too large stdout = result.stdout stderr = result.stderr if len(stdout) > max_output_size: stdout = stdout[:max_output_size] + '\n[OUTPUT TRUNCATED]' if len(stderr) > max_output_size: stderr = stderr[:max_output_size] + '\n[ERROR TRUNCATED]' return { "success": result.returncode == 0, "stdout": stdout, "stderr": stderr, "exit_code": result.returncode, "execution_time": execution_time, "command": command, "working_directory": working_directory or os.getcwd() } except subprocess.TimeoutExpired: execution_time = time.time() - start_time return { "success": False, "stdout": "", "stderr": f"Command timed out after {timeout_seconds} seconds", "exit_code": -1, "execution_time": execution_time } except Exception as e: execution_time = time.time() - start_time return { "success": False, "stdout": "", "stderr": f"Execution failed: {str(e)}", "exit_code": execution_time } def register_powershell_tools(mcp): """Register PowerShell and CMD execution tools with FastMCP.""" # Create executor instances ps_executor = PowerShellExecutor() cmd_executor = CMDExecutor() # Register PowerShell tool @mcp.tool( name="run_powershell_tool", description="Execute PowerShell commands with reliable output capture and security checks", parameters={ "command": { "type": "string", "description": "PowerShell command to execute" }, "working_directory": { "type": "string", "description": "Working directory for command execution" }, "timeout_seconds": { "type": "integer", "description": "Command timeout in seconds (1-300)", "default": 30 }, "max_output_size": { "type": "integer", "description": "Maximum output size in characters", "default": 10000 } }, required=["command"], returns={ "type": "object", "properties": { "success": {"type": "boolean"}, "stdout": {"type": "string"}, "stderr": {"type": "string"}, "exit_code": {"type": "integer"}, "execution_time": {"type": "number"} } } ) def run_powershell_tool( command: str, working_directory: Optional[str] = None, timeout_seconds: int = 30, max_output_size: int = 10000 ) -> Dict[str, Any]: """Execute PowerShell command with reliable output capture.""" return ps_executor.execute_command( command=command, working_directory=working_directory, timeout_seconds=timeout_seconds, max_output_size=max_output_size ) # Register CMD tool @mcp.tool( name="run_cmd_tool", description="Execute CMD commands with reliable output capture and security checks", parameters={ "command": { "type": "string", "description": "CMD command to execute" }, "working_directory": { "type": "string", "description": "Working directory for command execution" }, "timeout_seconds": { "type": "integer", "description": "Command timeout in seconds (1-300)", "default": 30 }, "max_output_size": { "type": "integer", "description": "Maximum output size in characters", "default": 10000 } }, required=["command"], returns={ "type": "object", "properties": { "success": {"type": "boolean"}, "stdout": {"type": "string"}, "stderr": {"type": "string"}, "exit_code": {"type": "integer"}, "execution_time": {"type": "number"} } } ) def run_cmd_tool( command: str, working_directory: Optional[str] = None, timeout_seconds: int = 30, max_output_size: int = 10000 ) -> Dict[str, Any]: """Execute CMD command with reliable output capture.""" return cmd_executor.execute_command( command=command, working_directory=working_directory, timeout_seconds=timeout_seconds, max_output_size=max_output_size ) logger.info("PowerShell and CMD tools registered successfully")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sandraschi/windows-operations-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server