Skip to main content
Glama
processes.py10.3 kB
"""Process management tools.""" import psutil from datetime import datetime from typing import Optional from .validation import validate_pid from .ssh_executor import execute_command from .utils import format_bytes async def list_processes(host: Optional[str] = None, username: Optional[str] = None) -> str: """ List running processes. Args: host: Optional remote host to connect to username: Optional SSH username (required if host is provided) Returns: Formatted string with process list """ try: if host: # Remote execution - use ps command returncode, stdout, _ = await execute_command( ["ps", "aux", "--sort=-%cpu"], host=host, username=username ) if returncode == 0 and stdout: info = [] info.append("=== Running Processes ===\n") lines = stdout.strip().split('\n') # Take header and top 100 processes if len(lines) > 101: info.append('\n'.join(lines[:101])) info.append(f"\n\nTotal processes: {len(lines) - 1}") info.append(f"Showing: Top 100 by CPU usage") else: info.append(stdout) info.append(f"\n\nTotal processes: {len(lines) - 1}") return "\n".join(info) else: return "Error executing ps command on remote host" else: # Local execution - use psutil info = [] info.append("=== Running Processes ===\n") info.append(f"{'PID':<8} {'User':<12} {'CPU%':<8} {'Memory%':<10} {'Status':<12} {'Name':<30} {'Command'}") info.append("-" * 120) # Get all processes processes = [] for proc in psutil.process_iter(['pid', 'name', 'username', 'cpu_percent', 'memory_percent', 'status', 'cmdline']): try: processes.append(proc.info) except (psutil.NoSuchProcess, psutil.AccessDenied): pass # Sort by CPU usage (descending) processes.sort(key=lambda x: x.get('cpu_percent', 0) or 0, reverse=True) # Show top processes (limit to reasonable number) for proc_info in processes[:100]: # Show top 100 processes pid = proc_info.get('pid', 'N/A') username_val = proc_info.get('username', 'N/A') if username_val and len(username_val) > 12: username_val = username_val[:9] + '...' cpu = proc_info.get('cpu_percent', 0) or 0 mem = proc_info.get('memory_percent', 0) or 0 status = proc_info.get('status', 'N/A') name = proc_info.get('name', 'N/A') if name and len(name) > 30: name = name[:27] + '...' cmdline = proc_info.get('cmdline', []) if cmdline: cmd = ' '.join(cmdline) if len(cmd) > 40: cmd = cmd[:37] + '...' else: cmd = name info.append( f"{pid:<8} {username_val:<12} {cpu:<8.1f} {mem:<10.1f} {status:<12} {name:<30} {cmd}" ) # Add summary total_processes = len(list(psutil.process_iter())) info.append(f"\n\nTotal processes: {total_processes}") info.append(f"Showing: Top 100 by CPU usage") return "\n".join(info) except Exception as e: return f"Error listing processes: {str(e)}" async def get_process_info(pid: int, host: Optional[str] = None, username: Optional[str] = None) -> str: """ Get information about a specific process. Args: pid: Process ID host: Optional remote host to connect to username: Optional SSH username (required if host is provided) Returns: Formatted string with process information """ try: # Validate PID (accepts floats from LLMs) pid, error = validate_pid(pid) if error: return error if host: # Remote execution - use ps command returncode, stdout, _ = await execute_command( ["ps", "-p", str(pid), "-o", "pid,user,stat,pcpu,pmem,vsz,rss,etime,comm,args"], host=host, username=username ) if returncode != 0: return f"Process with PID {pid} does not exist on remote host." if stdout: info = [] info.append(f"=== Process Information for PID {pid} ===\n") info.append(stdout) # Try to get more details with /proc returncode, stdout, _ = await execute_command( ["cat", f"/proc/{pid}/status"], host=host, username=username ) if returncode == 0 and stdout: info.append("\n=== Detailed Status (/proc) ===") # Filter to show most relevant fields relevant_fields = ['Name:', 'State:', 'Tgid:', 'Pid:', 'PPid:', 'Threads:', 'VmPeak:', 'VmSize:', 'VmRSS:'] for line in stdout.split('\n'): if any(field in line for field in relevant_fields): info.append(line) return "\n".join(info) else: return f"Process with PID {pid} does not exist on remote host." else: # Local execution - use psutil # Check if process exists if not psutil.pid_exists(pid): return f"Process with PID {pid} does not exist." proc = psutil.Process(pid) info = [] info.append(f"=== Process Information for PID {pid} ===\n") # Basic info try: info.append(f"Name: {proc.name()}") except (psutil.NoSuchProcess, psutil.AccessDenied): pass try: info.append(f"Executable: {proc.exe()}") except (psutil.NoSuchProcess, psutil.AccessDenied): info.append("Executable: [Access Denied]") try: info.append(f"Command Line: {' '.join(proc.cmdline())}") except (psutil.NoSuchProcess, psutil.AccessDenied): pass try: info.append(f"Status: {proc.status()}") except (psutil.NoSuchProcess, psutil.AccessDenied): pass try: info.append(f"User: {proc.username()}") except (psutil.NoSuchProcess, psutil.AccessDenied): pass # Process IDs try: info.append(f"\nPID: {proc.pid}") info.append(f"Parent PID: {proc.ppid()}") except (psutil.NoSuchProcess, psutil.AccessDenied): pass # Resource usage try: info.append(f"\n=== Resource Usage ===") cpu_percent = proc.cpu_percent(interval=0.1) info.append(f"CPU Percent: {cpu_percent}%") mem_info = proc.memory_info() info.append(f"Memory RSS: {format_bytes(mem_info.rss)}") info.append(f"Memory VMS: {format_bytes(mem_info.vms)}") info.append(f"Memory Percent: {proc.memory_percent():.2f}%") except (psutil.NoSuchProcess, psutil.AccessDenied): info.append("Resource usage: [Access Denied]") # Timing try: create_time = datetime.fromtimestamp(proc.create_time()) info.append(f"\n=== Timing ===") info.append(f"Created: {create_time.strftime('%Y-%m-%d %H:%M:%S')}") cpu_times = proc.cpu_times() info.append(f"CPU Time (user): {cpu_times.user:.2f}s") info.append(f"CPU Time (system): {cpu_times.system:.2f}s") except (psutil.NoSuchProcess, psutil.AccessDenied): pass # Threads try: num_threads = proc.num_threads() info.append(f"\nThreads: {num_threads}") except (psutil.NoSuchProcess, psutil.AccessDenied): pass # File descriptors try: num_fds = proc.num_fds() info.append(f"Open File Descriptors: {num_fds}") except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError): pass # Not available on all systems # Connections try: connections = proc.connections() if connections: info.append(f"\n=== Network Connections ({len(connections)}) ===") for i, conn in enumerate(connections[:10]): # Show first 10 info.append(f" {conn.type.name}: {conn.laddr} -> {conn.raddr if conn.raddr else 'N/A'} [{conn.status}]") if len(connections) > 10: info.append(f" ... and {len(connections) - 10} more") except (psutil.NoSuchProcess, psutil.AccessDenied): pass return "\n".join(info) except psutil.NoSuchProcess: return f"Process with PID {pid} does not exist." except psutil.AccessDenied: return f"Access denied to process with PID {pid}. Try running with elevated privileges." except Exception as e: return f"Error getting process information: {str(e)}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/narmaku/linux-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server