Skip to main content
Glama
main.rs17.5 kB
use anyhow::Result; use dashmap::DashMap; use rmcp::{ handler::server::{router::tool::ToolRouter, tool::Parameters}, model::*, schemars, tool, tool_handler, tool_router, transport::stdio, ServerHandler, ServiceExt, }; use serde::Deserialize; use serde_json::json; use std::{ future::Future, process::Stdio, sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; use tokio::{ io::{AsyncBufReadExt, BufReader}, process::Command, sync::Mutex, time::sleep, }; use tracing::info; use uuid::Uuid; type McpError = rmcp::model::ErrorData; #[derive(Debug, Clone)] struct ExecutionBuffer { lines: Arc<Mutex<Vec<String>>>, status: Arc<Mutex<String>>, max_lines: usize, creation_time: u64, completion_time: Arc<Mutex<Option<u64>>>, error_message: Arc<Mutex<Option<String>>>, } impl ExecutionBuffer { fn new(max_lines: usize) -> Self { Self { lines: Arc::new(Mutex::new(Vec::new())), status: Arc::new(Mutex::new("running".to_string())), max_lines, creation_time: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), completion_time: Arc::new(Mutex::new(None)), error_message: Arc::new(Mutex::new(None)), } } async fn add_line(&self, line: String) { let mut lines = self.lines.lock().await; if lines.len() < self.max_lines { lines.push(line); } else if lines.len() == self.max_lines { lines.push(format!("[Output truncated at {} lines]", self.max_lines)); } } async fn mark_completed(&self) { *self.status.lock().await = "completed".to_string(); *self.completion_time.lock().await = Some( SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), ); } async fn mark_failed(&self, error: String) { *self.status.lock().await = "failed".to_string(); *self.completion_time.lock().await = Some( SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), ); *self.error_message.lock().await = Some(error); } } #[derive(Clone)] struct BpftraceServer { tool_router: ToolRouter<Self>, sudo_password: Arc<String>, execution_buffers: Arc<DashMap<String, ExecutionBuffer>>, } #[derive(Debug, Deserialize, schemars::JsonSchema)] struct ListProbesRequest { #[schemars(description = "Optional filter pattern (e.g., 'syscalls:*open*')")] filter: Option<String>, } #[derive(Debug, Deserialize, schemars::JsonSchema)] struct ExecProgramRequest { #[schemars(description = "The bpftrace program to execute")] program: String, #[schemars(description = "Execution timeout in seconds (default: 10, max: 60)")] #[serde(default = "default_timeout")] timeout: u64, } fn default_timeout() -> u64 { 10 } #[derive(Debug, Deserialize, schemars::JsonSchema)] struct GetResultRequest { #[schemars(description = "The execution ID returned by exec_program")] execution_id: String, #[schemars(description = "Start reading from this line number (default: 0)")] #[serde(default)] offset: usize, #[schemars(description = "Maximum lines to return (default: 1000)")] #[serde(default = "default_limit")] limit: usize, } fn default_limit() -> usize { 1000 } impl BpftraceServer { async fn run_bpftrace_program( _execution_id: String, program: String, timeout: Duration, sudo_password: String, buffer: ExecutionBuffer, ) { let mut cmd = Command::new("sudo"); cmd.arg("-S") .arg("bpftrace") .arg("-e") .arg(&program) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()); let mut child = match cmd.spawn() { Ok(child) => child, Err(e) => { buffer .mark_failed(format!("Failed to spawn process: {}", e)) .await; return; } }; // Send password to sudo if let Some(mut stdin) = child.stdin.take() { use tokio::io::AsyncWriteExt; let _ = stdin.write_all(format!("{}\n", sudo_password).as_bytes()).await; let _ = stdin.flush().await; } let stdout = child.stdout.take().unwrap(); let stderr = child.stderr.take().unwrap(); let mut stdout_reader = BufReader::new(stdout).lines(); let mut stderr_reader = BufReader::new(stderr).lines(); let start_time = tokio::time::Instant::now(); loop { tokio::select! { _ = sleep(Duration::from_millis(100)) => { if start_time.elapsed() > timeout { let _ = child.kill().await; buffer.add_line("[Execution timed out]".to_string()).await; buffer.mark_failed("Timeout".to_string()).await; break; } } line = stdout_reader.next_line() => { match line { Ok(Some(line)) => { buffer.add_line(line).await; } Ok(None) => break, Err(e) => { buffer.mark_failed(format!("Read error: {}", e)).await; break; } } } line = stderr_reader.next_line() => { match line { Ok(Some(line)) => { if !line.starts_with("[sudo] password") { buffer.add_line(format!("[Error] {}", line)).await; } } Ok(None) => {} Err(_) => {} } } } } let _ = child.wait().await; let status = buffer.status.lock().await.clone(); if status == "running" { buffer.mark_completed().await; } } } #[tool_router] impl BpftraceServer { fn new(sudo_password: String) -> Self { let server = Self { tool_router: Self::tool_router(), sudo_password: Arc::new(sudo_password), execution_buffers: Arc::new(DashMap::new()), }; // Start cleanup task let buffers = server.execution_buffers.clone(); tokio::spawn(async move { let cleanup_interval = Duration::from_secs(300); // 5 minutes let max_age = 3600; // 1 hour loop { sleep(cleanup_interval).await; let current_time = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); let mut to_remove = Vec::new(); for entry in buffers.iter() { if current_time - entry.value().creation_time > max_age { to_remove.push(entry.key().clone()); } } for key in to_remove { buffers.remove(&key); } } }); server } #[tool(description = "List available bpftrace probes with optional filtering")] async fn list_probes( &self, Parameters(ListProbesRequest { filter }): Parameters<ListProbesRequest>, ) -> Result<CallToolResult, McpError> { let mut cmd = Command::new("sudo"); cmd.arg("-S").arg("bpftrace").arg("-l"); if let Some(filter) = filter { cmd.arg(filter); } cmd.stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()); let mut child = match cmd.spawn() { Ok(child) => child, Err(e) => { return Err(McpError::internal_error( "Failed to spawn bpftrace process", Some(json!({"error": e.to_string()})), )); } }; // Send password to sudo if let Some(mut stdin) = child.stdin.take() { use tokio::io::AsyncWriteExt; let _ = stdin .write_all(format!("{}\n", self.sudo_password).as_bytes()) .await; let _ = stdin.flush().await; } let output = match child.wait_with_output().await { Ok(output) => output, Err(e) => { return Err(McpError::internal_error( "Failed to execute bpftrace", Some(json!({"error": e.to_string()})), )); } }; if !output.status.success() { return Err(McpError::internal_error( "Bpftrace command failed", Some(json!({"stderr": String::from_utf8_lossy(&output.stderr).to_string()})), )); } let stdout = String::from_utf8_lossy(&output.stdout); let probes: Vec<String> = stdout .lines() .filter(|line| !line.is_empty() && !line.starts_with("[sudo]")) .map(|s| s.to_string()) .collect(); Ok(CallToolResult::success(vec![Content::text( json!({ "probes": probes, "count": probes.len() }).to_string() )])) } #[tool(description = "Get bpftrace system information and capabilities")] async fn bpf_info(&self) -> Result<CallToolResult, McpError> { let mut cmd = Command::new("sudo"); cmd.arg("-S") .arg("bpftrace") .arg("--info") .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()); let mut child = match cmd.spawn() { Ok(child) => child, Err(e) => { return Err(McpError::internal_error( "Failed to spawn bpftrace process", Some(json!({"error": e.to_string()})), )); } }; // Send password to sudo if let Some(mut stdin) = child.stdin.take() { use tokio::io::AsyncWriteExt; let _ = stdin .write_all(format!("{}\n", self.sudo_password).as_bytes()) .await; let _ = stdin.flush().await; } let output = match child.wait_with_output().await { Ok(output) => output, Err(e) => { return Err(McpError::internal_error( "Failed to execute bpftrace", Some(json!({"error": e.to_string()})), )); } }; if !output.status.success() { return Err(McpError::internal_error( "Bpftrace command failed", Some(json!({"stderr": String::from_utf8_lossy(&output.stderr).to_string()})), )); } let stdout = String::from_utf8_lossy(&output.stdout); Ok(CallToolResult::success(vec![Content::text(stdout.to_string())])) } #[tool(description = "Execute a bpftrace program with buffered output")] async fn exec_program( &self, Parameters(ExecProgramRequest { program, timeout }): Parameters<ExecProgramRequest>, ) -> Result<CallToolResult, McpError> { // Validate timeout let timeout = timeout.clamp(1, 60); // Generate execution ID let execution_id = format!("exec_{}", Uuid::new_v4().to_string()[..8].to_string()); // Create buffer let buffer = ExecutionBuffer::new(10000); self.execution_buffers .insert(execution_id.clone(), buffer.clone()); // Start execution in background let password = self.sudo_password.to_string(); let exec_id = execution_id.clone(); tokio::spawn(async move { BpftraceServer::run_bpftrace_program( exec_id, program, Duration::from_secs(timeout), password, buffer, ) .await; }); // Give it a moment to check for syntax errors sleep(Duration::from_millis(500)).await; // Check if it failed immediately (syntax error) if let Some(buffer) = self.execution_buffers.get(&execution_id) { let status = buffer.status.lock().await.clone(); if status == "failed" { let error_msg = buffer .error_message .lock() .await .clone() .unwrap_or_else(|| "Failed to start program".to_string()); return Err(McpError::internal_error( "Failed to start bpftrace program", Some(json!({"error": error_msg})), )); } } Ok(CallToolResult::success(vec![Content::text( json!({ "execution_id": execution_id, "status": "started", "message": format!("Program execution started with timeout of {}s", timeout) }).to_string() )])) } #[tool(description = "Get buffered output from a bpftrace execution")] async fn get_result( &self, Parameters(GetResultRequest { execution_id, offset, limit, }): Parameters<GetResultRequest>, ) -> Result<CallToolResult, McpError> { if let Some(buffer) = self.execution_buffers.get(&execution_id) { let lines = buffer.lines.lock().await; let total_lines = lines.len(); let end_index = (offset + limit).min(total_lines); let output_lines: Vec<String> = lines[offset..end_index].to_vec(); let status = buffer.status.lock().await.clone(); let error_message = buffer.error_message.lock().await.clone(); let duration = if let Some(completion_time) = *buffer.completion_time.lock().await { Some(completion_time - buffer.creation_time) } else { None }; Ok(CallToolResult::success(vec![Content::text( json!({ "execution_id": execution_id, "status": status, "lines_total": total_lines, "lines_returned": output_lines.len(), "output": output_lines, "has_more": end_index < total_lines, "error_message": error_message, "duration": duration }).to_string() )])) } else { Err(McpError::invalid_params( "Execution ID not found", None, )) } } } #[tool_handler] impl ServerHandler for BpftraceServer { fn get_info(&self) -> ServerInfo { ServerInfo { protocol_version: ProtocolVersion::V_2024_11_05, capabilities: ServerCapabilities::builder().enable_tools().build(), server_info: Implementation::from_build_env(), instructions: Some("MCP server for bpftrace - provides Linux kernel tracing capabilities".to_string()), } } } fn verify_password(password: &str) -> Result<()> { // Test the password with a simple sudo command let output = std::process::Command::new("sudo") .arg("-S") .arg("true") .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::null()) .stderr(std::process::Stdio::piped()) .spawn() .and_then(|mut child| { use std::io::Write; if let Some(mut stdin) = child.stdin.take() { stdin.write_all(format!("{}\n", password).as_bytes())?; stdin.flush()?; } child.wait_with_output() })?; if !output.status.success() { // Don't use eprintln since it interferes with stdio MCP communication std::process::exit(1); } Ok(()) } #[tokio::main] async fn main() -> Result<()> { // Load .env file dotenv::dotenv().ok(); tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::from_default_env() .add_directive("bpftrace_mcp_server=info".parse()?) .add_directive("rmcp=info".parse()?), ) .init(); // Get password from environment variable let sudo_password = match std::env::var("BPFTRACE_PASSWD") { Ok(password) => { verify_password(&password)?; password }, Err(_) => { // Exit without printing to stdio/stderr to avoid interfering with MCP std::process::exit(1); } }; let server = BpftraceServer::new(sudo_password); info!("Starting bpftrace MCP server on stdio"); let service = server.serve(stdio()).await.inspect_err(|e| { tracing::error!("serving error: {:?}", e); })?; service.waiting().await?; Ok(()) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/eunomia-bpf/MCPtrace'

If you have feedback or need assistance with the MCP directory API, please join our Discord server