Skip to main content
Glama

microsandbox

by microsandbox
command.rs12.7 kB
//! Command execution for the microsandbox portal. //! //! This module provides functionality for executing system commands in a sandboxed environment. //! It handles: //! - Spawning and managing command processes using tokio::process::Command //! - Streaming stdout and stderr output in real-time //! - Managing command lifecycle and termination //! - Providing a secure execution environment for system commands //! //! # Architecture //! //! The architecture follows a similar pattern to the code evaluation system: //! //! 1. A command handler receives execution requests //! 2. Commands are executed in a controlled environment //! 3. Output is streamed back to the caller //! //! # Security Considerations //! //! All commands are executed with carefully controlled permissions and environment //! variables to maintain system security. Command execution is isolated to prevent //! damage to the host system. use std::{ fmt, sync::{Arc, Mutex}, }; use tokio::{ io::{AsyncBufReadExt, BufReader}, process::Command, sync::{ mpsc::{self, Sender}, oneshot, }, time::{sleep, Duration}, }; use uuid::Uuid; use crate::portal::repl::types::Stream; //-------------------------------------------------------------------------------------------------- // Types //-------------------------------------------------------------------------------------------------- /// Error types that can occur during command operations #[derive(Debug, thiserror::Error)] pub enum CommandError { /// Error spawning the command #[error("Failed to spawn command: {0}")] SpawnError(String), /// Error during command execution #[error("Command execution error: {0}")] ExecutionError(String), /// Timeout during execution #[error("Command timeout after {0} seconds")] Timeout(u64), /// Command environment unavailable #[error("Command environment unavailable: {0}")] Unavailable(String), } /// A single line of output from command execution #[derive(Debug, Clone)] pub struct CommandLine { /// Stream type (stdout/stderr) pub stream: Stream, /// Line content pub text: String, } /// Response from a command execution #[derive(Debug)] pub enum CommandResp { /// A line of output Line { /// Unique identifier for the execution id: String, /// Stream type (stdout/stderr) stream: Stream, /// Line content text: String, }, /// Execution completed successfully Done { /// Unique identifier for the execution id: String, /// Exit code from the command exit_code: i32, }, /// Execution resulted in an error Error { /// Unique identifier for the execution id: String, /// Error message message: String, }, } /// Command executor handle /// /// This is the primary interface that clients use to execute system commands /// in a controlled environment. #[derive(Clone)] pub struct CommandHandle { cmd_sender: Sender<CommandRequest>, } // Implement Debug for CommandHandle impl fmt::Debug for CommandHandle { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("CommandHandle") .field("cmd_sender", &"<SENDER>") .finish() } } /// Request for command execution struct CommandRequest { id: String, command: String, args: Vec<String>, resp_tx: Sender<CommandResp>, done_tx: oneshot::Sender<Result<i32, CommandError>>, timeout: Option<u64>, } //-------------------------------------------------------------------------------------------------- // Methods //-------------------------------------------------------------------------------------------------- impl CommandHandle { /// Creates a new command handle pub fn new() -> Self { let (cmd_sender, mut cmd_receiver) = mpsc::channel::<CommandRequest>(100); // Start the command executor in a background task tokio::spawn(async move { while let Some(req) = cmd_receiver.recv().await { let CommandRequest { id, command, args, resp_tx, done_tx, timeout, } = req; // Execute the command in a separate task tokio::spawn(async move { let result = execute_command(id, command, args, resp_tx.clone(), timeout).await; let _ = done_tx.send(result); }); } }); Self { cmd_sender } } /// Executes a command and streams the output /// /// # Parameters /// /// * `command` - The command to execute /// * `args` - Arguments to pass to the command /// * `timeout` - Optional timeout in seconds after which execution will be cancelled /// /// # Returns /// /// A tuple containing the exit code and a vector of output lines pub async fn execute<S: Into<String>>( &self, command: S, args: Vec<String>, timeout: Option<u64>, ) -> Result<(i32, Vec<CommandLine>), CommandError> { let command = command.into(); // Generate a unique execution ID let execution_id = Uuid::new_v4().to_string(); // Channels for communication let (resp_tx, mut resp_rx) = mpsc::channel::<CommandResp>(100); let (line_tx, mut line_rx) = mpsc::channel::<CommandLine>(100); let (done_tx, done_rx) = oneshot::channel::<Result<i32, CommandError>>(); // Send the command execution request self.cmd_sender .send(CommandRequest { id: execution_id, command, args, resp_tx, done_tx, timeout, }) .await .map_err(|_| CommandError::Unavailable("Command executor not available".to_string()))?; // Process responses in a separate task let process_handle = tokio::spawn(async move { let mut exit_code = 0; while let Some(resp) = resp_rx.recv().await { match resp { CommandResp::Line { id: _, stream, text, } => { let _ = line_tx.send(CommandLine { stream, text }).await; } CommandResp::Done { id: _, exit_code: code, } => { exit_code = code; break; } CommandResp::Error { id: _, message } => { let _ = line_tx .send(CommandLine { stream: Stream::Stderr, text: format!("Error: {}", message), }) .await; break; } } } exit_code }); // Collect all output lines let mut lines = Vec::new(); while let Some(line) = line_rx.recv().await { lines.push(line); } // Wait for processing to complete let _exit_code = process_handle.await.unwrap_or(1); // Wait for execution completion let result = done_rx .await .map_err(|_| CommandError::ExecutionError("Command execution failed".to_string()))??; Ok((result, lines)) } } //-------------------------------------------------------------------------------------------------- // Functions //-------------------------------------------------------------------------------------------------- /// Creates a new command executor handle pub fn create_command_executor() -> CommandHandle { CommandHandle::new() } /// Executes a system command and streams the output async fn execute_command( id: String, command: String, args: Vec<String>, resp_tx: Sender<CommandResp>, timeout: Option<u64>, ) -> Result<i32, CommandError> { // Spawn the command process let mut process = Command::new(&command) .args(&args) .stdin(std::process::Stdio::null()) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .spawn() .map_err(|e| CommandError::SpawnError(format!("Failed to spawn command: {}", e)))?; // Get stdout and stderr handles let stdout = process .stdout .take() .ok_or_else(|| CommandError::ExecutionError("Failed to capture stdout".to_string()))?; let stderr = process .stderr .take() .ok_or_else(|| CommandError::ExecutionError("Failed to capture stderr".to_string()))?; // Track active processing let processing = Arc::new(Mutex::new(true)); // Start stdout handler let stdout_reader = BufReader::new(stdout); let stdout_resp_tx = resp_tx.clone(); let stdout_id = id.clone(); let stdout_processing = Arc::clone(&processing); let stdout_handle = tokio::spawn(async move { let mut lines = stdout_reader.lines(); while let Ok(Some(line)) = lines.next_line().await { if *stdout_processing.lock().unwrap() { let _ = stdout_resp_tx .send(CommandResp::Line { id: stdout_id.clone(), stream: Stream::Stdout, text: line, }) .await; } else { break; } } }); // Start stderr handler let stderr_reader = BufReader::new(stderr); let stderr_resp_tx = resp_tx.clone(); let stderr_id = id.clone(); let stderr_processing = Arc::clone(&processing); let stderr_handle = tokio::spawn(async move { let mut lines = stderr_reader.lines(); while let Ok(Some(line)) = lines.next_line().await { if *stderr_processing.lock().unwrap() { let _ = stderr_resp_tx .send(CommandResp::Line { id: stderr_id.clone(), stream: Stream::Stderr, text: line, }) .await; } else { break; } } }); // Set a timeout for the command execution if specified let process_wait = async { match process.wait().await { Ok(status) => { let exit_code = status.code().unwrap_or(1); let _ = resp_tx .send(CommandResp::Done { id: id.clone(), exit_code, }) .await; Ok(exit_code) } Err(e) => { let _ = resp_tx .send(CommandResp::Error { id: id.clone(), message: format!("Command execution failed: {}", e), }) .await; Err(CommandError::ExecutionError(format!( "Failed to wait for command: {}", e ))) } } }; // Execute with timeout only if specified let result = match timeout { Some(timeout_secs) => { let timeout_duration = Duration::from_secs(timeout_secs); tokio::select! { result = process_wait => result, _ = sleep(timeout_duration) => { // Kill the process on timeout let _ = process.kill().await; let _ = resp_tx .send(CommandResp::Error { id: id.clone(), message: format!("Command timed out after {} seconds", timeout_secs), }) .await; Err(CommandError::Timeout(timeout_secs)) } } } None => { // No timeout, just wait for the process to complete process_wait.await } }; // Signal to output handlers to stop { let mut guard = processing.lock().unwrap(); *guard = false; } // Wait for output handlers to complete let _ = stdout_handle.await; let _ = stderr_handle.await; result }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/microsandbox/microsandbox'

If you have feedback or need assistance with the MCP directory API, please join our Discord server