Skip to main content
Glama
executor.py19.5 kB
"""Command executor with sandboxing, resource limits, and network isolation.""" import asyncio import os import resource import shutil import signal import subprocess import sys from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional from mcp.server.fastmcp.utilities.logging import get_logger from .whitelist import CommandWhitelist logger = get_logger(__name__) @dataclass class ProcessLimits: """Process resource limits configuration.""" # Memory limit in MB per process max_memory_mb: int = 512 # CPU time limit in seconds max_cpu_seconds: int = 60 # Maximum file size in MB that can be created max_file_size_mb: int = 100 # Maximum number of child processes max_processes: int = 32 # Maximum number of open files max_open_files: int = 256 @property def max_memory_bytes(self) -> int: return self.max_memory_mb * 1024 * 1024 @property def max_file_size_bytes(self) -> int: return self.max_file_size_mb * 1024 * 1024 @dataclass class CommandResult: """Result of command execution.""" success: bool exit_code: int stdout: str stderr: str execution_time: float timed_out: bool = False error: Optional[str] = None def to_dict(self) -> Dict[str, Any]: return { "success": self.success, "exit_code": self.exit_code, "stdout": self.stdout, "stderr": self.stderr, "execution_time": self.execution_time, "timed_out": self.timed_out, "error": self.error, } class CommandExecutor: """Executes commands with security restrictions.""" def __init__( self, workspace_path: Path, whitelist: Optional[CommandWhitelist] = None, limits: Optional[ProcessLimits] = None, network_isolated: bool = True, config: Optional[Dict[str, Any]] = None, ): """Initialize command executor. Args: workspace_path: The workspace directory for command execution. whitelist: Command whitelist. If None, uses default. limits: Process resource limits. If None, uses default. network_isolated: Whether to isolate network access. config: Additional configuration from config.json. """ self.workspace_path = workspace_path self.whitelist = whitelist or CommandWhitelist() self.limits = limits or ProcessLimits() self.network_isolated = network_isolated self.config = config or {} # Override limits from config if provided limits_config = self.config.get("limits", {}) if limits_config: if "max_memory_mb" in limits_config: self.limits.max_memory_mb = int(limits_config["max_memory_mb"]) if "max_cpu_seconds" in limits_config: self.limits.max_cpu_seconds = int(limits_config["max_cpu_seconds"]) if "max_file_size_mb" in limits_config: self.limits.max_file_size_mb = int(limits_config["max_file_size_mb"]) if "max_processes" in limits_config: self.limits.max_processes = int(limits_config["max_processes"]) if "timeout_seconds" in limits_config: self.default_timeout = int(limits_config["timeout_seconds"]) else: self.default_timeout = 120 # Network isolation setting from config if "network_isolated" in self.config: self.network_isolated = bool(self.config["network_isolated"]) # Check if unshare is available and has required permissions if self.network_isolated: self.network_isolated = self._check_unshare_available() def _check_unshare_available(self) -> bool: """Check if unshare is available and has required permissions. Returns: True if unshare can be used, False otherwise. """ if sys.platform != "linux": logger.warning("Network isolation is only supported on Linux, disabling it") return False # Check if unshare command exists if not shutil.which("unshare"): logger.warning("unshare command not found, disabling network isolation") return False # Try to test unshare with a simple command # This will fail if we don't have permissions try: result = subprocess.run( ["unshare", "--net", "--map-root-user", "--", "true"], capture_output=True, timeout=2, ) if result.returncode == 0: return True else: logger.warning( f"unshare test failed (exit code {result.returncode}), " "disabling network isolation. Error: " f"{result.stderr.decode('utf-8', errors='replace')[:200]}" ) return False except subprocess.TimeoutExpired: logger.warning("unshare test timed out, disabling network isolation") return False except Exception as e: logger.warning( f"unshare test failed with exception: {e}, " "disabling network isolation" ) return False def _validate_command_paths(self, command: str, cwd: Path) -> Optional[str]: """Validate that all paths in command arguments are within workspace. Args: command: The command string to validate cwd: The current working directory (must be within workspace) Returns: Error message if validation fails, None otherwise """ try: import shlex parts = shlex.split(command) if len(parts) <= 1: return None # No arguments to validate workspace_resolved = self.workspace_path.resolve() cwd_resolved = cwd.resolve() # Commands that typically take path arguments path_commands = {"find", "file", "cat", "head", "tail", "grep", "ls", "wc", "diff"} base_cmd = parts[0].split("/")[-1] if base_cmd not in path_commands: # For other commands, we still check for obvious path traversal attempts # but allow them to pass (they might not be file operations) for arg in parts[1:]: if self._is_path_traversal_attempt(arg, cwd_resolved, workspace_resolved): return f"Path traversal attempt detected in command argument: {arg}" return None # For path commands, validate all arguments that look like paths i = 1 while i < len(parts): arg = parts[i] # Skip flags (starting with -) if arg.startswith("-"): i += 1 # Some flags take values (e.g., -name "pattern") if i < len(parts) and not parts[i].startswith("-"): i += 1 continue # Check if argument looks like a path if self._is_path_traversal_attempt(arg, cwd_resolved, workspace_resolved): return f"Path traversal attempt detected: {arg} would access files outside workspace" i += 1 return None except Exception as e: logger.warning(f"Error validating command paths: {e}") # Don't block on validation errors, but log them return None def _is_path_traversal_attempt(self, arg: str, cwd: Path, workspace: Path) -> bool: """Check if an argument is a path traversal attempt. Args: arg: Command argument to check cwd: Current working directory workspace: Workspace root directory Returns: True if this looks like a path traversal attempt """ # Skip non-path arguments if not arg or arg.startswith("-"): return False # Check for obvious path traversal patterns if ".." in arg: try: # Resolve the path relative to cwd test_path = (cwd / arg).resolve() # Check if it's still within workspace if not str(test_path).startswith(str(workspace)): return True except (ValueError, OSError): # If we can't resolve it, it might be malicious return True # Check for absolute paths outside workspace if arg.startswith("/"): try: test_path = Path(arg).resolve() if not str(test_path).startswith(str(workspace)): return True except (ValueError, OSError): return True return False def _set_process_limits(self): """Set resource limits for the child process. This is called as preexec_fn in subprocess. """ try: # Memory limit (virtual memory / address space) resource.setrlimit( resource.RLIMIT_AS, (self.limits.max_memory_bytes, self.limits.max_memory_bytes), ) # CPU time limit resource.setrlimit( resource.RLIMIT_CPU, (self.limits.max_cpu_seconds, self.limits.max_cpu_seconds), ) # File size limit resource.setrlimit( resource.RLIMIT_FSIZE, (self.limits.max_file_size_bytes, self.limits.max_file_size_bytes), ) # Number of processes limit resource.setrlimit( resource.RLIMIT_NPROC, (self.limits.max_processes, self.limits.max_processes), ) # Open files limit resource.setrlimit( resource.RLIMIT_NOFILE, (self.limits.max_open_files, self.limits.max_open_files), ) except (ValueError, resource.error) as e: # Log but don't fail - some limits might not be settable logger.warning(f"Failed to set some resource limits: {e}") def _get_sandboxed_env(self) -> Dict[str, str]: """Get environment variables for sandboxed execution. Returns: Environment dictionary with restricted settings. """ env = os.environ.copy() # Set working directory related vars env["HOME"] = str(self.workspace_path) env["PWD"] = str(self.workspace_path) # Python settings env["PYTHONUNBUFFERED"] = "1" env["PYTHONDONTWRITEBYTECODE"] = "1" env["PYTHONHASHSEED"] = "random" # Disable pip/npm from installing (even if command gets through) env["PIP_NO_INPUT"] = "1" env["npm_config_ignore_scripts"] = "true" # Matplotlib backend for headless operation env["MPLBACKEND"] = "Agg" # Limit Node.js memory env["NODE_OPTIONS"] = f"--max-old-space-size={self.limits.max_memory_mb}" return env def _wrap_command_for_isolation(self, command: str) -> str: """Wrap command for network isolation if enabled. Args: command: Original command. Returns: Wrapped command with network isolation. """ if not self.network_isolated: return command # Check if unshare is available (Linux only) if sys.platform != "linux": logger.warning("Network isolation is only supported on Linux") return command # Use unshare to create a new network namespace # --net: new network namespace (no network access) # --map-root-user: map root user for unprivileged unshare # Note: This requires user namespaces to be enabled return f"unshare --net --map-root-user -- {command}" async def execute( self, command: str, working_dir: str = "/", timeout: Optional[int] = None, env: Optional[Dict[str, str]] = None, ) -> CommandResult: """Execute a command in the sandbox. Args: command: Command to execute. working_dir: Working directory relative to workspace root. timeout: Timeout in seconds. If None, uses default. env: Additional environment variables. Returns: CommandResult with execution details. """ import time start_time = time.time() # Validate command against whitelist allowed, error_msg = self.whitelist.is_command_allowed(command) if not allowed: return CommandResult( success=False, exit_code=-1, stdout="", stderr="", execution_time=0, error=error_msg, ) # Resolve working directory # Handle None case (use workspace root) if working_dir is None or working_dir == "": working_dir = "/" if working_dir.startswith("/"): working_dir = working_dir[1:] if working_dir and working_dir != ".": cwd = self.workspace_path / working_dir else: cwd = self.workspace_path # Ensure working directory exists and is within workspace if not cwd.exists(): return CommandResult( success=False, exit_code=-1, stdout="", stderr="", execution_time=0, error=f"Working directory does not exist: /{working_dir}", ) # Verify working directory is within workspace (prevent path traversal) try: cwd_resolved = cwd.resolve() workspace_resolved = self.workspace_path.resolve() if not str(cwd_resolved).startswith(str(workspace_resolved)): logger.warning( f"Path traversal attempt detected: {cwd_resolved} outside {workspace_resolved}" ) return CommandResult( success=False, exit_code=-1, stdout="", stderr="", execution_time=0, error="Working directory outside workspace is not allowed", ) except Exception as e: logger.error(f"Error validating working directory: {e}") return CommandResult( success=False, exit_code=-1, stdout="", stderr="", execution_time=0, error="Failed to validate working directory", ) # Validate paths in command arguments to prevent path traversal path_validation_error = self._validate_command_paths(command, cwd) if path_validation_error: return CommandResult( success=False, exit_code=-1, stdout="", stderr="", execution_time=0, error=path_validation_error, ) # Prepare environment exec_env = self._get_sandboxed_env() if env: exec_env.update(env) # Apply timeout effective_timeout = timeout if timeout is not None else self.default_timeout effective_timeout = min(effective_timeout, 300) # Max 5 minutes # Wrap command for network isolation wrapped_command = self._wrap_command_for_isolation(command) try: # Create subprocess with resource limits proc = await asyncio.create_subprocess_shell( wrapped_command, cwd=str(cwd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=exec_env, preexec_fn=self._set_process_limits if sys.platform != "win32" else None, ) try: stdout, stderr = await asyncio.wait_for( proc.communicate(), timeout=effective_timeout, ) except asyncio.TimeoutError: # Kill the process and all children try: proc.kill() await proc.wait() except ProcessLookupError: pass return CommandResult( success=False, exit_code=-1, stdout="", stderr="", execution_time=time.time() - start_time, timed_out=True, error=f"Command timed out after {effective_timeout} seconds", ) execution_time = time.time() - start_time return CommandResult( success=proc.returncode == 0, exit_code=proc.returncode or 0, stdout=stdout.decode("utf-8", errors="replace"), stderr=stderr.decode("utf-8", errors="replace"), execution_time=execution_time, ) except Exception as e: logger.error(f"Command execution failed: {e}") return CommandResult( success=False, exit_code=-1, stdout="", stderr="", execution_time=time.time() - start_time, error=f"Execution failed: {type(e).__name__}", ) def format_result(self, result: CommandResult) -> str: """Format command result for display. Args: result: Command execution result. Returns: Formatted string for display. """ lines = [] if result.error: lines.append(f"❌ Error: {result.error}") return "\n".join(lines) if result.timed_out: lines.append(f"⏰ Command timed out after {result.execution_time:.1f}s") return "\n".join(lines) if result.success: lines.append(f"✅ Command completed successfully (exit code: {result.exit_code})") else: lines.append(f"❌ Command failed (exit code: {result.exit_code})") lines.append(f"⏱️ Execution time: {result.execution_time:.3f}s") lines.append("") if result.stdout.strip(): lines.append("**stdout:**") lines.append("```") # Truncate if too long stdout = result.stdout.strip() if len(stdout) > 10000: stdout = stdout[:10000] + "\n... (truncated)" lines.append(stdout) lines.append("```") if result.stderr.strip(): lines.append("") lines.append("**stderr:**") lines.append("```") stderr = result.stderr.strip() if len(stderr) > 5000: stderr = stderr[:5000] + "\n... (truncated)" lines.append(stderr) lines.append("```") return "\n".join(lines)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/answerlink/MCP-Workspace-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server