# process_tools.py.backup • 27.6 kB
"""
Process monitoring and management tools for Windows Operations MCP.
This module provides tools for process monitoring and management with
structured logging, input validation, and security checks.
"""
import platform
import time
from typing import Dict, Any, List, Optional, Tuple
# Check if psutil is available
try:
import psutil
from psutil import NoSuchProcess, AccessDenied, ZombieProcess
HAS_PSUTIL = True
except ImportError:
HAS_PSUTIL = False
from ..logging_config import get_logger
from ..decorators import tool_decorator, rate_limited, validate_inputs
# Module-level structured logger: call sites in this file pass an event
# name (or message) followed by keyword fields such as ``pid=`` and ``error=``.
logger = get_logger(__name__)
# Limits for process listings
MAX_PROCESSES = 1000  # Largest value accepted for ``max_processes``
DEFAULT_MAX_PROCESSES = 100  # Used when the caller does not pass ``max_processes``

# Process names hidden from listings unless ``include_system`` is set.
# Membership is tested with an exact, case-sensitive name match.
SYSTEM_PROCESSES = {
    'System',
    'Registry',
    'smss.exe',
    'csrss.exe',
    'wininit.exe',
    'winlogon.exe',
    'services.exe',
    'lsass.exe',
    'svchost.exe',
    'dwm.exe',
    'taskhostw.exe',
}
def validate_process_inputs(
    filter_name: Optional[str] = None,
    include_system: bool = False,
    max_processes: int = DEFAULT_MAX_PROCESSES,
    pid: Optional[int] = None
) -> Tuple[bool, Optional[str]]:
    """
    Validate process tool input parameters.

    Every parameter has a default, so callers may validate a single value
    by passing only that keyword.

    Args:
        filter_name: Process name filter string (or None for no filter)
        include_system: Whether to include system processes; must be a bool
        max_processes: Maximum number of processes to return
            (integer between 1 and MAX_PROCESSES)
        pid: Process ID to validate (if applicable); must be a positive integer

    Returns:
        Tuple of (is_valid, error_message); error_message is None when valid
    """
    # Validate filter_name if provided
    if filter_name is not None and not isinstance(filter_name, str):
        return False, "Filter name must be a string or None"

    # Validate include_system (previously accepted but never checked)
    if not isinstance(include_system, bool):
        return False, "Include system must be a boolean"

    # Validate max_processes. bool is a subclass of int, so True/False must
    # be rejected explicitly or they would pass as 1/0.
    if (
        isinstance(max_processes, bool)
        or not isinstance(max_processes, int)
        or not (1 <= max_processes <= MAX_PROCESSES)
    ):
        return False, f"Max processes must be an integer between 1 and {MAX_PROCESSES}"

    # Validate pid if provided (same bool caveat as above)
    if pid is not None:
        if isinstance(pid, bool) or not isinstance(pid, int) or pid <= 0:
            return False, "Process ID must be a positive integer"

    return True, None
def get_process_basic_info(
    proc: 'psutil.Process',
    cpu_interval: Optional[float] = 0.1
) -> Dict[str, Any]:
    """
    Get basic information about a process.

    Args:
        proc: psutil.Process object
        cpu_interval: Interval (seconds) forwarded to ``proc.cpu_percent``.
            The default of 0.1 keeps the original behaviour; pass ``None``
            to forward ``interval=None`` instead.

    Returns:
        Dict containing basic process information (pid, name, username,
        status, memory_mb, cpu_percent, create_time, exe, cmdline,
        parent_pid), or a dict with a single "error" key when the process
        vanished, is a zombie, or access was denied.
    """
    try:
        # oneshot() lets psutil reuse data across the calls below
        with proc.oneshot():
            return {
                "pid": proc.pid,
                "name": proc.name(),
                "username": proc.username() or "N/A",
                "status": proc.status(),
                "memory_mb": round(proc.memory_info().rss / 1024 / 1024, 2),
                "cpu_percent": proc.cpu_percent(interval=cpu_interval) or 0.0,
                "create_time": proc.create_time(),
                # Query the executable path once; empty/None becomes "N/A"
                "exe": proc.exe() or "N/A",
                "cmdline": proc.cmdline(),
                "parent_pid": proc.ppid()
            }
    except (NoSuchProcess, AccessDenied, ZombieProcess) as e:
        logger.warning("Failed to get basic process info", pid=proc.pid, error=str(e))
        return {"error": f"Failed to get process info: {e}"}
def _get_cpu_info() -> Dict[str, Any]:
    """
    Get detailed CPU information.

    Per-core utilisation and the CPU time breakdown are each sampled with
    a 0.5 second interval. Frequency and statistics are gathered on a
    best-effort basis.

    Returns:
        Dict containing CPU information with the keys physical_cores,
        logical_cores, percent, avg_percent, times, freq and stats.
        "freq" and "stats" are always present and are None when the
        corresponding data could not be obtained. On an unexpected
        failure a dict with a single "error" key is returned instead.
    """
    try:
        cpu_percent = psutil.cpu_percent(interval=0.5, percpu=True)
        cpu_count = psutil.cpu_count(logical=False)
        cpu_count_logical = psutil.cpu_count(logical=True)
        cpu_times = psutil.cpu_times_percent(interval=0.5)

        cpu_info = {
            "physical_cores": cpu_count,
            "logical_cores": cpu_count_logical,
            "percent": cpu_percent,
            "avg_percent": sum(cpu_percent) / len(cpu_percent) if cpu_percent else 0,
            "times": {
                "user": cpu_times.user,
                "system": cpu_times.system,
                "idle": cpu_times.idle,
                # Platform-specific fields: absent attributes become None
                "iowait": getattr(cpu_times, 'iowait', None),
                "irq": getattr(cpu_times, 'irq', None),
                "steal": getattr(cpu_times, 'steal', None)
            },
            # Default to None so the key exists even when cpu_freq()
            # returns nothing (previously the key was simply missing)
            "freq": None
        }

        # Add CPU frequency if available
        try:
            cpu_freq = psutil.cpu_freq()
            if cpu_freq:
                cpu_info["freq"] = {
                    "current_mhz": cpu_freq.current,
                    "min_mhz": cpu_freq.min,
                    "max_mhz": cpu_freq.max
                }
        except Exception as e:
            logger.warning("Failed to get CPU frequency info", error=str(e))
            cpu_info["freq"] = None

        # Add CPU stats if available
        try:
            cpu_stats = psutil.cpu_stats()
            cpu_info["stats"] = {
                "ctx_switches": getattr(cpu_stats, 'ctx_switches', None),
                "interrupts": getattr(cpu_stats, 'interrupts', None),
                "soft_interrupts": getattr(cpu_stats, 'soft_interrupts', None),
                "syscalls": getattr(cpu_stats, 'syscalls', None)
            }
        except Exception as e:
            logger.warning("Failed to get CPU stats", error=str(e))
            cpu_info["stats"] = None

        return cpu_info
    except Exception as e:
        logger.error("Failed to get CPU information", error=str(e))
        return {"error": f"Failed to get CPU information: {str(e)}"}
def _get_memory_info() -> Dict[str, Any]:
    """
    Collect virtual-memory and swap usage figures.

    Byte counts reported by psutil are converted to gigabytes (1024 ** 3
    bytes) and rounded to two decimals. Fields that the psutil result
    objects do not expose are reported as None.

    Returns:
        Dict with a "virtual" and a "swap" section, or a dict holding
        only an "error" key if anything goes wrong.
    """
    bytes_per_gb = 1024 ** 3

    def as_gb(byte_count):
        # Bytes -> gigabytes, two decimals
        return round(byte_count / bytes_per_gb, 2)

    def optional_gb(sample, field):
        # Same conversion, but tolerate fields missing from the sample
        if hasattr(sample, field):
            return as_gb(getattr(sample, field))
        return None

    try:
        ram = psutil.virtual_memory()
        swp = psutil.swap_memory()

        virtual_section = {
            "total_gb": as_gb(ram.total),
            "available_gb": as_gb(ram.available),
            "used_gb": as_gb(ram.used),
            "free_gb": as_gb(ram.free),
            "percent": ram.percent,
            "active_gb": optional_gb(ram, 'active'),
            "inactive_gb": optional_gb(ram, 'inactive')
        }
        swap_section = {
            "total_gb": as_gb(swp.total),
            "used_gb": as_gb(swp.used),
            "free_gb": as_gb(swp.free),
            "percent": swp.percent,
            "sin_gb": optional_gb(swp, 'sin'),
            "sout_gb": optional_gb(swp, 'sout')
        }
        return {"virtual": virtual_section, "swap": swap_section}
    except Exception as exc:
        logger.error("Failed to get memory information", error=str(exc))
        return {"error": f"Failed to get memory information: {str(exc)}"}
def _get_disk_info() -> Dict[str, Any]:
    """
    Collect per-partition usage and aggregate disk I/O counters.

    A partition whose usage cannot be read is logged and left out; a
    failure to read the I/O counters is logged and leaves "io" empty.

    Returns:
        Dict with a "partitions" list and an "io" dict, or a dict holding
        only an "error" key if the collection fails as a whole.
    """
    gib = 1024 ** 3
    mib = 1024 ** 2
    partitions = []
    result = {"partitions": partitions, "io": {}}

    try:
        # all=False asks psutil to skip pseudo-filesystems
        for partition in psutil.disk_partitions(all=False):
            mount = partition.mountpoint
            try:
                space = psutil.disk_usage(mount)
                entry = {
                    "device": partition.device,
                    "mountpoint": mount,
                    "fstype": partition.fstype,
                    "opts": partition.opts,
                    "total_gb": round(space.total / gib, 2),
                    "used_gb": round(space.used / gib, 2),
                    "free_gb": round(space.free / gib, 2),
                    "percent_used": space.percent
                }
            except Exception as exc:
                logger.warning("Failed to get disk usage", mountpoint=mount, error=str(exc))
                continue
            partitions.append(entry)

        # System-wide I/O totals (not broken down per disk)
        try:
            counters = psutil.disk_io_counters(perdisk=False)
            if counters:
                result["io"] = {
                    "read_count": counters.read_count,
                    "write_count": counters.write_count,
                    "read_mb": round(counters.read_bytes / mib, 2),
                    "write_mb": round(counters.write_bytes / mib, 2),
                    "read_time_ms": counters.read_time,
                    "write_time_ms": counters.write_time,
                    "busy_time_ms": getattr(counters, 'busy_time', None)
                }
        except Exception as exc:
            logger.warning("Failed to get disk I/O counters", error=str(exc))
    except Exception as exc:
        logger.error("Failed to get disk information", error=str(exc))
        return {"error": f"Failed to get disk information: {str(exc)}"}

    return result
def _get_network_info() -> Dict[str, Any]:
    """
    Get detailed network interface and I/O information.

    I/O counters and interface details are collected independently on a
    best-effort basis: a failure in one is logged and leaves that section
    empty without affecting the other.

    Returns:
        Dict containing network information:
        - io (dict): Aggregate traffic counters (empty if unavailable)
        - interfaces (dict): Per-interface "addresses" and "stats", or
          {"error": ...} for an interface that could not be described
        On an unexpected failure a dict with a single "error" key is
        returned instead.
    """
    net_info = {"io": {}, "interfaces": {}}
    try:
        # Get network I/O counters
        try:
            net_io = psutil.net_io_counters(pernic=False)
            if net_io:
                net_info["io"] = {
                    "bytes_sent_mb": round(net_io.bytes_sent / (1024 ** 2), 2),
                    "bytes_recv_mb": round(net_io.bytes_recv / (1024 ** 2), 2),
                    "packets_sent": net_io.packets_sent,
                    "packets_recv": net_io.packets_recv,
                    "errin": net_io.errin,
                    "errout": net_io.errout,
                    "dropin": net_io.dropin,
                    "dropout": net_io.dropout
                }
        except Exception as e:
            logger.warning("Failed to get network I/O counters", error=str(e))

        # Get network interface details
        try:
            net_addrs = psutil.net_if_addrs()
            net_stats = psutil.net_if_stats()
            for name, addrs in net_addrs.items():
                try:
                    nic_stats = net_stats.get(name)
                    if nic_stats is None:
                        # No stats for this interface: report an empty
                        # mapping rather than the string "{}"
                        stats = {}
                    elif hasattr(nic_stats, '_asdict'):
                        stats = nic_stats._asdict()
                    else:
                        stats = str(nic_stats)

                    net_info["interfaces"][name] = {
                        "addresses": [
                            {
                                "family": str(addr.family),
                                "address": addr.address,
                                "netmask": addr.netmask,
                                "broadcast": addr.broadcast,
                                "ptp": addr.ptp
                            }
                            for addr in addrs
                            if addr.address
                        ],
                        "stats": stats
                    }
                except Exception as e:
                    logger.warning("Failed to get network interface info", interface=name, error=str(e))
                    net_info["interfaces"][name] = {"error": str(e)}
        except Exception as e:
            logger.warning("Failed to get network interface details", error=str(e))
    except Exception as e:
        logger.error("Failed to get network information", error=str(e))
        return {"error": f"Failed to get network information: {str(e)}"}

    return net_info
def register_process_tools(mcp):
    """
    Register process monitoring tools with FastMCP.

    Three tools are registered: get_process_list, get_process_info and
    get_system_resources. Each returns a dict with a boolean "success"
    key and, on failure, an "error" message instead of raising.

    Args:
        mcp: The FastMCP instance to register tools with
    """

    def _call_if_supported(proc, method_name):
        # psutil only defines some Process methods on the platforms that
        # support them; return None instead of raising AttributeError.
        method = getattr(proc, method_name, None)
        return method() if method is not None else None

    @mcp.tool()
    @tool_decorator
    @rate_limited(max_calls=15, time_window=60)  # 15 calls per minute
    @validate_inputs(
        lambda filter_name, **_: validate_process_inputs(filter_name=filter_name)[0],
        lambda include_system, **_: validate_process_inputs(include_system=include_system)[0],
        lambda max_processes, **_: validate_process_inputs(max_processes=max_processes)[0]
    )
    def get_process_list(
        filter_name: Optional[str] = None,
        include_system: bool = False,
        max_processes: int = DEFAULT_MAX_PROCESSES
    ) -> Dict[str, Any]:
        """
        Get list of running processes with filtering options.

        Args:
            filter_name: Filter processes by name (case-insensitive partial match)
            include_system: Include system processes (default: False)
            max_processes: Maximum number of processes to return (1-1000)

        Returns:
            Dict containing:
            - success (bool): Whether the operation succeeded
            - processes (list): List of process information dictionaries
            - total_count (int): Total number of processes returned
            - system_info (dict): System information snapshot
            - error (str, optional): Error message if success is False
        """
        # Log the operation
        logger.info(
            "get_process_list_started",
            filter_name=filter_name,
            include_system=include_system,
            max_processes=max_processes
        )

        # Validate inputs
        is_valid, error_msg = validate_process_inputs(
            filter_name=filter_name,
            include_system=include_system,
            max_processes=max_processes
        )
        if not is_valid:
            logger.error("get_process_list_validation_failed", error=error_msg)
            return {
                "success": False,
                "error": f"Invalid input: {error_msg}",
                "processes": [],
                "total_count": 0
            }

        if not HAS_PSUTIL:
            error_msg = "psutil not available. Install with: pip install psutil"
            logger.error("get_process_list_dependency_missing", error=error_msg)
            return {
                "success": False,
                "error": error_msg,
                "note": "Process monitoring requires psutil package",
                "processes": [],
                "total_count": 0
            }

        # Lower-case the filter once instead of once per process
        wanted = filter_name.lower() if filter_name else None

        def _passes_filters(name: str) -> bool:
            # Name filter is a case-insensitive substring match; the
            # system-process check is an exact match.
            if wanted and wanted not in name.lower():
                return False
            if not include_system and name in SYSTEM_PROCESSES:
                return False
            return True

        processes = []
        start_time = time.time()
        try:
            # Get system info snapshot
            sys_info = {
                "platform": platform.platform(),
                "cpu_count": psutil.cpu_count(),
                "cpu_percent": psutil.cpu_percent(interval=0.1),
                "memory_percent": psutil.virtual_memory().percent,
                "timestamp": time.time()
            }

            # Iterate through processes
            for proc in psutil.process_iter(['pid', 'name']):
                try:
                    # Stop once the maximum number of processes is collected
                    if len(processes) >= max_processes:
                        break

                    # Cheap pre-filter on the name already fetched by
                    # process_iter, so rejected processes never pay for
                    # the full (blocking) info query below. The name is
                    # None when it could not be read; those fall through.
                    prefetched_name = proc.info.get('name')
                    if prefetched_name and not _passes_filters(prefetched_name):
                        continue

                    # Get process info
                    pinfo = get_process_basic_info(proc)

                    # Skip if we couldn't get process info
                    if "error" in pinfo:
                        continue

                    # Apply filters to the authoritative name
                    if not _passes_filters(pinfo['name']):
                        continue

                    # Add to results
                    processes.append(pinfo)
                except (NoSuchProcess, AccessDenied, ZombieProcess):
                    # Process might have died between listing and info gathering
                    continue

            # Sort by memory usage (descending). The cap above is applied
            # during iteration, so this orders only the collected entries.
            processes.sort(key=lambda x: x.get('memory_mb', 0), reverse=True)

            # Calculate execution time
            execution_time = (time.time() - start_time) * 1000  # ms

            # Prepare result
            result = {
                "success": True,
                "processes": processes,
                "total_count": len(processes),
                "system_info": sys_info,
                "execution_time_ms": round(execution_time, 2),
                "filter_applied": filter_name,
                "include_system": include_system
            }

            # Log completion
            logger.info(
                "get_process_list_completed",
                process_count=len(processes),
                execution_time_ms=round(execution_time, 2)
            )
            return result
        except Exception as e:
            error_msg = f"Unexpected error getting process list: {str(e)}"
            logger.error(
                "get_process_list_error",
                error=error_msg,
                exc_info=True
            )
            return {
                "success": False,
                "error": error_msg,
                "processes": [],
                "total_count": 0
            }

    @mcp.tool()
    @tool_decorator
    @rate_limited(max_calls=20, time_window=60)  # 20 calls per minute
    @validate_inputs(
        lambda pid, **_: validate_process_inputs(pid=pid)[0]
    )
    def get_process_info(pid: int) -> Dict[str, Any]:
        """
        Get detailed information about a specific process.

        Args:
            pid: Process ID to query (must be a positive integer)

        Returns:
            Dict containing:
            - success (bool): Whether the operation succeeded
            - process (dict): Detailed process information if successful
            - error (str, optional): Error message if success is False
        """
        # Log the operation
        logger.info("get_process_info_started", pid=pid)

        # Validate inputs
        is_valid, error_msg = validate_process_inputs(pid=pid)
        if not is_valid:
            logger.error("get_process_info_validation_failed", pid=pid, error=error_msg)
            return {
                "success": False,
                "error": f"Invalid process ID: {error_msg}"
            }

        if not HAS_PSUTIL:
            error_msg = "psutil not available. Install with: pip install psutil"
            logger.error("get_process_info_dependency_missing", error=error_msg)
            return {
                "success": False,
                "error": error_msg,
                "note": "Process monitoring requires psutil package"
            }

        start_time = time.time()
        try:
            proc = psutil.Process(pid)

            # Gather comprehensive process info
            with proc.oneshot():
                # Get basic process info
                process_info = get_process_basic_info(proc)
                if "error" in process_info:
                    logger.warning(
                        "get_process_info_basic_failed",
                        pid=pid,
                        error=process_info["error"]
                    )
                    return {"success": False, "error": process_info["error"]}

                # Add additional detailed information
                try:
                    # Query each value once and reuse it below
                    mem_info = proc.memory_full_info()
                    ctx_switches = proc.num_ctx_switches()
                    io_counters = _call_if_supported(proc, 'io_counters')
                    thread_list = proc.threads()

                    process_info.update({
                        "memory": {
                            "rss_mb": round(mem_info.rss / 1024 / 1024, 2),
                            "vms_mb": round(mem_info.vms / 1024 / 1024, 2),
                            "percent": round(proc.memory_percent(), 2),
                            "uss_mb": round(mem_info.uss / 1024 / 1024, 2) if hasattr(mem_info, 'uss') else None,
                            "pss_mb": round(mem_info.pss / 1024 / 1024, 2) if hasattr(mem_info, 'pss') else None,
                            "swap_mb": round(mem_info.swap / 1024 / 1024, 2) if hasattr(mem_info, 'swap') else None
                        },
                        "cpu": {
                            # Reuse the sample taken by get_process_basic_info
                            # rather than blocking for a second measurement
                            "percent": process_info["cpu_percent"],
                            "num_ctx_switches": ctx_switches._asdict() if ctx_switches else {},
                            # cpu_num() does not exist on every platform
                            # (notably Windows); an unguarded call raised
                            # AttributeError and failed the whole tool
                            "cpu_num": _call_if_supported(proc, 'cpu_num'),
                            "cpu_affinity": _call_if_supported(proc, 'cpu_affinity')
                        },
                        "io_counters": io_counters._asdict() if io_counters else {},
                        "num_handles": _call_if_supported(proc, 'num_handles'),
                        "num_threads": proc.num_threads(),
                        "threads": [
                            {"id": t.id, "user_time": t.user_time, "system_time": t.system_time}
                            for t in thread_list
                        ] if thread_list else []
                    })

                    # Get environment variables if available
                    try:
                        process_info["environ"] = proc.environ()
                    except (AccessDenied, AttributeError):
                        process_info["environ"] = "Access denied or not available"

                    # Get open files if available
                    try:
                        open_files = proc.open_files()
                        process_info["open_files"] = [
                            {"path": f.path, "fd": f.fd} for f in open_files[:50]  # Limit to first 50 files
                        ] if open_files else []
                    except (AccessDenied, AttributeError):
                        process_info["open_files"] = "Access denied or not available"

                    # Get network connections if available
                    try:
                        # Newer psutil names this net_connections() and
                        # deprecates connections(); support both
                        list_connections = getattr(proc, 'net_connections', None) or proc.connections
                        connections = list_connections()
                        process_info["connections"] = [
                            {
                                "fd": conn.fd,
                                "family": str(conn.family),
                                "type": str(conn.type),
                                "local_address": f"{conn.laddr.ip}:{conn.laddr.port}" if conn.laddr else "N/A",
                                "remote_address": f"{conn.raddr.ip}:{conn.raddr.port}" if conn.raddr else "N/A",
                                "status": conn.status
                            }
                            for conn in connections[:20]  # Limit to first 20 connections
                        ]
                    except (AccessDenied, AttributeError):
                        process_info["connections"] = "Access denied or not available"
                except (NoSuchProcess, AccessDenied, ZombieProcess) as e:
                    logger.warning("Failed to get detailed process info", pid=pid, error=str(e))
                    # Continue with basic info if detailed info fails

            # Add execution time
            process_info["execution_time_ms"] = round((time.time() - start_time) * 1000, 2)

            # Log completion
            logger.info(
                "get_process_info_completed",
                pid=pid,
                execution_time_ms=process_info["execution_time_ms"]
            )
            return {"success": True, "process": process_info}
        except NoSuchProcess:
            error_msg = f"Process with PID {pid} not found"
            logger.error("get_process_info_not_found", pid=pid, error=error_msg)
            return {
                "success": False,
                "error": error_msg
            }
        except AccessDenied:
            error_msg = f"Access denied for process PID {pid} (elevated privileges may be required)"
            logger.error("get_process_info_access_denied", pid=pid, error=error_msg)
            return {
                "success": False,
                "error": error_msg
            }
        except Exception as e:
            error_msg = f"Unexpected error getting process info: {str(e)}"
            logger.error(
                "get_process_info_error",
                pid=pid,
                error=error_msg,
                exc_info=True
            )
            return {
                "success": False,
                "error": error_msg
            }

    @mcp.tool()
    @tool_decorator
    @rate_limited(max_calls=10, time_window=60)  # 10 calls per minute
    def get_system_resources() -> Dict[str, Any]:
        """
        Get comprehensive system resource usage information.

        Returns:
            Dict containing:
            - success (bool): Whether the operation succeeded
            - system (dict): System resource information
            - error (str, optional): Error message if success is False
        """
        # Log the operation
        logger.info("get_system_resources_started")
        start_time = time.time()

        if not HAS_PSUTIL:
            error_msg = "psutil not available. Install with: pip install psutil"
            logger.error("get_system_resources_dependency_missing", error=error_msg)
            return {
                "success": False,
                "error": error_msg,
                "note": "System monitoring requires psutil package"
            }

        result = {"success": True, "system": {}}
        try:
            # System information
            system_info = {
                "platform": platform.platform(),
                "system": platform.system(),
                "node": platform.node(),
                "release": platform.release(),
                "version": platform.version(),
                "machine": platform.machine(),
                "processor": platform.processor(),
                "python_version": platform.python_version(),
                "boot_time": psutil.boot_time(),
                "users": [u._asdict() for u in psutil.users()] if hasattr(psutil, 'users') else []
            }

            # The collectors are module-level functions. There is no
            # ``self`` in this nested function, so the former ``self.``
            # prefix raised NameError on every call.
            cpu_info = _get_cpu_info()
            memory_info = _get_memory_info()
            disk_info = _get_disk_info()
            net_info = _get_network_info()

            # Compile final result
            result["system"] = {
                "info": system_info,
                "cpu": cpu_info,
                "memory": memory_info,
                "disk": disk_info,
                "network": net_info,
                "timestamp": time.time(),
                "execution_time_ms": round((time.time() - start_time) * 1000, 2)
            }

            # Log completion
            logger.info(
                "get_system_resources_completed",
                execution_time_ms=result["system"]["execution_time_ms"]
            )
            return result
        except Exception as e:
            error_msg = f"Unexpected error getting system resources: {str(e)}"
            logger.error(
                "get_system_resources_error",
                error=error_msg,
                exc_info=True
            )
            return {
                "success": False,
                "error": error_msg
            }

    logger.info("Registered process monitoring tools")