powershell_tools.pyā¢14.7 kB
"""
PowerShell and CMD execution tools for Windows Operations MCP.
BULLETPROOF IMPLEMENTATION - Fixes Windows encoding and buffering issues.
Based on comprehensive research into Windows PowerShell subprocess issues.
Solves the "empty stdout" problem that affects millions of Windows developers.
"""
from typing import Dict, Any, Optional
import subprocess
import time
import os
import logging
import ctypes
logger = logging.getLogger(__name__)
class PowerShellExecutor:
"""
Bulletproof PowerShell execution that handles all Windows encoding issues.
Fixes the systematic problems:
1. Windows encoding hell (console cp850 vs ANSI cp1252)
2. PowerShell block buffering (4KB chunks)
3. Execution policies and profile interference
"""
def __init__(self):
self.console_encoding = self._get_real_console_encoding()
logger.info(f"PowerShell executor initialized with encoding: {self.console_encoding}")
def _get_real_console_encoding(self):
"""Get ACTUAL Windows console encoding, not Python's guess."""
try:
# Method 1: Use Windows API to get real console codepage
console_cp = ctypes.windll.kernel32.GetConsoleOutputCP()
return f'cp{console_cp}'
except Exception as e:
logger.warning(f"Could not get console encoding via Windows API: {e}")
try:
# Method 2: Use os.device_encoding (more reliable than locale)
device_encoding = os.device_encoding(0)
if device_encoding:
return device_encoding
except Exception as e:
logger.warning(f"Could not get device encoding: {e}")
# Method 3: Fallback to UTF-8
return 'utf-8'
def execute(self, command: str, working_dir: Optional[str] = None, timeout: int = 60) -> Dict[str, Any]:
"""
Execute PowerShell command with bulletproof error handling.
Args:
command: PowerShell command to execute
working_dir: Working directory (optional)
timeout: Command timeout in seconds
Returns:
dict: {
'success': bool,
'stdout': str,
'stderr': str,
'exit_code': int,
'execution_time': float,
'encoding_used': str
}
"""
start_time = time.time()
try:
# PowerShell setup commands to force UTF-8 and handle context
setup_commands = [
# Force console and output encoding to UTF-8
'[Console]::OutputEncoding = [System.Text.Encoding]::UTF8',
'$OutputEncoding = [System.Text.Encoding]::UTF8',
# Set error action preference to stop on errors
'$ErrorActionPreference = "Stop"',
]
# Add working directory change if specified
if working_dir:
setup_commands.append(f'Set-Location "{working_dir}"')
# Combine setup with actual command, force string output to bypass buffering
full_command = '; '.join(setup_commands) + f'; {command} | Out-String -Width 4096'
# Build bulletproof PowerShell command
cmd = [
'powershell.exe',
'-NoProfile', # Skip user profile loading
'-NonInteractive', # No interactive prompts
'-ExecutionPolicy', 'Bypass', # Override execution policy
'-OutputFormat', 'Text', # Force text output format
'-Command',
full_command
]
logger.debug(f"Executing PowerShell: {' '.join(cmd[:6])}... (command truncated)")
# Execute with proper encoding handling
result = subprocess.run(
cmd,
capture_output=True,
text=True,
encoding='utf-8', # PowerShell forced to UTF-8 above
errors='replace', # Never crash on encoding errors
timeout=timeout,
cwd=working_dir # Also set at process level for safety
)
execution_time = time.time() - start_time
response = {
'success': result.returncode == 0,
'stdout': result.stdout.strip(),
'stderr': result.stderr.strip(),
'exit_code': result.returncode,
'execution_time': execution_time,
'encoding_used': 'utf-8'
}
if response['success']:
logger.debug(f"PowerShell command succeeded in {execution_time:.2f}s")
else:
logger.warning(f"PowerShell command failed with exit code {result.returncode}")
return response
except subprocess.TimeoutExpired:
execution_time = time.time() - start_time
logger.error(f"PowerShell command timed out after {timeout} seconds")
return {
'success': False,
'stdout': '',
'stderr': f'Command timed out after {timeout} seconds',
'exit_code': -1,
'execution_time': execution_time,
'encoding_used': 'timeout'
}
except Exception as e:
execution_time = time.time() - start_time
logger.error(f"PowerShell execution error: {e}")
return {
'success': False,
'stdout': '',
'stderr': f'Execution error: {str(e)}',
'exit_code': -1,
'execution_time': execution_time,
'encoding_used': 'error'
}
# Global executor instance
_powershell_executor = PowerShellExecutor()
def register_powershell_tools(mcp):
"""Register PowerShell and CMD tools with FastMCP."""
@mcp.tool()
def run_powershell_tool(
command: str,
working_directory: Optional[str] = None,
timeout_seconds: int = 60,
capture_output: bool = True,
max_output_size: int = 102400,
output_encoding: str = "utf-8",
as_admin: bool = False
) -> Dict[str, Any]:
"""
Execute PowerShell commands with reliable output capture and security checks.
BULLETPROOF IMPLEMENTATION that fixes Windows PowerShell subprocess issues:
- Solves encoding hell (console cp850 vs ANSI cp1252)
- Bypasses PowerShell block buffering
- Handles execution policies properly
- Forces UTF-8 encoding consistently
Args:
command: PowerShell command to execute
working_directory: Directory to run command in
timeout_seconds: Command timeout (1-300 seconds)
capture_output: Whether to capture stdout/stderr
max_output_size: Maximum output size in bytes
output_encoding: Output encoding (always utf-8 for reliability)
as_admin: Whether to run as administrator (not implemented for security)
Returns:
Dict with success, stdout, stderr, exit_code, execution_time
"""
# Input validation
if not command or not command.strip():
return {
"success": False,
"stdout": "",
"stderr": "Error: Command cannot be empty",
"exit_code": -1,
"execution_time": 0.0
}
if timeout_seconds < 1 or timeout_seconds > 300:
return {
"success": False,
"stdout": "",
"stderr": "Error: Timeout must be between 1 and 300 seconds",
"exit_code": -1,
"execution_time": 0.0
}
if as_admin:
return {
"success": False,
"stdout": "",
"stderr": "Error: Administrator execution not supported for security reasons",
"exit_code": -1,
"execution_time": 0.0
}
# Security check - block potentially dangerous commands
dangerous_patterns = [
'format', 'del /s', 'rm -rf', 'rmdir /s', 'remove-item -recurse',
'invoke-expression', 'iex', 'invoke-webrequest', 'iwr',
'start-process', 'new-object', 'add-type'
]
command_lower = command.lower()
for pattern in dangerous_patterns:
if pattern in command_lower:
logger.warning(f"Blocked potentially dangerous command: {pattern}")
return {
"success": False,
"stdout": "",
"stderr": f"Error: Command blocked for security (contains '{pattern}')",
"exit_code": -1,
"execution_time": 0.0
}
# Validate working directory
if working_directory:
if not os.path.exists(working_directory):
return {
"success": False,
"stdout": "",
"stderr": f"Error: Working directory does not exist: {working_directory}",
"exit_code": -1,
"execution_time": 0.0
}
if not os.path.isdir(working_directory):
return {
"success": False,
"stdout": "",
"stderr": f"Error: Working directory is not a directory: {working_directory}",
"exit_code": -1,
"execution_time": 0.0
}
# Execute using bulletproof executor
result = _powershell_executor.execute(
command=command,
working_dir=working_directory,
timeout=timeout_seconds
)
# Truncate output if too large
if len(result['stdout']) > max_output_size:
result['stdout'] = result['stdout'][:max_output_size] + '\n[OUTPUT TRUNCATED]'
if len(result['stderr']) > max_output_size:
result['stderr'] = result['stderr'][:max_output_size] + '\n[ERROR TRUNCATED]'
# Log the result for debugging
if result['success']:
logger.info(f"PowerShell command executed successfully in {result['execution_time']:.2f}s")
else:
logger.warning(f"PowerShell command failed: {result['stderr']}")
return {
"success": result['success'],
"stdout": result['stdout'],
"stderr": result['stderr'],
"exit_code": result['exit_code'],
"execution_time": result['execution_time'],
"command": command,
"working_directory": working_directory or os.getcwd()
}
@mcp.tool()
def run_cmd_tool(
command: str,
working_directory: Optional[str] = None,
timeout_seconds: int = 60,
capture_output: bool = True,
max_output_size: int = 102400,
output_encoding: str = "utf-8"
) -> Dict[str, Any]:
"""
Execute CMD commands with reliable output capture and security checks.
Uses proper Windows console encoding and timeout handling.
"""
start_time = time.time()
try:
# Input validation
if not command or not command.strip():
return {
"success": False,
"stdout": "",
"stderr": "Error: Command cannot be empty",
"exit_code": -1,
"execution_time": 0.0
}
if timeout_seconds < 1 or timeout_seconds > 300:
return {
"success": False,
"stdout": "",
"stderr": "Error: Timeout must be between 1 and 300 seconds",
"exit_code": -1,
"execution_time": 0.0
}
# Get real console encoding for CMD
try:
console_cp = ctypes.windll.kernel32.GetConsoleOutputCP()
encoding = f'cp{console_cp}'
except:
encoding = 'cp1252' # Windows CMD fallback
# Execute command
result = subprocess.run(
command,
shell=True,
cwd=working_directory,
timeout=timeout_seconds,
capture_output=True,
text=True,
encoding=encoding,
errors='replace' # Never crash on encoding errors
)
execution_time = time.time() - start_time
# Truncate output if too large
stdout = result.stdout
stderr = result.stderr
if len(stdout) > max_output_size:
stdout = stdout[:max_output_size] + '\n[OUTPUT TRUNCATED]'
if len(stderr) > max_output_size:
stderr = stderr[:max_output_size] + '\n[ERROR TRUNCATED]'
return {
"success": result.returncode == 0,
"stdout": stdout,
"stderr": stderr,
"exit_code": result.returncode,
"execution_time": execution_time,
"command": command,
"working_directory": working_directory or os.getcwd()
}
except subprocess.TimeoutExpired:
execution_time = time.time() - start_time
return {
"success": False,
"stdout": "",
"stderr": f"Command timed out after {timeout_seconds} seconds",
"exit_code": -1,
"execution_time": execution_time
}
except Exception as e:
execution_time = time.time() - start_time
return {
"success": False,
"stdout": "",
"stderr": f"Execution failed: {str(e)}",
"exit_code": -1,
"execution_time": execution_time
}
logger.info("Bulletproof PowerShell tools registered successfully")