powershell_tools.py.backupā¢19.2 kB
"""
PowerShell and CMD execution tools for Windows Operations MCP.
This module provides tools for executing PowerShell and CMD commands with
structured logging, rate limiting, and security features.
Features:
- Secure command execution with input validation
- Rate limiting to prevent abuse
- Structured logging for auditability
- Safe command execution context
- Support for both PowerShell and CMD commands
- Configurable timeouts and output limits
"""
from typing import Dict, Any, Optional, List, Tuple, Union, Pattern
from pathlib import Path
import re
import time
import os
import logging
from functools import wraps
from ..decorators import tool, validate_inputs, rate_limited, log_execution
from ..logging_config import get_logger
# Import CommandExecutor from the correct module
from windows_operations_mcp.utils.command_executor import CommandExecutor
# Get a structured logger
logger = get_logger(__name__)
# Constants for rate limiting and security
POWERSHELL_RATE_LIMIT = 10 # Max calls per minute
CMD_RATE_LIMIT = 15 # Max calls per minute (slightly higher for CMD)
MAX_COMMAND_LENGTH = 8192 # Maximum allowed command length
MAX_OUTPUT_SIZE = 1024 * 1024 # 1MB max output size
# Regular expressions for command validation
DANGEROUS_PATTERNS = [
# Command chaining and separation
(r'[;&]', 'Command chaining'),
(r'\|\s*;', 'Command chaining with pipe'),
(r'`', 'Backtick command substitution'),
# Variable and command substitution
(r'\$\s*\{', 'Variable expansion'),
(r'\$\s*\(', 'Command substitution'),
(r'`?\$\s*\w+\s*=\s*\[', 'Variable assignment with expression'),
# Pipeline and redirection
(r'\|\s*\|', 'Pipeline chaining'),
(r'&\s*&', 'Background process chaining'),
(r'\|\s*&', 'Pipeline to background process'),
(r'\|\s*[<>]', 'Pipeline with redirection'),
(r'[<>]\s*\|', 'Redirection with pipe'),
# Method and property access
(r'\[\w+\]::', 'Static method call'),
(r'\.\s*\w+\s*\(' , 'Method call'),
(r'\.\s*\w+\s*\[', 'Indexer access'),
(r'\w+\s*\[', 'Array/hash access'),
# Dangerous commands and patterns
(r'Invoke-Expression', 'Invoke-Expression command'),
(r'Start-Process\s+-NoNewWindow', 'Start-Process with NoNewWindow'),
(r'Remove-Item\s+.*-Recurse', 'Recursive file removal'),
(r'rm\s+-r', 'Recursive remove (Unix-style)'),
(r'del\s+/s', 'Recursive delete (CMD)'),
(r'\[System\.IO\.File\]', 'Direct file system access'),
(r'\[System\.IO\.Directory\]', 'Direct directory access'),
(r'\[System\.Net\.WebClient\]', 'Web client access'),
(r'\[System\.Net\.WebRequest\]', 'Web request access'),
(r'\[System\.Reflection\..*\]', 'Reflection access'),
(r'\[System\.Management\.Automation\..*\]', 'PowerShell internals access')
]
def validate_command_safety(command: str) -> Tuple[bool, str]:
"""
Validate that a command doesn't contain potentially dangerous patterns.
Args:
command: The command to validate
Returns:
Tuple[bool, str]: (is_safe, reason) - True and empty string if safe,
False and reason if not safe
"""
# Check command length
if not command or not isinstance(command, str):
return False, "Command is empty or not a string"
if len(command) > MAX_COMMAND_LENGTH:
return False, f"Command exceeds maximum length of {MAX_COMMAND_LENGTH} characters"
# Check for dangerous patterns
for pattern, description in DANGEROUS_PATTERNS:
if re.search(pattern, command, re.IGNORECASE):
return False, f"Potentially dangerous pattern detected: {description}"
return True, ""
def validate_working_directory(path: Optional[str]) -> Tuple[bool, str]:
"""
Validate that a working directory exists and is accessible.
Args:
path: Path to validate
Returns:
Tuple[bool, str]: (is_valid, error_message) - True and empty string if valid,
False and error message if invalid
"""
if not path:
return True, ""
try:
# Check if path is absolute
if not os.path.isabs(path):
return False, f"Path must be absolute: {path}"
# Normalize path to prevent directory traversal
normalized_path = os.path.normpath(path)
if '..' in normalized_path.split(os.sep):
return False, f"Path contains parent directory traversal: {path}"
# Check if path exists and is a directory
if not os.path.exists(normalized_path):
return False, f"Directory does not exist: {normalized_path}"
if not os.path.isdir(normalized_path):
return False, f"Path is not a directory: {normalized_path}"
# Check if we have read and execute permissions
if not os.access(normalized_path, os.R_OK | os.X_OK):
return False, f"Insufficient permissions for directory: {normalized_path}"
return True, ""
except (TypeError, ValueError, OSError) as e:
return False, f"Invalid path '{path}': {str(e)}"
def _validate_command_execution(
command: str,
working_directory: Optional[str],
timeout_seconds: int
) -> Tuple[bool, str, Dict[str, Any]]:
"""
Validate common command execution parameters.
Args:
command: Command to execute
working_directory: Working directory for command execution
timeout_seconds: Command timeout in seconds
Returns:
Tuple of (is_valid, error_message, sanitized_params)
"""
# Validate command
if not command or not isinstance(command, str) or not command.strip():
return False, "Command cannot be empty", {}
# Validate command safety
is_safe, safety_reason = validate_command_safety(command)
if not is_safe:
return False, safety_reason, {}
# Validate working directory
if working_directory:
is_valid, dir_error = validate_working_directory(working_directory)
if not is_valid:
return False, dir_error, {}
# Validate timeout
if not isinstance(timeout_seconds, (int, float)) or timeout_seconds <= 0:
return False, "Timeout must be a positive number", {}
# Sanitize parameters
sanitized = {
'command': command,
'working_directory': working_directory or os.getcwd(),
'timeout_seconds': min(int(timeout_seconds), 3600), # Max 1 hour timeout
'command_preview': command[:100] + ('...' if len(command) > 100 else '')
}
return True, "", sanitized
def _log_command_result(
command_type: str,
command_preview: str,
result: Dict[str, Any],
start_time: float
) -> None:
"""
Log command execution result with structured logging.
Args:
command_type: Type of command ('powershell' or 'cmd')
command_preview: Preview of the command (truncated if needed)
result: Command execution result dictionary
start_time: Start time of command execution (from time.time())
"""
log_data = {
"command_type": command_type,
"command_preview": command_preview,
"success": result.get("success", False),
"exit_code": result.get("exit_code", -1),
"execution_time_ms": round((time.time() - start_time) * 1000, 2),
"output_size": len(result.get("stdout", "")) + len(result.get("stderr", ""))
}
if result.get("success", False):
logger.info(
f"{command_type}_command_completed",
**log_data
)
else:
logger.error(
f"{command_type}_command_failed",
error=result.get("error", "Unknown error"),
**log_data
)
def register_powershell_tools(mcp):
"""
Register PowerShell and CMD tools with FastMCP.
Args:
mcp: The FastMCP instance to register tools with
"""
@tool(
name="run_powershell",
description="Execute PowerShell commands with reliable output capture and security checks.",
parameters={
"command": {
"type": "string",
"description": "PowerShell command to execute. Must be a valid PowerShell command.",
"required": True
},
"working_directory": {
"type": "string",
"description": "Working directory for command execution. Must be an absolute path.",
"default": None
},
"timeout_seconds": {
"type": "integer",
"description": "Maximum execution time in seconds (1-3600, default: 60)",
"minimum": 1,
"maximum": 3600,
"default": 60
},
"capture_output": {
"type": "boolean",
"description": "Whether to capture and return command output",
"default": True
},
"as_admin": {
"type": "boolean",
"description": "Run command with elevated privileges (requires admin rights)",
"default": False
},
"output_encoding": {
"type": "string",
"description": "Character encoding for command output",
"default": "utf-8"
},
"max_output_size": {
"type": "integer",
"description": "Maximum size of output to capture in bytes",
"default": MAX_OUTPUT_SIZE
}
},
required=["command"],
returns={
"type": "object",
"properties": {
"success": {"type": "boolean"},
"return_code": {"type": "integer"},
"output": {"type": "string"},
"error": {"type": "string"},
"execution_time": {"type": "number"},
"truncated": {"type": "boolean"}
}
},
rate_limit=(POWERSHELL_RATE_LIMIT, 60) # Calls per minute
)
def run_powershell(
command: str,
working_directory: Optional[str] = None,
timeout_seconds: int = 60,
capture_output: bool = True,
as_admin: bool = False,
output_encoding: str = 'utf-8',
max_output_size: int = MAX_OUTPUT_SIZE
) -> Dict[str, Any]:
"""
Execute PowerShell commands with reliable output capture and security checks.
capture_output: Whether to capture and return output (default: True)
as_admin: Run with elevated privileges (requires admin rights, default: False)
output_encoding: Encoding for command output (default: 'utf-8')
max_output_size: Maximum allowed output size in bytes (default: 1MB)
Returns:
Dict containing:
- success (bool): Whether the command executed successfully
- stdout (str, optional): Command output if capture_output is True
- stderr (str, optional): Command error output if capture_output is True
- exit_code (int): Command exit code
- execution_time (float): Execution time in seconds
- error (str, optional): Error message if command failed
- truncated (bool, optional): True if output was truncated
"""
start_time = time.time()
# Validate parameters
is_valid, error_msg, params = _validate_command_execution(
command, working_directory, timeout_seconds
)
if not is_valid:
return {
"success": False,
"error": f"Invalid parameters: {error_msg}",
"execution_time": 0.0
}
# Log the command (without sensitive data)
logger.info(
"powershell_command_started",
command_preview=params['command_preview'],
working_directory=params['working_directory'],
timeout_seconds=params['timeout_seconds'],
capture_output=capture_output,
as_admin=as_admin,
max_output_size=max_output_size
)
# Execute the command
try:
result = CommandExecutor.execute_powershell(
command=params['command'],
working_directory=params['working_directory'],
timeout_seconds=params['timeout_seconds'],
capture_output=capture_output,
output_encoding=output_encoding,
max_output_size=min(max_output_size, MAX_OUTPUT_SIZE * 10) # Allow override up to 10MB
)
# Check for output truncation
if capture_output:
stdout_size = len(result.get("stdout", ""))
stderr_size = len(result.get("stderr", ""))
if stdout_size + stderr_size >= max_output_size:
result["truncated"] = True
result["warning"] = f"Output exceeded {max_output_size} bytes and was truncated"
# Log the result
_log_command_result(
"powershell",
params['command_preview'],
result,
start_time
)
return result
except Exception as e:
error_msg = f"PowerShell execution failed: {str(e)}"
logger.error(
"powershell_command_error",
command_preview=params['command_preview'],
error=error_msg,
exc_info=True
)
return {
"success": False,
"error": error_msg,
"execution_time": time.time() - start_time,
"exit_code": -1
}
@tool(
name="run_cmd",
description="Execute CMD commands with reliable output capture and security checks.",
parameters={
"command": {
"type": "string",
"description": "CMD command to execute. Must be a valid CMD command.",
"required": True
},
"working_directory": {
"type": "string",
"description": "Working directory for command execution. Must be an absolute path.",
"default": None
},
"timeout_seconds": {
"type": "integer",
"description": "Maximum execution time in seconds (1-3600, default: 60)",
"minimum": 1,
"maximum": 3600,
"default": 60
},
"capture_output": {
"type": "boolean",
"description": "Whether to capture and return command output",
"default": True
},
"as_admin": {
"type": "boolean",
"description": "Run command with elevated privileges (requires admin rights)",
"default": False
},
"output_encoding": {
"type": "string",
"description": "Character encoding for command output (default: 'cp1252' for CMD)",
"default": "cp1252"
},
"max_output_size": {
"type": "integer",
"description": "Maximum size of output to capture in bytes (default: 10MB)",
"default": MAX_OUTPUT_SIZE
}
},
required=["command"],
returns={
"type": "object",
"properties": {
"success": {"type": "boolean"},
"return_code": {"type": "integer"},
"output": {"type": "string"},
"error": {"type": "string"},
"execution_time": {"type": "number"},
"truncated": {"type": "boolean"}
}
},
rate_limit=(CMD_RATE_LIMIT, 60) # Calls per minute
)
def run_cmd(
command: str,
working_directory: Optional[str] = None,
timeout_seconds: int = 60,
capture_output: bool = True,
as_admin: bool = False,
output_encoding: str = 'cp1252',
max_output_size: int = MAX_OUTPUT_SIZE
) -> Dict[str, Any]:
"""
Execute CMD commands with reliable output capture and security checks.
- truncated (bool, optional): True if output was truncated
"""
start_time = time.time()
# Validate parameters
is_valid, error_msg, params = _validate_command_execution(
command, working_directory, timeout_seconds
)
if not is_valid:
return {
"success": False,
"error": f"Invalid parameters: {error_msg}",
"execution_time": 0.0
}
# Log the command (without sensitive data)
logger.info(
"cmd_command_started",
command_preview=params['command_preview'],
working_directory=params['working_directory'],
timeout_seconds=params['timeout_seconds'],
capture_output=capture_output,
as_admin=as_admin,
max_output_size=max_output_size
)
# Execute the command
try:
result = CommandExecutor.execute_cmd(
command=params['command'],
working_directory=params['working_directory'],
timeout_seconds=params['timeout_seconds'],
capture_output=capture_output,
output_encoding=output_encoding,
max_output_size=min(max_output_size, MAX_OUTPUT_SIZE * 10) # Allow override up to 10MB
)
# Check for output truncation
if capture_output:
stdout_size = len(result.get("stdout", ""))
stderr_size = len(result.get("stderr", ""))
if stdout_size + stderr_size >= max_output_size:
result["truncated"] = True
result["warning"] = f"Output exceeded {max_output_size} bytes and was truncated"
# Log the result
_log_command_result(
"cmd",
params['command_preview'],
result,
start_time
)
return result
except Exception as e:
error_msg = f"CMD execution failed: {str(e)}"
logger.error(
"cmd_command_error",
command_preview=params['command_preview'],
error=error_msg,
exc_info=True
)
return {
"success": False,
"error": error_msg,
"execution_time": time.time() - start_time,
"exit_code": -1
}
logger.info("powershell_tools_registered", tools=["run_powershell", "run_cmd"])