Skip to main content
Glama

Windows Operations MCP

command_executor.py.backup•17.8 kB
""" Command execution utilities for PowerShell and CMD with support for long-running processes. """ import asyncio import logging import os import platform import signal import subprocess import threading import time from dataclasses import dataclass from datetime import datetime from queue import Queue, Empty from typing import Dict, Any, Optional, Callable, List, Tuple, Union # Import utility functions from the current package from . import create_temp_file, safe_cleanup_file, validate_directory, get_execution_result logger = logging.getLogger(__name__) @dataclass class ProcessOutput: """Container for process output data.""" stdout: str = "" stderr: str = "" exit_code: Optional[int] = None error: Optional[str] = None execution_time: float = 0.0 def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for serialization.""" return { 'success': self.exit_code == 0 and not self.error, 'stdout': self.stdout, 'stderr': self.stderr, 'exit_code': self.exit_code or 0, # Ensure exit_code is not None 'error': self.error, 'execution_time': self.execution_time, 'command': '', # Will be filled in by the caller if needed 'working_directory': '' # Will be filled in by the caller if needed } class CommandExecutor: """Handles command execution with reliable output capture and streaming support.""" # Default timeouts in seconds DEFAULT_TIMEOUT = 3600 # 1 hour default timeout HEARTBEAT_INTERVAL = 30 # Send heartbeat every 30 seconds @classmethod def _enqueue_output(cls, stream, queue): """Helper to read stream lines into a queue.""" try: for line in iter(stream.readline, b''): if line: queue.put(line) except Exception as e: logger.warning(f"Error reading from stream: {e}") finally: stream.close() @classmethod def _process_output( cls, process: subprocess.Popen, output_callback: Optional[Callable[[str, str], None]] = None, timeout_seconds: int = DEFAULT_TIMEOUT, max_output_size: Optional[int] = None ) -> ProcessOutput: """Process output from a running process with streaming support. Args: process: The running subprocess output_callback: Optional callback function that receives (stream_type, data) timeout_seconds: Maximum execution time in seconds max_output_size: Maximum output size in bytes Returns: ProcessOutput containing the command results """ output = ProcessOutput() start_time = time.time() last_heartbeat = time.time() # Create queues for stdout/stderr stdout_queue = Queue() stderr_queue = Queue() # Start threads to read output stdout_thread = threading.Thread( target=cls._enqueue_output, args=(process.stdout, stdout_queue) ) stderr_thread = threading.Thread( target=cls._enqueue_output, args=(process.stderr, stderr_queue) ) # Set threads as daemon so they'll exit when the main thread exits stdout_thread.daemon = True stderr_thread.daemon = True stdout_thread.start() stderr_thread.start() try: while True: # Check if process has finished if process.poll() is not None: output.exit_code = process.returncode break # Check for timeout elapsed = time.time() - start_time if elapsed > timeout_seconds: logger.warning(f"Command timed out after {timeout_seconds} seconds") output.error = f"Command timed out after {timeout_seconds} seconds" output.exit_code = -1 process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: process.kill() break # Check for new output got_output = False # Process stdout try: while True: line = stdout_queue.get_nowait() line = line.decode('utf-8', errors='replace').rstrip() output.stdout += line + '\n' if output_callback: output_callback('stdout', line) got_output = True except Empty: pass # Process stderr try: while True: line = stderr_queue.get_nowait() line = line.decode('utf-8', errors='replace').rstrip() output.stderr += line + '\n' if output_callback: output_callback('stderr', line) got_output = True except Empty: pass # Send heartbeat if needed current_time = time.time() if current_time - last_heartbeat >= cls.HEARTBEAT_INTERVAL: if output_callback: output_callback('heartbeat', f'Still running... ({elapsed:.1f}s)') last_heartbeat = current_time # If we didn't get any output, sleep briefly to avoid busy-waiting if not got_output: time.sleep(0.1) # Check output size limits if max_output_size and len(output.stdout) + len(output.stderr) > max_output_size: output.error = "Output size limit exceeded" output.exit_code = -1 process.terminate() break # Ensure threads are done stdout_thread.join(timeout=1) stderr_thread.join(timeout=1) # Get any remaining output try: remaining_stdout = process.stdout.read().decode('utf-8', errors='replace') remaining_stderr = process.stderr.read().decode('utf-8', errors='replace') output.stdout += remaining_stdout output.stderr += remaining_stderr except Exception as e: logger.warning(f"Error reading final output: {e}") # Set final execution time output.execution_time = time.time() - start_time return output except Exception as e: logger.error(f"Error processing output: {e}", exc_info=True) output.error = str(e) output.exit_code = -1 return output finally: # Ensure process is terminated try: if process.poll() is None: process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: process.kill() except Exception as e: logger.warning(f"Error terminating process: {e}") @classmethod def execute_powershell( cls, command: str, working_directory: Optional[str] = None, timeout_seconds: int = None, capture_output: bool = True, output_encoding: str = 'utf-8', max_output_size: Optional[int] = 10 * 1024 * 1024, # 10MB default output_callback: Optional[Callable[[str, str], None]] = None, **kwargs ) -> Dict[str, Any]: """ Execute PowerShell commands with streaming output support. Args: command: The PowerShell command to execute working_directory: Working directory for the command timeout_seconds: Maximum execution time in seconds (default: 1 hour) capture_output: Whether to capture command output output_encoding: Output encoding (default: utf-8) max_output_size: Maximum output size in bytes (default: 10MB) output_callback: Optional callback function that receives (stream_type, data) **kwargs: Additional arguments (for backward compatibility) Returns: Dictionary containing command execution results """ start_time = time.time() # Use default timeout if not specified if timeout_seconds is None: timeout_seconds = cls.DEFAULT_TIMEOUT try: logger.info(f"Executing PowerShell: {command[:100]}...") # Validate and set working directory current_dir = os.getcwd() if working_directory: dir_validation = validate_directory(working_directory) if not dir_validation["valid"]: return get_execution_result( success=False, command=command, execution_time=time.time() - start_time, error=dir_validation["error"] ) current_dir = working_directory if not capture_output: # Execute without capturing output process = subprocess.Popen( ["powershell.exe", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", command], cwd=current_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if platform.system() == 'Windows' else 0 ) # Just wait for process to complete try: process.wait(timeout=timeout_seconds) exit_code = process.returncode except subprocess.TimeoutExpired: process.terminate() return get_execution_result( success=False, command=command, execution_time=time.time() - start_time, error=f"Command timed out after {timeout_seconds} seconds" ) return get_execution_result( success=exit_code == 0, command=command, exit_code=exit_code, execution_time=time.time() - start_time, working_directory=current_dir ) # Execute with output capture and streaming process = subprocess.Popen( ["powershell.exe", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", command], cwd=current_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if platform.system() == 'Windows' else 0 ) # Process output with our streaming handler output = cls._process_output( process=process, output_callback=output_callback, timeout_seconds=timeout_seconds, max_output_size=max_output_size ) # Convert to the expected result format result = output.to_dict() result.update({ 'command': command, 'working_directory': current_dir }) return result except Exception as e: error_msg = f"PowerShell execution failed: {str(e)}" logger.error(error_msg, exc_info=True) return get_execution_result( success=False, command=command, execution_time=time.time() - start_time, error=error_msg ) finally: # Restore original working directory if it was changed if working_directory and os.getcwd() != current_dir: os.chdir(current_dir) @classmethod def execute_cmd( cls, command: str, working_directory: Optional[str] = None, timeout_seconds: int = None, capture_output: bool = True, output_encoding: str = 'utf-8', max_output_size: Optional[int] = 10 * 1024 * 1024, # 10MB default output_callback: Optional[Callable[[str, str], None]] = None, **kwargs ) -> Dict[str, Any]: """ Execute CMD commands with streaming output support. Args: command: The command to execute working_directory: Working directory for the command timeout_seconds: Maximum execution time in seconds (default: 1 hour) capture_output: Whether to capture command output output_encoding: Output encoding (default: utf-8) max_output_size: Maximum output size in bytes (default: 10MB) output_callback: Optional callback function that receives (stream_type, data) **kwargs: Additional arguments (for backward compatibility) Returns: Dictionary containing command execution results """ start_time = time.time() # Use default timeout if not specified if timeout_seconds is None: timeout_seconds = cls.DEFAULT_TIMEOUT try: logger.info(f"Executing CMD: {command[:100]}...") # Validate and set working directory current_dir = os.getcwd() if working_directory: dir_validation = validate_directory(working_directory) if not dir_validation["valid"]: return get_execution_result( success=False, command=command, execution_time=time.time() - start_time, error=dir_validation["error"] ) current_dir = working_directory if not capture_output: # Execute without capturing output process = subprocess.Popen( ["cmd.exe", "/c", command], cwd=current_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if platform.system() == 'Windows' else 0, shell=True ) # Just wait for process to complete try: process.wait(timeout=timeout_seconds) exit_code = process.returncode except subprocess.TimeoutExpired: process.terminate() return get_execution_result( success=False, command=command, execution_time=time.time() - start_time, error=f"Command timed out after {timeout_seconds} seconds" ) return get_execution_result( success=exit_code == 0, command=command, exit_code=exit_code, execution_time=time.time() - start_time, working_directory=current_dir ) # Execute with output capture and streaming process = subprocess.Popen( ["cmd.exe", "/c", command], cwd=current_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if platform.system() == 'Windows' else 0, shell=True ) # Process output with our streaming handler output = cls._process_output( process=process, output_callback=output_callback, timeout_seconds=timeout_seconds, max_output_size=max_output_size ) # Convert to the expected result format result = output.to_dict() result.update({ 'command': command, 'working_directory': current_dir }) return result except Exception as e: error_msg = f"Command execution failed: {str(e)}" logger.error(error_msg, exc_info=True) return get_execution_result( success=False, command=command, execution_time=time.time() - start_time, error=error_msg ) finally: # Restore original working directory if it was changed if working_directory and os.getcwd() != current_dir: os.chdir(current_dir)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sandraschi/windows-operations-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server