Skip to main content
Glama
subproc.py (15.6 kB)
"""
Subprocess execution tools for Pwno MCP

Provides tools for running system commands, particularly useful for:
- Compiling binaries with various flags (ASAN, debug symbols, etc.)
- Running helper scripts
- Managing background processes
"""

import logging
import os
import re
import shlex
import subprocess
import tempfile
import time
from typing import Dict, Any, Optional, List

try:
    import psutil
except ImportError:
    # psutil only supplies optional resource statistics and lookups of
    # untracked PIDs; every other feature works without it.
    psutil = None

logger = logging.getLogger(__name__)

# Marker a spawned helper prints to announce a PID, e.g. b"<PID>{1234}</PID>".
_PID_MARKER = re.compile(rb"<PID>\{(?P<pid>\d+)\}</PID>")

# Number of trailing bytes carried from one read to the next so that a marker
# split across two reads is still matched. Comfortably longer than any
# realistic marker (13 bytes of framing plus the PID digits).
_PID_MARKER_CARRY = 64


def _to_text(data: Any) -> str:
    """Return captured process output as ``str`` (``None`` becomes ``""``)."""
    if data is None:
        return ""
    if isinstance(data, bytes):
        return data.decode("utf-8", errors="ignore")
    return data


def _read_log(path: str) -> str:
    """Read a process log file leniently; return ``""`` if it is unreadable."""
    try:
        # The child may emit arbitrary bytes, so undecodable ones are dropped
        # instead of raising UnicodeDecodeError.
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()
    except OSError:
        return ""


class SubprocessTools:
    """Tools for subprocess execution and management"""

    def __init__(self):
        """Initialize subprocess tools"""
        # Maps PID -> bookkeeping for a process started by spawn_process():
        # the Popen object, the open log handles, their paths, the command
        # string and the working directory.
        self.background_processes: Dict[int, Dict[str, Any]] = {}

    @staticmethod
    def _close_logs(entry: Dict[str, Any]) -> None:
        """Close the parent's handles on a tracked process's log files."""
        entry['stdout_file'].close()
        entry['stderr_file'].close()

    @staticmethod
    def _psutil_handle(pid: int, entry: Dict[str, Any]) -> Any:
        """
        Return a cached ``psutil.Process`` for a tracked process.

        The handle is cached on the entry because ``cpu_percent()`` measures
        usage since the previous call on the *same* object; a fresh object
        always reports 0.0. Returns ``None`` when psutil is not installed and
        lets psutil errors (e.g. the process vanished) propagate.
        """
        if psutil is None:
            return None
        handle = entry.get('psutil_process')
        if handle is None:
            handle = psutil.Process(pid)
            entry['psutil_process'] = handle
        return handle

    def run_command(self, command: str, cwd: Optional[str] = None,
                    env: Optional[Dict[str, str]] = None,
                    timeout: Optional[float] = 30.0) -> Dict[str, Any]:
        """
        Execute a command and wait for it to complete

        This is primarily intended for compilation commands like:
        - gcc -g -fsanitize=address program.c -o program
        - clang -O0 -g -fno-omit-frame-pointer vuln.c
        - sh -c "make clean && make"

        The command is split with ``shlex.split`` and executed directly,
        without a shell, so operators such as ``&&``, ``|`` or ``>`` are
        passed to the program as plain arguments. Wrap the command in
        ``sh -c "..."`` when shell syntax is needed.

        Args:
            command: Command to execute (split using POSIX shell quoting rules)
            cwd: Working directory for the command
            env: Environment variables (merged with current env)
            timeout: Maximum execution time in seconds

        Returns:
            Dictionary with execution results. On timeout, ``stdout`` and
            ``stderr`` hold whatever output was captured before the process
            was killed.
        """
        logger.info("Running command: %s", command)

        try:
            # Parse command for safer execution (no shell involved)
            cmd_parts = shlex.split(command)

            # Prepare environment
            cmd_env = os.environ.copy()
            if env:
                cmd_env.update(env)

            # Execute command
            result = subprocess.run(
                cmd_parts,
                cwd=cwd,
                env=cmd_env,
                capture_output=True,
                text=True,
                timeout=timeout
            )

            return {
                "success": result.returncode == 0,
                "command": command,
                "returncode": result.returncode,
                "stdout": result.stdout,
                "stderr": result.stderr,
                "cwd": cwd or os.getcwd()
            }

        except subprocess.TimeoutExpired as exc:
            return {
                "success": False,
                "command": command,
                "error": f"Command timed out after {timeout} seconds",
                # Partial output may arrive as bytes even with text=True.
                "stdout": _to_text(exc.stdout),
                "stderr": _to_text(exc.stderr),
                "cwd": cwd or os.getcwd()
            }
        except Exception as e:
            logger.error("Failed to run command: %s", e)
            return {
                "success": False,
                "command": command,
                "error": str(e),
                "cwd": cwd or os.getcwd()
            }

    def spawn_process(self, command: str, cwd: Optional[str] = None,
                      env: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """
        Spawn a background process and return immediately

        Useful for starting long-running processes like:
        - Web servers for exploitation
        - Network listeners
        - Monitoring scripts

        Standard output and standard error are redirected to two temporary
        log files. If the process cannot be started at all, those files are
        closed and deleted before returning.

        Args:
            command: Command to execute (split with ``shlex.split``, no shell)
            cwd: Working directory for the command
            env: Environment variables (merged with current env)

        Returns:
            Dictionary with process information including PID
        """
        logger.info("Spawning process: %s", command)

        # Create temp files for stdout and stderr
        stdout_fd, stdout_path = tempfile.mkstemp(prefix='pwno_stdout_', suffix='.log')
        stderr_fd, stderr_path = tempfile.mkstemp(prefix='pwno_stderr_', suffix='.log')

        stdout_file = os.fdopen(stdout_fd, 'w+')
        stderr_file = os.fdopen(stderr_fd, 'w+')

        process = None
        try:
            # Parse command
            cmd_parts = shlex.split(command)

            # Prepare environment
            cmd_env = os.environ.copy()
            if env:
                cmd_env.update(env)

            # Spawn process with stdout and stderr redirected to files
            process = subprocess.Popen(
                cmd_parts,
                cwd=cwd,
                env=cmd_env,
                stdout=stdout_file,
                stderr=stderr_file,
                text=True
            )

            # Store process reference and file paths
            self.background_processes[process.pid] = {
                'process': process,
                'stdout_file': stdout_file,
                'stderr_file': stderr_file,
                'stdout_path': stdout_path,
                'stderr_path': stderr_path,
                'command': command,
                'cwd': cwd or os.getcwd()
            }

            # Give process a moment to potentially fail
            time.sleep(0.1)

            if process.poll() is not None:
                # Process already terminated. The entry stays tracked so a
                # later get_process() call can still report it.
                return {
                    'success': False,
                    'command': command,
                    'error': 'Process terminated immediately',
                    'returncode': process.returncode,
                    'stdout': _read_log(stdout_path),
                    'stderr': _read_log(stderr_path),
                    'stdout_path': stdout_path,
                    'stderr_path': stderr_path
                }

            return {
                'success': True,
                'command': command,
                'pid': process.pid,
                'cwd': cwd or os.getcwd(),
                'status': 'running',
                'stdout_path': stdout_path,
                'stderr_path': stderr_path
            }

        except Exception as e:
            logger.error("Failed to spawn process: %s", e)
            if process is None:
                # Nothing was started, so nobody will ever read these logs:
                # release the handles and remove the empty files.
                stdout_file.close()
                stderr_file.close()
                for leftover in (stdout_path, stderr_path):
                    try:
                        os.unlink(leftover)
                    except OSError:
                        pass
            return {
                "success": False,
                "command": command,
                "error": str(e)
            }

    def get_process(self, pid: int) -> Dict[str, Any]:
        """
        Get status of a spawned process

        Args:
            pid: Process ID to check

        Returns:
            Dictionary with process status. When available, includes
            live standard stream contents as strings:
            - ``stdout``: accumulated standard output so far
            - ``stderr``: accumulated standard error so far
            Paths to the backing log files (``stdout_path``, ``stderr_path``)
            are also returned for external tailing.

            For a running tracked process, ``cpu_percent`` and
            ``memory_info`` are ``None`` when the statistics cannot be
            obtained (psutil missing, or the process exited in the meantime).
            A tracked process that has finished is reported once as
            ``terminated`` and then forgotten.
        """
        try:
            entry = self.background_processes.get(pid)

            if entry is not None:
                process = entry['process']
                poll_result = process.poll()

                if poll_result is None:
                    # Still running. The child writes straight to the log
                    # file descriptors, so the files can be read as they are.
                    cpu_percent = None
                    memory_info = None
                    try:
                        handle = self._psutil_handle(pid, entry)
                        if handle is not None:
                            cpu_percent = handle.cpu_percent()
                            memory_info = handle.memory_info()._asdict()
                    except Exception as stats_error:
                        # Typically psutil.NoSuchProcess / AccessDenied;
                        # statistics are best-effort and must not hide the
                        # status and output gathered below.
                        logger.debug("No resource stats for pid %s: %s", pid, stats_error)

                    return {
                        'success': True,
                        'pid': pid,
                        'status': 'running',
                        'stdout_path': entry['stdout_path'],
                        'stderr_path': entry['stderr_path'],
                        'stdout': _read_log(entry['stdout_path']),
                        'stderr': _read_log(entry['stderr_path']),
                        'cpu_percent': cpu_percent,
                        'memory_info': memory_info
                    }

                # Process finished, read outputs
                self._close_logs(entry)
                stdout = _read_log(entry['stdout_path'])
                stderr = _read_log(entry['stderr_path'])

                del self.background_processes[pid]

                return {
                    'success': True,
                    'pid': pid,
                    'status': 'terminated',
                    'returncode': poll_result,
                    'stdout': stdout,
                    'stderr': stderr,
                    'stdout_path': entry['stdout_path'],
                    'stderr_path': entry['stderr_path']
                }

            # Not one of ours: try to check if the process exists anyway
            if psutil is None:
                return {
                    "success": False,
                    "pid": pid,
                    "error": "psutil is not installed; cannot inspect untracked processes"
                }

            if psutil.pid_exists(pid):
                proc = psutil.Process(pid)
                return {
                    "success": True,
                    "pid": pid,
                    "status": "running" if proc.is_running() else "unknown",
                    "name": proc.name(),
                    "cmdline": " ".join(proc.cmdline())
                }

            return {
                "success": False,
                "pid": pid,
                "error": "Process not found"
            }

        except Exception as e:
            logger.error("Failed to get process status: %s", e)
            return {
                "success": False,
                "pid": pid,
                "error": str(e)
            }

    def kill_process(self, pid: int, signal: int = 15) -> Dict[str, Any]:
        """
        Kill a process

        For a tracked process the requested signal is delivered and the call
        then waits up to 5 seconds for the process to exit; if it does not,
        a failure result is returned. Untracked PIDs are signalled with
        ``os.kill`` without waiting.

        Args:
            pid: Process ID to kill
            signal: Signal to send (default: SIGTERM=15, use 9 for SIGKILL)

        Returns:
            Dictionary with kill result
        """
        try:
            entry = self.background_processes.get(pid)

            if entry is not None:
                process = entry['process']
                if signal == 15:
                    process.terminate()
                elif signal == 9:
                    process.kill()
                else:
                    # Deliver exactly what was asked for instead of
                    # escalating every other signal to SIGKILL.
                    process.send_signal(signal)
                process.wait(timeout=5)
                # The entry is intentionally kept so that get_process() can
                # still retrieve the outputs after the kill.
            else:
                os.kill(pid, signal)

            return {
                "success": True,
                "pid": pid,
                "signal": signal,
                "status": "killed"
            }

        except ProcessLookupError:
            return {
                "success": False,
                "pid": pid,
                "error": "Process not found"
            }
        except Exception as e:
            logger.error("Failed to kill process: %s", e)
            return {
                "success": False,
                "pid": pid,
                "error": str(e)
            }

    def list_processes(self) -> Dict[str, Any]:
        """
        List all tracked background processes

        Only processes that are still running are listed. Entries whose
        process has terminated are closed and dropped from tracking here, so
        their output can no longer be fetched through ``get_process``.

        Returns:
            Dictionary with process list
        """
        processes = []

        # Iterate over a snapshot because finished entries are deleted below.
        for pid, entry in list(self.background_processes.items()):
            if entry['process'].poll() is not None:
                # Process terminated, clean up
                self._close_logs(entry)
                del self.background_processes[pid]
                continue

            details = None
            try:
                handle = self._psutil_handle(pid, entry)
                if handle is not None:
                    details = {
                        "name": handle.name(),
                        "cmdline": " ".join(handle.cmdline()),
                        "cpu_percent": handle.cpu_percent(),
                        "memory_mb": handle.memory_info().rss / 1024 / 1024
                    }
            except Exception:
                # Typically psutil.NoSuchProcess / AccessDenied.
                details = None

            if details is None:
                processes.append({
                    "pid": pid,
                    "status": "running",
                    "error": "Could not get process info"
                })
            else:
                processes.append({"pid": pid, "status": "running", **details})

        return {
            "success": True,
            "processes": processes,
            "count": len(processes)
        }

    def wait_for_pid_marker(self, stdout_path: str, timeout: float = 10.0,
                            poll_interval: float = 0.05) -> Dict[str, Any]:
        """
        Wait for a PID marker of the form "<PID>{1234}</PID>" to appear in a
        stdout log file.

        The file is read in binary mode and the tail of each read is carried
        over to the next one, so a marker written in two pieces is still
        found.

        Args:
            stdout_path: Path to the stdout log file produced by spawn_process
            timeout: Maximum time to wait in seconds
            poll_interval: How often to poll for new output

        Returns:
            Dictionary with keys:
            - success: whether a PID marker was found
            - pid: the captured PID if found
            - elapsed: time spent waiting
            - bytes_scanned: how many bytes of the file were scanned
            - error: error message if any
        """
        start_time = time.monotonic()
        bytes_scanned = 0

        try:
            if not os.path.exists(stdout_path):
                return {"success": False, "error": "stdout file not found",
                        "elapsed": 0.0, "bytes_scanned": 0}

            carry = b""
            with open(stdout_path, "rb") as f:
                while True:
                    chunk = f.read()
                    if chunk:
                        bytes_scanned += len(chunk)
                        window = carry + chunk
                        match = _PID_MARKER.search(window)
                        if match:
                            return {
                                "success": True,
                                "pid": int(match.group("pid")),
                                "elapsed": time.monotonic() - start_time,
                                "bytes_scanned": bytes_scanned
                            }
                        carry = window[-_PID_MARKER_CARRY:]

                    if time.monotonic() - start_time >= timeout:
                        return {"success": False, "error": "timeout",
                                "elapsed": timeout, "bytes_scanned": bytes_scanned}

                    time.sleep(poll_interval)
        except Exception as e:
            logger.error("Failed while waiting for PID marker: %s", e)
            return {"success": False, "error": str(e),
                    "elapsed": time.monotonic() - start_time,
                    "bytes_scanned": bytes_scanned}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/0xSensei/pwno-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.