"""
Subprocess execution tools for Pwno MCP
Provides tools for running system commands, particularly useful for:
- Compiling binaries with various flags (ASAN, debug symbols, etc.)
- Running helper scripts
- Managing background processes
"""
import logging
import subprocess
import shlex
from typing import Dict, Any, Optional, List
import psutil
import os
import tempfile
import time
import re
logger = logging.getLogger(__name__)
class SubprocessTools:
"""Tools for subprocess execution and management"""
def __init__(self):
"""Initialize subprocess tools"""
self.background_processes: Dict[int, Dict[str, Any]] = {}
def run_command(self, command: str, cwd: Optional[str] = None, env: Optional[Dict[str, str]] = None,
timeout: Optional[float] = 30.0) -> Dict[str, Any]:
"""
Execute a command and wait for it to complete
This is primarily intended for compilation commands like:
- gcc -g -fsanitize=address program.c -o program
- clang -O0 -g -fno-omit-frame-pointer vuln.c
- make clean && make
Args:
command: Command to execute (will be shell-parsed)
cwd: Working directory for the command
env: Environment variables (merged with current env)
timeout: Maximum execution time in seconds
Returns:
Dictionary with execution results
"""
logger.info(f"Running command: {command}")
try:
# Parse command for safer execution
cmd_parts = shlex.split(command)
# Prepare environment
cmd_env = os.environ.copy()
if env:
cmd_env.update(env)
# Execute command
result = subprocess.run(
cmd_parts,
cwd=cwd,
env=cmd_env,
capture_output=True,
text=True,
timeout=timeout
)
return {
"success": result.returncode == 0,
"command": command,
"returncode": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr,
"cwd": cwd or os.getcwd()
}
except subprocess.TimeoutExpired:
return {
"success": False,
"command": command,
"error": f"Command timed out after {timeout} seconds",
"cwd": cwd or os.getcwd()
}
except Exception as e:
logger.error(f"Failed to run command: {e}")
return {
"success": False,
"command": command,
"error": str(e),
"cwd": cwd or os.getcwd()
}
def spawn_process(self, command: str, cwd: Optional[str] = None, env: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
"""
Spawn a background process and return immediately
Useful for starting long-running processes like:
- Web servers for exploitation
- Network listeners
- Monitoring scripts
Args:
command: Command to execute
cwd: Working directory for the command
env: Environment variables
Returns:
Dictionary with process information including PID
"""
logger.info(f"Spawning process: {command}")
# Create temp files for stdout and stderr
stdout_fd, stdout_path = tempfile.mkstemp(prefix='pwno_stdout_', suffix='.log')
stderr_fd, stderr_path = tempfile.mkstemp(prefix='pwno_stderr_', suffix='.log')
stdout_file = os.fdopen(stdout_fd, 'w+')
stderr_file = os.fdopen(stderr_fd, 'w+')
try:
# Parse command
cmd_parts = shlex.split(command)
# Prepare environment
cmd_env = os.environ.copy()
if env:
cmd_env.update(env)
# Spawn process with stdout and stderr redirected to files
process = subprocess.Popen(
cmd_parts,
cwd=cwd,
env=cmd_env,
stdout=stdout_file,
stderr=stderr_file,
text=True
)
# Store process reference and file paths
self.background_processes[process.pid] = {
'process': process,
'stdout_file': stdout_file,
'stderr_file': stderr_file,
'stdout_path': stdout_path,
'stderr_path': stderr_path,
'command': command,
'cwd': cwd or os.getcwd()
}
# Give process a moment to potentially fail
import time
time.sleep(0.1)
if process.poll() is not None:
# Process already terminated, read outputs
stdout_file.flush()
stderr_file.flush()
with open(stdout_path, 'r') as f:
stdout = f.read()
with open(stderr_path, 'r') as f:
stderr = f.read()
return {
'success': False,
'command': command,
'error': 'Process terminated immediately',
'returncode': process.returncode,
'stdout': stdout,
'stderr': stderr,
'stdout_path': stdout_path,
'stderr_path': stderr_path
}
return {
'success': True,
'command': command,
'pid': process.pid,
'cwd': cwd or os.getcwd(),
'status': 'running',
'stdout_path': stdout_path,
'stderr_path': stderr_path
}
except Exception as e:
logger.error(f"Failed to spawn process: {e}")
return {
"success": False,
"command": command,
"error": str(e)
}
def get_process(self, pid: int) -> Dict[str, Any]:
"""
Get status of a spawned process
Args:
pid: Process ID to check
Returns:
Dictionary with process status. When available, includes live
standard stream contents as strings:
- ``stdout``: accumulated standard output so far
- ``stderr``: accumulated standard error so far
Paths to the backing log files (``stdout_path``, ``stderr_path``)
are also returned for external tailing.
"""
try:
# Check if we're tracking this process
if pid in self.background_processes:
entry = self.background_processes[pid]
process = entry['process']
poll_result = process.poll()
if poll_result is None:
# Still running, return status and output paths
# Flush current buffers and read accumulated outputs
try:
entry['stdout_file'].flush()
except Exception:
pass
try:
entry['stderr_file'].flush()
except Exception:
pass
try:
with open(entry['stdout_path'], 'r', encoding='utf-8', errors='ignore') as f:
live_stdout = f.read()
except Exception:
live_stdout = ''
try:
with open(entry['stderr_path'], 'r', encoding='utf-8', errors='ignore') as f:
live_stderr = f.read()
except Exception:
live_stderr = ''
return {
'success': True,
'pid': pid,
'status': 'running',
'stdout_path': entry['stdout_path'],
'stderr_path': entry['stderr_path'],
'stdout': live_stdout,
'stderr': live_stderr,
'cpu_percent': psutil.Process(pid).cpu_percent(),
'memory_info': psutil.Process(pid).memory_info()._asdict()
}
else:
# Process finished, read outputs
entry['stdout_file'].close()
entry['stderr_file'].close()
with open(entry['stdout_path'], 'r') as f:
stdout = f.read()
with open(entry['stderr_path'], 'r') as f:
stderr = f.read()
del self.background_processes[pid]
return {
'success': True,
'pid': pid,
'status': 'terminated',
'returncode': poll_result,
'stdout': stdout,
'stderr': stderr,
'stdout_path': entry['stdout_path'],
'stderr_path': entry['stderr_path']
}
else:
# Try to check if process exists anyway
if psutil.pid_exists(pid):
proc = psutil.Process(pid)
return {
"success": True,
"pid": pid,
"status": "running" if proc.is_running() else "unknown",
"name": proc.name(),
"cmdline": " ".join(proc.cmdline())
}
else:
return {
"success": False,
"pid": pid,
"error": "Process not found"
}
except Exception as e:
logger.error(f"Failed to get process status: {e}")
return {
"success": False,
"pid": pid,
"error": str(e)
}
def kill_process(self, pid: int, signal: int = 15) -> Dict[str, Any]:
"""
Kill a process
Args:
pid: Process ID to kill
signal: Signal to send (default: SIGTERM=15, use 9 for SIGKILL)
Returns:
Dictionary with kill result
"""
try:
if pid in self.background_processes:
process = self.background_processes[pid]['process']
process.terminate() if signal == 15 else process.kill()
process.wait(timeout=5)
# Keep the process entry cached even after kill, so get_process can retrieve outputs
# del self.background_processes[pid]
else:
os.kill(pid, signal)
return {
"success": True,
"pid": pid,
"signal": signal,
"status": "killed"
}
except ProcessLookupError:
return {
"success": False,
"pid": pid,
"error": "Process not found"
}
except Exception as e:
logger.error(f"Failed to kill process: {e}")
return {
"success": False,
"pid": pid,
"error": str(e)
}
def list_processes(self) -> Dict[str, Any]:
"""
List all tracked background processes
Returns:
Dictionary with process list
"""
processes = []
for pid, entry in list(self.background_processes.items()):
process = entry['process']
poll_result = process.poll()
if poll_result is None:
# Still running
try:
proc_info = psutil.Process(pid)
processes.append({
"pid": pid,
"status": "running",
"name": proc_info.name(),
"cmdline": " ".join(proc_info.cmdline()),
"cpu_percent": proc_info.cpu_percent(),
"memory_mb": proc_info.memory_info().rss / 1024 / 1024
})
except:
processes.append({
"pid": pid,
"status": "running",
"error": "Could not get process info"
})
else:
# Process terminated, clean up
entry['stdout_file'].close()
entry['stderr_file'].close()
del self.background_processes[pid]
return {
"success": True,
"processes": processes,
"count": len(processes)
}
def wait_for_pid_marker(self, stdout_path: str, timeout: float = 10.0, poll_interval: float = 0.05) -> Dict[str, Any]:
"""
Wait for a PID marker of the form "<PID>{1234}</PID>" to appear in a stdout log file.
Args:
stdout_path: Path to the stdout log file produced by spawn_process
timeout: Maximum time to wait in seconds
poll_interval: How often to poll for new output
Returns:
Dictionary with keys:
- success: whether a PID marker was found
- pid: the captured PID if found
- elapsed: time spent waiting
- bytes_scanned: how many bytes of the file were scanned
- error: error message if any
"""
start_time = time.time()
pattern = re.compile(r"<PID>\{(?P<pid>\d+)\}</PID>")
bytes_scanned = 0
try:
if not os.path.exists(stdout_path):
return {"success": False, "error": "stdout file not found", "elapsed": 0.0, "bytes_scanned": 0}
with open(stdout_path, "r", encoding="utf-8", errors="ignore") as f:
while True:
chunk = f.read()
if chunk:
bytes_scanned += len(chunk)
match = pattern.search(chunk)
if match:
elapsed = time.time() - start_time
return {
"success": True,
"pid": int(match.group("pid")),
"elapsed": elapsed,
"bytes_scanned": bytes_scanned
}
if time.time() - start_time >= timeout:
return {"success": False, "error": "timeout", "elapsed": timeout, "bytes_scanned": bytes_scanned}
time.sleep(poll_interval)
except Exception as e:
logger.error(f"Failed while waiting for PID marker: {e}")
return {"success": False, "error": str(e), "elapsed": time.time() - start_time, "bytes_scanned": bytes_scanned}