Skip to main content
Glama

Windows Operations MCP

process_tools.py.backup•27.6 kB
""" Process monitoring and management tools for Windows Operations MCP. This module provides tools for process monitoring and management with structured logging, input validation, and security checks. """ import platform import time from typing import Dict, Any, List, Optional, Tuple # Check if psutil is available try: import psutil from psutil import NoSuchProcess, AccessDenied, ZombieProcess HAS_PSUTIL = True except ImportError: HAS_PSUTIL = False from ..logging_config import get_logger from ..decorators import tool_decorator, rate_limited, validate_inputs # Initialize structured logger logger = get_logger(__name__) # Constants MAX_PROCESSES = 1000 # Maximum number of processes to return in list DEFAULT_MAX_PROCESSES = 100 SYSTEM_PROCESSES = { 'System', 'Registry', 'csrss.exe', 'winlogon.exe', 'services.exe', 'lsass.exe', 'smss.exe', 'wininit.exe', 'svchost.exe', 'dwm.exe', 'taskhostw.exe' } def validate_process_inputs( filter_name: Optional[str] = None, include_system: bool = False, max_processes: int = DEFAULT_MAX_PROCESSES, pid: Optional[int] = None ) -> Tuple[bool, Optional[str]]: """ Validate process tool input parameters. Args: filter_name: Process name filter string include_system: Whether to include system processes max_processes: Maximum number of processes to return pid: Process ID to validate (if applicable) Returns: Tuple of (is_valid, error_message) """ # Validate filter_name if provided if filter_name is not None and not isinstance(filter_name, str): return False, "Filter name must be a string or None" # Validate max_processes if not isinstance(max_processes, int) or not (1 <= max_processes <= MAX_PROCESSES): return False, f"Max processes must be an integer between 1 and {MAX_PROCESSES}" # Validate pid if provided if pid is not None: if not isinstance(pid, int) or pid <= 0: return False, "Process ID must be a positive integer" return True, None def get_process_basic_info(proc: 'psutil.Process') -> Dict[str, Any]: """ Get basic information about a process. Args: proc: psutil.Process object Returns: Dict containing basic process information """ try: with proc.oneshot(): return { "pid": proc.pid, "name": proc.name(), "username": proc.username() or "N/A", "status": proc.status(), "memory_mb": round(proc.memory_info().rss / 1024 / 1024, 2), "cpu_percent": proc.cpu_percent(interval=0.1) or 0.0, "create_time": proc.create_time(), "exe": proc.exe() if proc.exe() else "N/A", "cmdline": proc.cmdline(), "parent_pid": proc.ppid() } except (NoSuchProcess, AccessDenied, ZombieProcess) as e: logger.warning("Failed to get basic process info", pid=proc.pid, error=str(e)) return {"error": f"Failed to get process info: {e}"} def _get_cpu_info() -> Dict[str, Any]: """ Get detailed CPU information. Returns: Dict containing CPU information """ try: cpu_percent = psutil.cpu_percent(interval=0.5, percpu=True) cpu_count = psutil.cpu_count(logical=False) cpu_count_logical = psutil.cpu_count(logical=True) cpu_times = psutil.cpu_times_percent(interval=0.5) cpu_info = { "physical_cores": cpu_count, "logical_cores": cpu_count_logical, "percent": cpu_percent, "avg_percent": sum(cpu_percent) / len(cpu_percent) if cpu_percent else 0, "times": { "user": cpu_times.user, "system": cpu_times.system, "idle": cpu_times.idle, "iowait": getattr(cpu_times, 'iowait', None), "irq": getattr(cpu_times, 'irq', None), "steal": getattr(cpu_times, 'steal', None) } } # Add CPU frequency if available try: cpu_freq = psutil.cpu_freq() if cpu_freq: cpu_info["freq"] = { "current_mhz": cpu_freq.current, "min_mhz": cpu_freq.min, "max_mhz": cpu_freq.max } except Exception as e: logger.warning("Failed to get CPU frequency info", error=str(e)) cpu_info["freq"] = None # Add CPU stats if available try: cpu_stats = psutil.cpu_stats() cpu_info["stats"] = { "ctx_switches": getattr(cpu_stats, 'ctx_switches', None), "interrupts": getattr(cpu_stats, 'interrupts', None), "soft_interrupts": getattr(cpu_stats, 'soft_interrupts', None), "syscalls": getattr(cpu_stats, 'syscalls', None) } except Exception as e: logger.warning("Failed to get CPU stats", error=str(e)) cpu_info["stats"] = None return cpu_info except Exception as e: logger.error("Failed to get CPU information", error=str(e)) return {"error": f"Failed to get CPU information: {str(e)}"} def _get_memory_info() -> Dict[str, Any]: """ Get detailed memory and swap information. Returns: Dict containing memory and swap information """ try: memory = psutil.virtual_memory() swap = psutil.swap_memory() return { "virtual": { "total_gb": round(memory.total / (1024 ** 3), 2), "available_gb": round(memory.available / (1024 ** 3), 2), "used_gb": round(memory.used / (1024 ** 3), 2), "free_gb": round(memory.free / (1024 ** 3), 2), "percent": memory.percent, "active_gb": round(getattr(memory, 'active', 0) / (1024 ** 3), 2) if hasattr(memory, 'active') else None, "inactive_gb": round(getattr(memory, 'inactive', 0) / (1024 ** 3), 2) if hasattr(memory, 'inactive') else None }, "swap": { "total_gb": round(swap.total / (1024 ** 3), 2), "used_gb": round(swap.used / (1024 ** 3), 2), "free_gb": round(swap.free / (1024 ** 3), 2), "percent": swap.percent, "sin_gb": round(swap.sin / (1024 ** 3), 2) if hasattr(swap, 'sin') else None, "sout_gb": round(swap.sout / (1024 ** 3), 2) if hasattr(swap, 'sout') else None } } except Exception as e: logger.error("Failed to get memory information", error=str(e)) return {"error": f"Failed to get memory information: {str(e)}"} def _get_disk_info() -> Dict[str, Any]: """ Get detailed disk and partition information. Returns: Dict containing disk and partition information """ disk_info = {"partitions": [], "io": {}} try: # Get disk partitions for part in psutil.disk_partitions(all=False): # all=False skips pseudo-filesystems try: usage = psutil.disk_usage(part.mountpoint) disk_info["partitions"].append({ "device": part.device, "mountpoint": part.mountpoint, "fstype": part.fstype, "opts": part.opts, "total_gb": round(usage.total / (1024 ** 3), 2), "used_gb": round(usage.used / (1024 ** 3), 2), "free_gb": round(usage.free / (1024 ** 3), 2), "percent_used": usage.percent }) except Exception as e: logger.warning("Failed to get disk usage", mountpoint=part.mountpoint, error=str(e)) # Get disk I/O counters try: disk_io = psutil.disk_io_counters(perdisk=False) if disk_io: disk_info["io"] = { "read_count": disk_io.read_count, "write_count": disk_io.write_count, "read_mb": round(disk_io.read_bytes / (1024 ** 2), 2), "write_mb": round(disk_io.write_bytes / (1024 ** 2), 2), "read_time_ms": disk_io.read_time, "write_time_ms": disk_io.write_time, "busy_time_ms": getattr(disk_io, 'busy_time', None) } except Exception as e: logger.warning("Failed to get disk I/O counters", error=str(e)) except Exception as e: logger.error("Failed to get disk information", error=str(e)) return {"error": f"Failed to get disk information: {str(e)}"} return disk_info def _get_network_info() -> Dict[str, Any]: """ Get detailed network interface and I/O information. Returns: Dict containing network information """ net_info = {"io": {}, "interfaces": {}} try: # Get network I/O counters try: net_io = psutil.net_io_counters(pernic=False) if net_io: net_info["io"] = { "bytes_sent_mb": round(net_io.bytes_sent / (1024 ** 2), 2), "bytes_recv_mb": round(net_io.bytes_recv / (1024 ** 2), 2), "packets_sent": net_io.packets_sent, "packets_recv": net_io.packets_recv, "errin": net_io.errin, "errout": net_io.errout, "dropin": net_io.dropin, "dropout": net_io.dropout } except Exception as e: logger.warning("Failed to get network I/O counters", error=str(e)) # Get network interface details try: net_addrs = psutil.net_if_addrs() net_stats = psutil.net_if_stats() for name, addrs in net_addrs.items(): try: stats = net_stats.get(name, {}) net_info["interfaces"][name] = { "addresses": [ { "family": str(addr.family), "address": addr.address, "netmask": addr.netmask, "broadcast": addr.broadcast, "ptp": addr.ptp } for addr in addrs if addr.address ], "stats": stats._asdict() if hasattr(stats, '_asdict') else str(stats) } except Exception as e: logger.warning("Failed to get network interface info", interface=name, error=str(e)) net_info["interfaces"][name] = {"error": str(e)} except Exception as e: logger.warning("Failed to get network interface details", error=str(e)) except Exception as e: logger.error("Failed to get network information", error=str(e)) return {"error": f"Failed to get network information: {str(e)}"} return net_info def register_process_tools(mcp): """ Register process monitoring tools with FastMCP. Args: mcp: The FastMCP instance to register tools with """ @mcp.tool() @tool_decorator @rate_limited(max_calls=15, time_window=60) # 15 calls per minute @validate_inputs( lambda filter_name, **_: validate_process_inputs(filter_name=filter_name)[0], lambda include_system, **_: validate_process_inputs(include_system=include_system)[0], lambda max_processes, **_: validate_process_inputs(max_processes=max_processes)[0] ) def get_process_list( filter_name: Optional[str] = None, include_system: bool = False, max_processes: int = DEFAULT_MAX_PROCESSES ) -> Dict[str, Any]: """ Get list of running processes with filtering options. Args: filter_name: Filter processes by name (case-insensitive partial match) include_system: Include system processes (default: False) max_processes: Maximum number of processes to return (1-1000) Returns: Dict containing: - success (bool): Whether the operation succeeded - processes (list): List of process information dictionaries - total_count (int): Total number of processes returned - system_info (dict): System information snapshot - error (str, optional): Error message if success is False """ # Log the operation logger.info( "get_process_list_started", filter_name=filter_name, include_system=include_system, max_processes=max_processes ) # Validate inputs is_valid, error_msg = validate_process_inputs( filter_name=filter_name, include_system=include_system, max_processes=max_processes ) if not is_valid: logger.error("get_process_list_validation_failed", error=error_msg) return { "success": False, "error": f"Invalid input: {error_msg}", "processes": [], "total_count": 0 } if not HAS_PSUTIL: error_msg = "psutil not available. Install with: pip install psutil" logger.error("get_process_list_dependency_missing", error=error_msg) return { "success": False, "error": error_msg, "note": "Process monitoring requires psutil package", "processes": [], "total_count": 0 } processes = [] process_count = 0 start_time = time.time() try: # Get system info snapshot sys_info = { "platform": platform.platform(), "cpu_count": psutil.cpu_count(), "cpu_percent": psutil.cpu_percent(interval=0.1), "memory_percent": psutil.virtual_memory().percent, "timestamp": time.time() } # Iterate through processes for proc in psutil.process_iter(['pid', 'name']): try: # Skip if we've reached the maximum number of processes if len(processes) >= max_processes: break # Get process info pinfo = get_process_basic_info(proc) # Skip if we couldn't get process info if "error" in pinfo: continue # Apply filters if filter_name and filter_name.lower() not in pinfo['name'].lower(): continue if not include_system and pinfo['name'] in SYSTEM_PROCESSES: continue # Add to results processes.append(pinfo) process_count += 1 except (NoSuchProcess, AccessDenied, ZombieProcess): # Process might have died between listing and info gathering continue # Sort by memory usage (descending) processes.sort(key=lambda x: x.get('memory_mb', 0), reverse=True) # Calculate execution time execution_time = (time.time() - start_time) * 1000 # ms # Prepare result result = { "success": True, "processes": processes, "total_count": len(processes), "system_info": sys_info, "execution_time_ms": round(execution_time, 2), "filter_applied": filter_name, "include_system": include_system } # Log completion logger.info( "get_process_list_completed", process_count=len(processes), execution_time_ms=round(execution_time, 2) ) return result except Exception as e: error_msg = f"Unexpected error getting process list: {str(e)}" logger.error( "get_process_list_error", error=error_msg, exc_info=True ) return { "success": False, "error": error_msg, "processes": [], "total_count": 0 } @mcp.tool() @tool_decorator @rate_limited(max_calls=20, time_window=60) # 20 calls per minute @validate_inputs( lambda pid, **_: validate_process_inputs(pid=pid)[0] ) def get_process_info(pid: int) -> Dict[str, Any]: """ Get detailed information about a specific process. Args: pid: Process ID to query (must be a positive integer) Returns: Dict containing: - success (bool): Whether the operation succeeded - process (dict): Detailed process information if successful - error (str, optional): Error message if success is False """ # Log the operation logger.info("get_process_info_started", pid=pid) # Validate inputs is_valid, error_msg = validate_process_inputs(pid=pid) if not is_valid: logger.error("get_process_info_validation_failed", pid=pid, error=error_msg) return { "success": False, "error": f"Invalid process ID: {error_msg}" } if not HAS_PSUTIL: error_msg = "psutil not available. Install with: pip install psutil" logger.error("get_process_info_dependency_missing", error=error_msg) return { "success": False, "error": error_msg, "note": "Process monitoring requires psutil package" } start_time = time.time() try: proc = psutil.Process(pid) # Gather comprehensive process info with proc.oneshot(): # Get basic process info process_info = get_process_basic_info(proc) if "error" in process_info: logger.warning( "get_process_info_basic_failed", pid=pid, error=process_info["error"] ) return {"success": False, "error": process_info["error"]} # Add additional detailed information try: # Get memory info mem_info = proc.memory_full_info() process_info.update({ "memory": { "rss_mb": round(mem_info.rss / 1024 / 1024, 2), "vms_mb": round(mem_info.vms / 1024 / 1024, 2), "percent": round(proc.memory_percent(), 2), "uss_mb": round(mem_info.uss / 1024 / 1024, 2) if hasattr(mem_info, 'uss') else None, "pss_mb": round(mem_info.pss / 1024 / 1024, 2) if hasattr(mem_info, 'pss') else None, "swap_mb": round(mem_info.swap / 1024 / 1024, 2) if hasattr(mem_info, 'swap') else None }, "cpu": { "percent": proc.cpu_percent(interval=0.1), "num_ctx_switches": proc.num_ctx_switches()._asdict() if proc.num_ctx_switches() else {}, "cpu_num": proc.cpu_num(), "cpu_affinity": proc.cpu_affinity() }, "io_counters": proc.io_counters()._asdict() if proc.io_counters() else {}, "num_handles": proc.num_handles() if hasattr(proc, 'num_handles') else None, "num_threads": proc.num_threads(), "threads": [ {"id": t.id, "user_time": t.user_time, "system_time": t.system_time} for t in proc.threads() ] if proc.threads() else [] }) # Get environment variables if available try: process_info["environ"] = proc.environ() except (AccessDenied, AttributeError): process_info["environ"] = "Access denied or not available" # Get open files if available try: open_files = proc.open_files() process_info["open_files"] = [ {"path": f.path, "fd": f.fd} for f in open_files[:50] # Limit to first 50 files ] if open_files else [] except (AccessDenied, AttributeError): process_info["open_files"] = "Access denied or not available" # Get network connections if available try: connections = proc.connections() process_info["connections"] = [ { "fd": conn.fd, "family": str(conn.family), "type": str(conn.type), "local_address": f"{conn.laddr.ip}:{conn.laddr.port}" if conn.laddr else "N/A", "remote_address": f"{conn.raddr.ip}:{conn.raddr.port}" if conn.raddr else "N/A", "status": conn.status } for conn in connections[:20] # Limit to first 20 connections ] except (AccessDenied, AttributeError): process_info["connections"] = "Access denied or not available" except (NoSuchProcess, AccessDenied, ZombieProcess) as e: logger.warning("Failed to get detailed process info", pid=pid, error=str(e)) # Continue with basic info if detailed info fails # Add execution time process_info["execution_time_ms"] = round((time.time() - start_time) * 1000, 2) # Log completion logger.info( "get_process_info_completed", pid=pid, execution_time_ms=process_info["execution_time_ms"] ) return {"success": True, "process": process_info} except NoSuchProcess: error_msg = f"Process with PID {pid} not found" logger.error("get_process_info_not_found", pid=pid, error=error_msg) return { "success": False, "error": error_msg } except AccessDenied: error_msg = f"Access denied for process PID {pid} (elevated privileges may be required)" logger.error("get_process_info_access_denied", pid=pid, error=error_msg) return { "success": False, "error": error_msg } except Exception as e: error_msg = f"Unexpected error getting process info: {str(e)}" logger.error( "get_process_info_error", pid=pid, error=error_msg, exc_info=True ) return { "success": False, "error": error_msg } @mcp.tool() @tool_decorator @rate_limited(max_calls=10, time_window=60) # 10 calls per minute def get_system_resources() -> Dict[str, Any]: """ Get comprehensive system resource usage information. Returns: Dict containing: - success (bool): Whether the operation succeeded - system (dict): System resource information - error (str, optional): Error message if success is False """ # Log the operation logger.info("get_system_resources_started") start_time = time.time() if not HAS_PSUTIL: error_msg = "psutil not available. Install with: pip install psutil" logger.error("get_system_resources_dependency_missing", error=error_msg) return { "success": False, "error": error_msg, "note": "System monitoring requires psutil package" } result = {"success": True, "system": {}} try: # System information system_info = { "platform": platform.platform(), "system": platform.system(), "node": platform.node(), "release": platform.release(), "version": platform.version(), "machine": platform.machine(), "processor": platform.processor(), "python_version": platform.python_version(), "boot_time": psutil.boot_time(), "users": [u._asdict() for u in psutil.users()] if hasattr(psutil, 'users') else [] } # CPU information cpu_info = self._get_cpu_info() # Memory information memory_info = self._get_memory_info() # Disk information disk_info = self._get_disk_info() # Network information net_info = self._get_network_info() # Compile final result result["system"] = { "info": system_info, "cpu": cpu_info, "memory": memory_info, "disk": disk_info, "network": net_info, "timestamp": time.time(), "execution_time_ms": round((time.time() - start_time) * 1000, 2) } # Log completion logger.info( "get_system_resources_completed", execution_time_ms=result["system"]["execution_time_ms"] ) return result except Exception as e: error_msg = f"Unexpected error getting system resources: {str(e)}" logger.error( "get_system_resources_error", error=error_msg, exc_info=True ) return { "success": False, "error": error_msg } logger.info("Registered process monitoring tools")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sandraschi/windows-operations-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server