Skip to main content
Glama

bpftrace MCP Server

by eunomia-bpf
server.py13.9 kB
#!/usr/bin/env python3 """ MCP Server for bpftrace - Minimal Implementation Provides tools for listing probes, helpers, and executing bpftrace programs """ import asyncio import subprocess import uuid import time from typing import Optional, Dict, List, Any from collections import defaultdict import shlex import os import signal from contextlib import asynccontextmanager import getpass import sys from fastmcp import FastMCP # Global storage for execution buffers execution_buffers: Dict[str, 'ExecutionBuffer'] = {} # Global storage for sudo password (session-only) SUDO_PASSWORD: Optional[str] = None # Cleanup old buffers every 5 minutes BUFFER_CLEANUP_INTERVAL = 300 BUFFER_MAX_AGE = 3600 # 1 hour async def cleanup_old_buffers(): """Periodically clean up old execution buffers""" while True: await asyncio.sleep(BUFFER_CLEANUP_INTERVAL) current_time = time.time() to_remove = [] for exec_id, buffer in execution_buffers.items(): if current_time - buffer.creation_time > BUFFER_MAX_AGE: to_remove.append(exec_id) for exec_id in to_remove: del execution_buffers[exec_id] @asynccontextmanager async def lifespan(server): """Initialize server and start background tasks""" # Startup asyncio.create_task(cleanup_old_buffers()) yield # Shutdown (if needed) # Initialize MCP server with lifespan mcp = FastMCP("bpftrace-server", lifespan=lifespan) class ExecutionBuffer: """Stores output from a bpftrace execution""" def __init__(self, execution_id: str, max_lines: int = 10000): self.execution_id = execution_id self.lines: List[str] = [] self.status = "running" # running, completed, failed self.max_lines = max_lines self.creation_time = time.time() self.completion_time: Optional[float] = None self.error_message: Optional[str] = None self.process: Optional[asyncio.subprocess.Process] = None def add_line(self, line: str): """Add a line to the buffer""" if len(self.lines) < self.max_lines: self.lines.append(line) elif len(self.lines) == self.max_lines: self.lines.append(f"[Output truncated at {self.max_lines} lines]") def mark_completed(self): """Mark execution as completed""" self.status = "completed" self.completion_time = time.time() def mark_failed(self, error: str): """Mark execution as failed""" self.status = "failed" self.completion_time = time.time() self.error_message = error async def run_bpftrace_program(execution_id: str, program: str, timeout: int): """Run a bpftrace program and capture output""" buffer = execution_buffers[execution_id] try: # Create the bpftrace command with -S flag for reading password from stdin cmd = ["sudo", "-S", "bpftrace", "-e", program] # Start the process process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE ) buffer.process = process # Send password to sudo if SUDO_PASSWORD: process.stdin.write(f"{SUDO_PASSWORD}\n".encode()) await process.stdin.drain() process.stdin.close() # Set up timeout timeout_task = asyncio.create_task(asyncio.sleep(timeout)) # Read output line by line while True: # Check if timeout occurred if timeout_task.done(): process.terminate() await asyncio.sleep(0.5) if process.returncode is None: process.kill() buffer.add_line("[Execution timed out]") buffer.mark_failed("Timeout") break # Try to read a line try: line = await asyncio.wait_for( process.stdout.readline(), timeout=0.1 ) if not line: # Process ended break decoded_line = line.decode('utf-8').rstrip() buffer.add_line(decoded_line) except asyncio.TimeoutError: # No output available, continue continue # Check if process ended if process.returncode is not None: break # Read any remaining stderr stderr = await process.stderr.read() if stderr: stderr_text = stderr.decode('utf-8').strip() if stderr_text and not stderr_text.startswith("[sudo] password"): buffer.add_line(f"[Error] {stderr_text}") buffer.mark_failed(stderr_text) return # Cancel timeout if still running if not timeout_task.done(): timeout_task.cancel() # Mark as completed if not already failed if buffer.status == "running": buffer.mark_completed() except Exception as e: buffer.mark_failed(str(e)) buffer.add_line(f"[Exception] {str(e)}") @mcp.tool() async def list_probes(filter: Optional[str] = None) -> Dict[str, Any]: """ List available bpftrace probes with optional filtering. Args: filter: Optional filter pattern (e.g., 'syscalls:*open*') Returns: Dictionary containing list of matching probes """ try: cmd = ["sudo", "-S", "bpftrace", "-l"] if filter: cmd.append(filter) process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE ) # Send password to sudo if SUDO_PASSWORD: process.stdin.write(f"{SUDO_PASSWORD}\n".encode()) await process.stdin.drain() process.stdin.close() stdout, stderr = await process.communicate() if process.returncode != 0: return { "error": stderr.decode('utf-8').strip(), "probes": [] } # Parse output probes = [ line.strip() for line in stdout.decode('utf-8').strip().split('\n') if line.strip() and not line.startswith("[sudo]") ] return { "probes": probes, "count": len(probes) } except Exception as e: return { "error": str(e), "probes": [] } @mcp.tool() async def list_helpers() -> Dict[str, Any]: """ List available bpftrace helper functions. Returns: Dictionary containing list of helper functions with descriptions """ # bpftrace doesn't have a command to list helpers, so we provide a curated list helpers = [ {"name": "printf", "description": "Print formatted output"}, {"name": "time", "description": "Current timestamp (nanoseconds since boot)"}, {"name": "str", "description": "Convert to string (for char arrays)"}, {"name": "comm", "description": "Current process name"}, {"name": "pid", "description": "Process ID"}, {"name": "tid", "description": "Thread ID"}, {"name": "uid", "description": "User ID"}, {"name": "gid", "description": "Group ID"}, {"name": "nsecs", "description": "Nanoseconds since boot"}, {"name": "kstack", "description": "Kernel stack trace"}, {"name": "ustack", "description": "User stack trace"}, {"name": "arg0...argN", "description": "Function arguments"}, {"name": "retval", "description": "Return value (in return probes)"}, {"name": "cpu", "description": "Current CPU"}, {"name": "curtask", "description": "Current task struct"}, {"name": "rand", "description": "Random number"}, {"name": "cgroup", "description": "Cgroup ID"}, {"name": "kaddr", "description": "Kernel address for symbol"}, {"name": "uaddr", "description": "User address for symbol"}, {"name": "ntop", "description": "Convert IP address to string"}, {"name": "reg", "description": "CPU register value"}, {"name": "signal", "description": "Send signal to process"}, {"name": "exit", "description": "Exit bpftrace"}, {"name": "system", "description": "Execute shell command"}, {"name": "cat", "description": "Print file contents"}, {"name": "join", "description": "Join array elements"}, {"name": "ksym", "description": "Resolve kernel address to symbol"}, {"name": "usym", "description": "Resolve user address to symbol"}, {"name": "kptr", "description": "Annotate kernel pointer"}, {"name": "uptr", "description": "Annotate user pointer"}, {"name": "sizeof", "description": "Size of type or expression"}, {"name": "print", "description": "Print non-formatted output"}, {"name": "clear", "description": "Clear a map"}, {"name": "zero", "description": "Zero a map"}, {"name": "hist", "description": "Print histogram"}, {"name": "lhist", "description": "Print linear histogram"}, {"name": "count", "description": "Count occurrences"}, {"name": "sum", "description": "Sum values"}, {"name": "min", "description": "Track minimum value"}, {"name": "max", "description": "Track maximum value"}, {"name": "avg", "description": "Calculate average"}, {"name": "stats", "description": "Calculate statistics"}, ] return { "helpers": helpers, "count": len(helpers) } @mcp.tool() async def exec_program(program: str, timeout: int = 10) -> Dict[str, Any]: """ Execute a bpftrace program with buffered output. Args: program: The bpftrace program to execute timeout: Execution timeout in seconds (default: 10, max: 60) Returns: Dictionary with execution status and ID """ # Validate timeout if timeout < 1: timeout = 1 elif timeout > 60: timeout = 60 # Generate execution ID execution_id = f"exec_{uuid.uuid4().hex[:8]}" # Create buffer buffer = ExecutionBuffer(execution_id) execution_buffers[execution_id] = buffer # Start execution in background asyncio.create_task(run_bpftrace_program(execution_id, program, timeout)) # Give it a moment to check for syntax errors await asyncio.sleep(0.5) # Check if it failed immediately (syntax error) if buffer.status == "failed": return { "status": "error", "message": buffer.error_message or "Failed to start program" } return { "status": "success", "execution_id": execution_id, "message": "Program started successfully" } @mcp.tool() async def get_result( execution_id: str, offset: int = 0, limit: int = 1000 ) -> Dict[str, Any]: """ Get buffered output from a bpftrace execution. Args: execution_id: The execution ID returned by exec_program offset: Start reading from this line number (default: 0) limit: Maximum lines to return (default: 1000) Returns: Dictionary with execution status and output """ if execution_id not in execution_buffers: return { "error": "Execution ID not found", "execution_id": execution_id } buffer = execution_buffers[execution_id] # Get the requested lines total_lines = len(buffer.lines) end_index = min(offset + limit, total_lines) output_lines = buffer.lines[offset:end_index] result = { "execution_id": execution_id, "status": buffer.status, "lines_total": total_lines, "lines_returned": len(output_lines), "output": output_lines, "has_more": end_index < total_lines } if buffer.error_message: result["error_message"] = buffer.error_message if buffer.completion_time: result["duration"] = buffer.completion_time - buffer.creation_time return result def prompt_for_password(): """Prompt user for sudo password at startup""" global SUDO_PASSWORD print("MCPtrace Server - bpftrace requires sudo access", file=sys.stderr) print("Enter your sudo password (will be cached for this session only):", file=sys.stderr) try: SUDO_PASSWORD = getpass.getpass(prompt="Password: ", stream=sys.stderr) # Test the password with a simple sudo command test_cmd = ["sudo", "-S", "true"] test_proc = subprocess.run( test_cmd, input=f"{SUDO_PASSWORD}\n".encode(), capture_output=True ) if test_proc.returncode != 0: print("Error: Invalid sudo password. Please try again.", file=sys.stderr) sys.exit(1) print("Password verified. Starting MCP server...\n", file=sys.stderr) except KeyboardInterrupt: print("\nCancelled by user.", file=sys.stderr) sys.exit(0) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": # Prompt for password before starting server prompt_for_password() # Run the server mcp.run(transport="stdio")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/eunomia-bpf/MCPtrace'

If you have feedback or need assistance with the MCP directory API, please join our Discord server