Skip to main content
Glama

Windows Operations MCP

powershell_tools.py.backup•19.2 kB
"""
PowerShell and CMD execution tools for Windows Operations MCP.

This module provides tools for executing PowerShell and CMD commands with
structured logging, rate limiting, and security features.

Features:
- Secure command execution with input validation
- Rate limiting to prevent abuse
- Structured logging for auditability
- Safe command execution context
- Support for both PowerShell and CMD commands
- Configurable timeouts and output limits
"""
from typing import Dict, Any, Optional, List, Tuple, Union, Pattern
from pathlib import Path
import math
import re
import time
import os
import logging
from functools import wraps

from ..decorators import tool, validate_inputs, rate_limited, log_execution
from ..logging_config import get_logger

# Import CommandExecutor from the correct module
from windows_operations_mcp.utils.command_executor import CommandExecutor

# Get a structured logger
logger = get_logger(__name__)

# Constants for rate limiting and security
POWERSHELL_RATE_LIMIT = 10  # Max calls per minute
CMD_RATE_LIMIT = 15  # Max calls per minute (slightly higher for CMD)
MAX_COMMAND_LENGTH = 8192  # Maximum allowed command length
MAX_OUTPUT_SIZE = 1024 * 1024  # 1MB max output size

# Hard ceilings applied regardless of what the caller requests.
_MAX_TIMEOUT_SECONDS = 3600  # Max 1 hour timeout
_MAX_OUTPUT_SIZE_CEILING = MAX_OUTPUT_SIZE * 10  # Allow override up to 10MB

# Regular expressions for command validation.
# Each entry is (regex, human-readable description). Patterns are matched
# case-insensitively anywhere in the command string.
DANGEROUS_PATTERNS = [
    # Command chaining and separation
    (r'[;&]', 'Command chaining'),
    (r'\|\s*;', 'Command chaining with pipe'),
    (r'`', 'Backtick command substitution'),

    # Variable and command substitution
    (r'\$\s*\{', 'Variable expansion'),
    (r'\$\s*\(', 'Command substitution'),
    (r'`?\$\s*\w+\s*=\s*\[', 'Variable assignment with expression'),

    # Pipeline and redirection
    (r'\|\s*\|', 'Pipeline chaining'),
    (r'&\s*&', 'Background process chaining'),
    (r'\|\s*&', 'Pipeline to background process'),
    (r'\|\s*[<>]', 'Pipeline with redirection'),
    (r'[<>]\s*\|', 'Redirection with pipe'),

    # Method and property access
    (r'\[\w+\]::', 'Static method call'),
    (r'\.\s*\w+\s*\(', 'Method call'),
    (r'\.\s*\w+\s*\[', 'Indexer access'),
    (r'\w+\s*\[', 'Array/hash access'),

    # Dangerous commands and patterns
    (r'Invoke-Expression', 'Invoke-Expression command'),
    (r'Start-Process\s+-NoNewWindow', 'Start-Process with NoNewWindow'),
    (r'Remove-Item\s+.*-Recurse', 'Recursive file removal'),
    (r'rm\s+-r', 'Recursive remove (Unix-style)'),
    (r'del\s+/s', 'Recursive delete (CMD)'),
    (r'\[System\.IO\.File\]', 'Direct file system access'),
    (r'\[System\.IO\.Directory\]', 'Direct directory access'),
    (r'\[System\.Net\.WebClient\]', 'Web client access'),
    (r'\[System\.Net\.WebRequest\]', 'Web request access'),
    (r'\[System\.Reflection\..*\]', 'Reflection access'),
    (r'\[System\.Management\.Automation\..*\]', 'PowerShell internals access')
]


def validate_command_safety(command: str) -> Tuple[bool, str]:
    """
    Validate that a command doesn't contain potentially dangerous patterns.

    Args:
        command: The command to validate

    Returns:
        Tuple[bool, str]: (is_safe, reason) - True and empty string if safe,
        False and reason if not safe
    """
    # Check command type and length
    if not command or not isinstance(command, str):
        return False, "Command is empty or not a string"

    if len(command) > MAX_COMMAND_LENGTH:
        return False, f"Command exceeds maximum length of {MAX_COMMAND_LENGTH} characters"

    # Check for dangerous patterns; the first match decides the reason.
    for pattern, description in DANGEROUS_PATTERNS:
        if re.search(pattern, command, re.IGNORECASE):
            return False, f"Potentially dangerous pattern detected: {description}"

    return True, ""


def validate_working_directory(path: Optional[str]) -> Tuple[bool, str]:
    """
    Validate that a working directory exists and is accessible.

    A falsy path (None or empty string) is treated as "no working directory
    requested" and is considered valid.

    Args:
        path: Path to validate

    Returns:
        Tuple[bool, str]: (is_valid, error_message) - True and empty string
        if valid, False and error message if invalid
    """
    if not path:
        return True, ""

    try:
        # Check if path is absolute
        if not os.path.isabs(path):
            return False, f"Path must be absolute: {path}"

        # Reject parent-directory components in the path AS SUPPLIED.
        # This must happen before normpath(): normalising an absolute path
        # collapses every '..' (e.g. C:\a\..\b -> C:\b), so checking the
        # normalised form can never detect traversal.
        comparable = path.replace(os.altsep, os.sep) if os.altsep else path
        if '..' in comparable.split(os.sep):
            return False, f"Path contains parent directory traversal: {path}"

        normalized_path = os.path.normpath(path)

        # Check if path exists and is a directory
        if not os.path.exists(normalized_path):
            return False, f"Directory does not exist: {normalized_path}"

        if not os.path.isdir(normalized_path):
            return False, f"Path is not a directory: {normalized_path}"

        # Check if we have read and execute permissions
        if not os.access(normalized_path, os.R_OK | os.X_OK):
            return False, f"Insufficient permissions for directory: {normalized_path}"

        return True, ""

    except (TypeError, ValueError, OSError) as e:
        return False, f"Invalid path '{path}': {str(e)}"


def _validate_command_execution(
    command: str,
    working_directory: Optional[str],
    timeout_seconds: int
) -> Tuple[bool, str, Dict[str, Any]]:
    """
    Validate common command execution parameters.

    Args:
        command: Command to execute
        working_directory: Working directory for command execution
        timeout_seconds: Command timeout in seconds

    Returns:
        Tuple of (is_valid, error_message, sanitized_params). On failure the
        sanitized dict is empty. On success it contains 'command',
        'working_directory' (falls back to the current working directory),
        'timeout_seconds' (whole seconds, clamped to 1..3600) and
        'command_preview' (first 100 characters of the command).
    """
    # Validate command
    if not command or not isinstance(command, str) or not command.strip():
        return False, "Command cannot be empty", {}

    # Validate command safety
    is_safe, safety_reason = validate_command_safety(command)
    if not is_safe:
        return False, safety_reason, {}

    # Validate working directory
    if working_directory:
        is_valid, dir_error = validate_working_directory(working_directory)
        if not is_valid:
            return False, dir_error, {}

    # Validate timeout. bool is a subclass of int, so exclude it explicitly;
    # nan/inf would pass a plain "> 0" style check and then blow up in int().
    timeout_is_number = (
        isinstance(timeout_seconds, (int, float))
        and not isinstance(timeout_seconds, bool)
    )
    if (
        not timeout_is_number
        or (isinstance(timeout_seconds, float) and not math.isfinite(timeout_seconds))
        or timeout_seconds <= 0
    ):
        return False, "Timeout must be a positive number", {}

    # Truncate to whole seconds and clamp. The lower bound of 1 keeps a
    # fractional timeout such as 0.5 from being truncated to 0.
    clamped_timeout = max(1, min(int(timeout_seconds), _MAX_TIMEOUT_SECONDS))

    # Sanitize parameters
    sanitized = {
        'command': command,
        'working_directory': working_directory or os.getcwd(),
        'timeout_seconds': clamped_timeout,
        'command_preview': command[:100] + ('...' if len(command) > 100 else '')
    }

    return True, "", sanitized


def _log_command_result(
    command_type: str,
    command_preview: str,
    result: Dict[str, Any],
    start_time: float
) -> None:
    """
    Log command execution result with structured logging.

    Args:
        command_type: Type of command ('powershell' or 'cmd')
        command_preview: Preview of the command (truncated if needed)
        result: Command execution result dictionary
        start_time: Start time of command execution (from time.time())
    """
    # "or ''" rather than a .get() default: the key may be present but None
    # (for example when output was not captured), and len(None) would raise.
    stdout_text = result.get("stdout") or ""
    stderr_text = result.get("stderr") or ""

    log_data = {
        "command_type": command_type,
        "command_preview": command_preview,
        "success": result.get("success", False),
        "exit_code": result.get("exit_code", -1),
        "execution_time_ms": round((time.time() - start_time) * 1000, 2),
        "output_size": len(stdout_text) + len(stderr_text)
    }

    if result.get("success", False):
        logger.info(
            f"{command_type}_command_completed",
            **log_data
        )
    else:
        logger.error(
            f"{command_type}_command_failed",
            error=result.get("error", "Unknown error"),
            **log_data
        )


def _run_shell_command(
    command_type: str,
    display_name: str,
    executor_name: str,
    command: str,
    working_directory: Optional[str],
    timeout_seconds: int,
    capture_output: bool,
    as_admin: bool,
    output_encoding: str,
    max_output_size: int
) -> Dict[str, Any]:
    """
    Validate, execute and log a single shell command.

    Shared implementation behind the run_powershell and run_cmd tools.

    Args:
        command_type: Prefix for log event names ('powershell' or 'cmd')
        display_name: Shell name used in error messages ('PowerShell' or 'CMD')
        executor_name: Name of the CommandExecutor method to call
        command: Command to execute
        working_directory: Working directory for command execution
        timeout_seconds: Command timeout in seconds
        capture_output: Whether to capture and return output
        as_admin: Requested elevation flag (see NOTE below)
        output_encoding: Encoding for command output
        max_output_size: Requested output size limit (capped at 10MB)

    Returns:
        The result dictionary produced by the executor (with 'truncated' and
        'warning' added when the captured output reached the size limit), or
        a failure dictionary with 'success', 'error', 'execution_time' and
        'exit_code' when validation or execution fails.
    """
    start_time = time.time()

    # Validate parameters
    is_valid, error_msg, params = _validate_command_execution(
        command, working_directory, timeout_seconds
    )

    if not is_valid:
        return {
            "success": False,
            "error": f"Invalid parameters: {error_msg}",
            "execution_time": 0.0,
            "exit_code": -1
        }

    # Log the command (only a truncated preview, never the full text).
    # NOTE(review): as_admin is recorded here but is not forwarded to the
    # executor below, so it currently has no effect on execution - confirm
    # whether CommandExecutor supports elevation before wiring it through.
    logger.info(
        f"{command_type}_command_started",
        command_preview=params['command_preview'],
        working_directory=params['working_directory'],
        timeout_seconds=params['timeout_seconds'],
        capture_output=capture_output,
        as_admin=as_admin,
        max_output_size=max_output_size
    )

    # Execute the command
    try:
        # The limit actually enforced: the caller's request, capped at 10MB.
        # The truncation check below must use this same value; comparing
        # against the uncapped request would miss truncation at the cap.
        effective_limit = min(max_output_size, _MAX_OUTPUT_SIZE_CEILING)

        execute = getattr(CommandExecutor, executor_name)
        result = execute(
            command=params['command'],
            working_directory=params['working_directory'],
            timeout_seconds=params['timeout_seconds'],
            capture_output=capture_output,
            output_encoding=output_encoding,
            max_output_size=effective_limit
        )

        # Check for output truncation
        if capture_output:
            stdout_size = len(result.get("stdout") or "")
            stderr_size = len(result.get("stderr") or "")

            if stdout_size + stderr_size >= effective_limit:
                result["truncated"] = True
                result["warning"] = f"Output exceeded {effective_limit} bytes and was truncated"

        # Log the result
        _log_command_result(
            command_type,
            params['command_preview'],
            result,
            start_time
        )

        return result

    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        error_msg = f"{display_name} execution failed: {str(e)}"
        logger.error(
            f"{command_type}_command_error",
            command_preview=params['command_preview'],
            error=error_msg,
            exc_info=True
        )
        return {
            "success": False,
            "error": error_msg,
            "execution_time": time.time() - start_time,
            "exit_code": -1
        }


def register_powershell_tools(mcp):
    """
    Register PowerShell and CMD tools with FastMCP.

    Args:
        mcp: The FastMCP instance to register tools with
    """
    # NOTE(review): `mcp` is not referenced in this function; registration
    # presumably happens as a side effect of the @tool decorator - confirm.

    @tool(
        name="run_powershell",
        description="Execute PowerShell commands with reliable output capture and security checks.",
        parameters={
            "command": {
                "type": "string",
                "description": "PowerShell command to execute. Must be a valid PowerShell command.",
                "required": True
            },
            "working_directory": {
                "type": "string",
                "description": "Working directory for command execution. Must be an absolute path.",
                "default": None
            },
            "timeout_seconds": {
                "type": "integer",
                "description": "Maximum execution time in seconds (1-3600, default: 60)",
                "minimum": 1,
                "maximum": 3600,
                "default": 60
            },
            "capture_output": {
                "type": "boolean",
                "description": "Whether to capture and return command output",
                "default": True
            },
            "as_admin": {
                "type": "boolean",
                "description": "Run command with elevated privileges (requires admin rights)",
                "default": False
            },
            "output_encoding": {
                "type": "string",
                "description": "Character encoding for command output",
                "default": "utf-8"
            },
            "max_output_size": {
                "type": "integer",
                "description": "Maximum size of output to capture in bytes",
                "default": MAX_OUTPUT_SIZE
            }
        },
        required=["command"],
        returns={
            "type": "object",
            "properties": {
                "success": {"type": "boolean"},
                "return_code": {"type": "integer"},
                "output": {"type": "string"},
                "error": {"type": "string"},
                "execution_time": {"type": "number"},
                "truncated": {"type": "boolean"}
            }
        },
        rate_limit=(POWERSHELL_RATE_LIMIT, 60)  # Calls per minute
    )
    def run_powershell(
        command: str,
        working_directory: Optional[str] = None,
        timeout_seconds: int = 60,
        capture_output: bool = True,
        as_admin: bool = False,
        output_encoding: str = 'utf-8',
        max_output_size: int = MAX_OUTPUT_SIZE
    ) -> Dict[str, Any]:
        """
        Execute PowerShell commands with reliable output capture and security checks.

        Args:
            command: PowerShell command to execute
            working_directory: Absolute working directory (default: current directory)
            timeout_seconds: Maximum execution time in seconds (default: 60, max: 3600)
            capture_output: Whether to capture and return output (default: True)
            as_admin: Run with elevated privileges (requires admin rights, default: False)
            output_encoding: Encoding for command output (default: 'utf-8')
            max_output_size: Maximum allowed output size in bytes (default: 1MB)

        Returns:
            Dict containing:
            - success (bool): Whether the command executed successfully
            - stdout (str, optional): Command output if capture_output is True
            - stderr (str, optional): Command error output if capture_output is True
            - exit_code (int): Command exit code
            - execution_time (float): Execution time in seconds
            - error (str, optional): Error message if command failed
            - truncated (bool, optional): True if output was truncated
        """
        return _run_shell_command(
            "powershell",
            "PowerShell",
            "execute_powershell",
            command,
            working_directory,
            timeout_seconds,
            capture_output,
            as_admin,
            output_encoding,
            max_output_size
        )

    @tool(
        name="run_cmd",
        description="Execute CMD commands with reliable output capture and security checks.",
        parameters={
            "command": {
                "type": "string",
                "description": "CMD command to execute. Must be a valid CMD command.",
                "required": True
            },
            "working_directory": {
                "type": "string",
                "description": "Working directory for command execution. Must be an absolute path.",
                "default": None
            },
            "timeout_seconds": {
                "type": "integer",
                "description": "Maximum execution time in seconds (1-3600, default: 60)",
                "minimum": 1,
                "maximum": 3600,
                "default": 60
            },
            "capture_output": {
                "type": "boolean",
                "description": "Whether to capture and return command output",
                "default": True
            },
            "as_admin": {
                "type": "boolean",
                "description": "Run command with elevated privileges (requires admin rights)",
                "default": False
            },
            "output_encoding": {
                "type": "string",
                "description": "Character encoding for command output (default: 'cp1252' for CMD)",
                "default": "cp1252"
            },
            "max_output_size": {
                "type": "integer",
                # The default is MAX_OUTPUT_SIZE (1MB); 10MB is only the cap.
                "description": "Maximum size of output to capture in bytes (default: 1MB)",
                "default": MAX_OUTPUT_SIZE
            }
        },
        required=["command"],
        returns={
            "type": "object",
            "properties": {
                "success": {"type": "boolean"},
                "return_code": {"type": "integer"},
                "output": {"type": "string"},
                "error": {"type": "string"},
                "execution_time": {"type": "number"},
                "truncated": {"type": "boolean"}
            }
        },
        rate_limit=(CMD_RATE_LIMIT, 60)  # Calls per minute
    )
    def run_cmd(
        command: str,
        working_directory: Optional[str] = None,
        timeout_seconds: int = 60,
        capture_output: bool = True,
        as_admin: bool = False,
        output_encoding: str = 'cp1252',
        max_output_size: int = MAX_OUTPUT_SIZE
    ) -> Dict[str, Any]:
        """
        Execute CMD commands with reliable output capture and security checks.

        Args:
            command: CMD command to execute
            working_directory: Absolute working directory (default: current directory)
            timeout_seconds: Maximum execution time in seconds (default: 60, max: 3600)
            capture_output: Whether to capture and return output (default: True)
            as_admin: Run with elevated privileges (requires admin rights, default: False)
            output_encoding: Encoding for command output (default: 'cp1252')
            max_output_size: Maximum allowed output size in bytes (default: 1MB)

        Returns:
            Dict containing:
            - success (bool): Whether the command executed successfully
            - stdout (str, optional): Command output if capture_output is True
            - stderr (str, optional): Command error output if capture_output is True
            - exit_code (int): Command exit code
            - execution_time (float): Execution time in seconds
            - error (str, optional): Error message if command failed
            - truncated (bool, optional): True if output was truncated
        """
        return _run_shell_command(
            "cmd",
            "CMD",
            "execute_cmd",
            command,
            working_directory,
            timeout_seconds,
            capture_output,
            as_admin,
            output_encoding,
            max_output_size
        )

    logger.info("powershell_tools_registered", tools=["run_powershell", "run_cmd"])

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sandraschi/windows-operations-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server