Skip to main content
Glama

AWS Model Context Protocol Server

by alexei-led
sandbox.py27.4 kB
"""Sandbox execution for AWS CLI commands. This module provides OS-level process isolation for running AWS CLI commands securely. It supports: - Linux: Landlock LSM (kernel 5.13+) with bubblewrap fallback - macOS: Seatbelt (sandbox-exec) via ctypes The sandbox restricts filesystem access while allowing: - Read access to system binaries and libraries - Read access to AWS configuration (~/.aws) if configured - Write access to /tmp and current working directory - Network access for AWS API calls """ import ctypes import logging import os import platform import shutil import subprocess from abc import ABC, abstractmethod from dataclasses import dataclass, field from pathlib import Path from typing import TYPE_CHECKING if TYPE_CHECKING: from collections.abc import Sequence logger = logging.getLogger(__name__) # AWS environment variables passed through to sandboxed processes AWS_ENV_VARS = [ "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN", "AWS_REGION", "AWS_DEFAULT_REGION", "AWS_PROFILE", "AWS_CONFIG_FILE", "AWS_SHARED_CREDENTIALS_FILE", "AWS_CA_BUNDLE", "AWS_ROLE_ARN", "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_SESSION_NAME", "AWS_STS_REGIONAL_ENDPOINTS", "AWS_EC2_METADATA_DISABLED", "AWS_METADATA_SERVICE_TIMEOUT", "AWS_METADATA_SERVICE_NUM_ATTEMPTS", ] # Secret-bearing variables excluded when pass_aws_env=False AWS_SECRET_ENV_VARS = { "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN", "AWS_ROLE_ARN", "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_SESSION_NAME", } def get_aws_credential_paths() -> list[str]: """Get all AWS credential-related paths that should be accessible. Returns specific file paths (not directories) for minimal access scope. The default ~/.aws directory is still whitelisted as a directory for backward compatibility, but custom paths are file-only. """ paths: list[str] = [] # Default ~/.aws directory (backward compat) default_aws_dir = Path.home() / ".aws" if default_aws_dir.exists(): paths.append(str(default_aws_dir)) # Custom credentials file creds_file = os.environ.get("AWS_SHARED_CREDENTIALS_FILE") if creds_file: creds_path = Path(creds_file) if creds_path.exists(): paths.append(str(creds_path)) # Custom config file config_file = os.environ.get("AWS_CONFIG_FILE") if config_file: config_path = Path(config_file) if config_path.exists(): paths.append(str(config_path)) # Web identity token file (EKS IRSA) token_file = os.environ.get("AWS_WEB_IDENTITY_TOKEN_FILE") if token_file: token_path = Path(token_file) if token_path.exists(): paths.append(str(token_path)) seen: set[str] = set() unique_paths: list[str] = [] for path in paths: if path not in seen: seen.add(path) unique_paths.append(path) return unique_paths @dataclass class SandboxConfig: """Configuration for sandbox execution.""" # Paths with read-only access read_paths: list[str] = field(default_factory=list) # Paths with read-write access write_paths: list[str] = field(default_factory=list) # Allow network access allow_network: bool = True # Pass AWS credentials via environment variables pass_aws_env: bool = True # Allow read access to ~/.aws directory allow_aws_config: bool = True # Environment variables to pass through env_passthrough: list[str] = field(default_factory=list) def __post_init__(self): """Set up default paths.""" if not self.read_paths: self.read_paths = self._default_read_paths() if not self.write_paths: self.write_paths = self._default_write_paths() if not self.env_passthrough: self.env_passthrough = self._default_env_passthrough() # Remove secret AWS variables if env-based credentials are disabled if not self.pass_aws_env: self.env_passthrough = [var for var in self.env_passthrough if var not in AWS_SECRET_ENV_VARS] @staticmethod def _default_read_paths() -> list[str]: """Return default read-only paths for the platform.""" system = platform.system() if system == "Linux": return [ "/usr", "/bin", "/lib", "/lib64", "/etc/ssl", "/etc/ca-certificates", "/etc/pki", "/etc/resolv.conf", "/etc/hosts", "/etc/nsswitch.conf", "/etc/passwd", "/etc/group", ] elif system == "Darwin": return [ "/usr", "/bin", "/sbin", "/Library", "/System", "/private/etc", "/var/run", "/dev", "/opt/homebrew", ] return [] @staticmethod def _default_write_paths() -> list[str]: """Return default writable paths.""" return [ "/tmp", os.environ.get("TMPDIR", "/tmp"), os.getcwd(), ] @staticmethod def _default_env_passthrough() -> list[str]: """Return default environment variables to pass through.""" return [ "PATH", "HOME", "USER", "LANG", "LC_ALL", "TERM", "TZ", # AWS-specific *AWS_ENV_VARS, ] class SandboxError(Exception): """Exception raised when sandbox operations fail.""" pass class SandboxBackend(ABC): """Abstract base class for sandbox backends.""" @abstractmethod def is_available(self) -> bool: """Check if this sandbox backend is available on the current system.""" @abstractmethod def execute( self, cmd: "Sequence[str]", config: SandboxConfig, *, input_data: bytes | None = None, timeout: float | None = None, sandbox_mode: str = "auto", ) -> subprocess.CompletedProcess[bytes]: """Execute a command in the sandbox. Args: cmd: Command and arguments to execute config: Sandbox configuration input_data: Optional input to pass to the process stdin timeout: Optional timeout in seconds sandbox_mode: One of "auto", "disabled", or "required" Returns: CompletedProcess with stdout and stderr Raises: SandboxError: If sandbox execution fails subprocess.TimeoutExpired: If command times out """ def _build_env(self, config: SandboxConfig) -> dict[str, str]: """Build environment variables for sandboxed process. Args: config: Sandbox configuration with env_passthrough list Returns: Dictionary of environment variables to pass to subprocess """ env: dict[str, str] = {} for key in config.env_passthrough: if not config.pass_aws_env and key in AWS_SECRET_ENV_VARS: continue if key in os.environ: env[key] = os.environ[key] return env class LinuxLandlockBackend(SandboxBackend): """Linux sandbox using Landlock LSM. Landlock is a Linux security module that allows unprivileged processes to restrict themselves. Requires kernel 5.13+. """ def __init__(self): self._available: bool | None = None self._landlock_module = None def is_available(self) -> bool: """Check if Landlock is available.""" if self._available is not None: return self._available if platform.system() != "Linux": self._available = False return False try: release = platform.release() major, minor = map(int, release.split(".")[:2]) if major < 5 or (major == 5 and minor < 13): logger.debug(f"Kernel {release} too old for Landlock (need 5.13+)") self._available = False return False except (ValueError, IndexError): logger.debug(f"Could not parse kernel version: {platform.release()}") self._available = False return False try: import landlock self._landlock_module = landlock self._available = True logger.debug("Landlock is available") except ImportError: logger.debug("landlock Python module not installed") self._available = False return self._available def execute( self, cmd: "Sequence[str]", config: SandboxConfig, *, input_data: bytes | None = None, timeout: float | None = None, sandbox_mode: str = "auto", ) -> subprocess.CompletedProcess[bytes]: """Execute command with Landlock restrictions.""" if not self.is_available(): raise SandboxError("Landlock is not available") # Import here to avoid errors on non-Linux import landlock def setup_sandbox(): """Set up Landlock sandbox before exec.""" try: rs = landlock.Ruleset() for path in config.read_paths: if os.path.exists(path): rs.allow(path) # AWS credential paths (includes custom paths from env vars) if config.allow_aws_config: for aws_path in get_aws_credential_paths(): if os.path.exists(aws_path): rs.allow(aws_path) for path in config.write_paths: if os.path.exists(path): rs.allow(path, write=True) rs.apply() except Exception as e: if sandbox_mode == "required": raise RuntimeError(f"Sandboxing required but Landlock failed: {e}") from e logger.warning(f"Landlock sandbox failed (mode={sandbox_mode}), continuing unsandboxed: {e}") env = self._build_env(config) return subprocess.run( list(cmd), input=input_data, capture_output=True, timeout=timeout, env=env, preexec_fn=setup_sandbox, ) class LinuxBubblewrapBackend(SandboxBackend): """Linux sandbox using bubblewrap (bwrap). Bubblewrap is a lightweight sandbox tool that uses Linux namespaces and seccomp. It's available on most Linux distributions. """ def __init__(self): self._available: bool | None = None self._bwrap_path: str | None = None def is_available(self) -> bool: """Check if bubblewrap is available.""" if self._available is not None: return self._available if platform.system() != "Linux": self._available = False return False self._bwrap_path = shutil.which("bwrap") self._available = self._bwrap_path is not None if self._available: logger.debug(f"Bubblewrap available at {self._bwrap_path}") else: logger.debug("Bubblewrap (bwrap) not found") return self._available def _build_bwrap_args(self, config: SandboxConfig) -> list[str]: """Build bwrap command line arguments.""" args = [self._bwrap_path] for path in config.read_paths: if os.path.exists(path): args.extend(["--ro-bind", path, path]) # AWS credential paths (includes custom paths from env vars) if config.allow_aws_config: for aws_path in get_aws_credential_paths(): if os.path.exists(aws_path): args.extend(["--ro-bind", aws_path, aws_path]) for path in config.write_paths: if os.path.exists(path): args.extend(["--bind", path, path]) args.extend( [ "--proc", "/proc", "--dev", "/dev", ] ) # Security: PID namespace isolation args.append("--unshare-pid") # Security: Prevent TTY injection attacks args.append("--new-session") # Security: Kill sandbox if parent dies args.append("--die-with-parent") if not config.allow_network: args.append("--unshare-net") args.append("--") return args def execute( self, cmd: "Sequence[str]", config: SandboxConfig, *, input_data: bytes | None = None, timeout: float | None = None, sandbox_mode: str = "auto", ) -> subprocess.CompletedProcess[bytes]: """Execute command in bubblewrap sandbox.""" if not self.is_available(): raise SandboxError("Bubblewrap is not available") bwrap_args = self._build_bwrap_args(config) full_cmd = bwrap_args + list(cmd) env = self._build_env(config) return subprocess.run( full_cmd, input=input_data, capture_output=True, timeout=timeout, env=env, ) class MacOSSeatbeltBackend(SandboxBackend): """macOS sandbox using Seatbelt (sandbox-exec). Seatbelt is macOS's built-in sandboxing technology. While Apple considers the public API deprecated, it's still functional and used by Chrome, Firefox, and other major applications. """ PROFILE_TEMPLATE = """ (version 1) (deny default) ; Allow process operations (allow process*) (allow signal) (allow sysctl-read) (allow mach-lookup) (allow mach-register) (allow ipc-posix*) (allow system-socket) ; Allow reading system paths (allow file-read* (subpath "/usr") (subpath "/bin") (subpath "/sbin") (subpath "/Library") (subpath "/System") (subpath "/private/etc") (subpath "/private/var/run") (subpath "/var/run") (subpath "/dev") (subpath "/opt/homebrew") (subpath "/Applications/Xcode.app") (literal "/") (literal "/private") (literal "/private/var") ) ; Allow reading AWS config if enabled {aws_config_rule} ; Allow reading and writing to allowed paths {write_rules} ; Allow network access if enabled {network_rule} ; Allow reading environment and process info (allow file-read-metadata) (allow process-info*) """ def __init__(self): self._available: bool | None = None def is_available(self) -> bool: """Check if Seatbelt is available.""" if self._available is not None: return self._available if platform.system() != "Darwin": self._available = False return False # Check macOS version (need 12+) try: version = platform.mac_ver()[0] major = int(version.split(".")[0]) if major < 12: logger.debug(f"macOS {version} too old for Seatbelt (need 12+)") self._available = False return False except (ValueError, IndexError): logger.debug(f"Could not parse macOS version: {platform.mac_ver()}") # Try to load libSystem for sandbox_init try: ctypes.CDLL("/usr/lib/libSystem.B.dylib") self._available = True logger.debug("Seatbelt is available") except OSError: logger.debug("Could not load libSystem") self._available = False return self._available def _build_profile(self, config: SandboxConfig) -> str: """Build Seatbelt profile from configuration.""" # AWS credential paths rule (includes custom paths from env vars) if config.allow_aws_config: aws_paths = get_aws_credential_paths() rules = [] for aws_path in aws_paths: if os.path.isdir(aws_path): rules.append(f'(allow file-read* (subpath "{aws_path}"))') else: rules.append(f'(allow file-read* (literal "{aws_path}"))') aws_config_rule = "\n".join(rules) if rules else "; No AWS paths" else: aws_config_rule = "; AWS config access disabled" # Write rules write_rules = [] for path in config.write_paths: if os.path.exists(path): write_rules.append(f'(allow file-read* file-write* (subpath "{path}"))') write_rules_str = "\n".join(write_rules) if write_rules else "; No write paths" # Network rule if config.allow_network: network_rule = "(allow network*)" else: network_rule = "; Network access disabled" return self.PROFILE_TEMPLATE.format( aws_config_rule=aws_config_rule, write_rules=write_rules_str, network_rule=network_rule, ) def execute( self, cmd: "Sequence[str]", config: SandboxConfig, *, input_data: bytes | None = None, timeout: float | None = None, sandbox_mode: str = "auto", ) -> subprocess.CompletedProcess[bytes]: """Execute command with Seatbelt sandbox.""" if not self.is_available(): raise SandboxError("Seatbelt is not available") profile = self._build_profile(config) env = self._build_env(config) # Use sandbox-exec command line tool sandbox_cmd = [ "/usr/bin/sandbox-exec", "-p", profile, *cmd, ] return subprocess.run( sandbox_cmd, input=input_data, capture_output=True, timeout=timeout, env=env, ) class NoOpBackend(SandboxBackend): """No-op sandbox backend that executes commands without isolation. Used when sandboxing is disabled or no sandbox backend is available. """ def is_available(self) -> bool: """Always available as fallback.""" return True def execute( self, cmd: "Sequence[str]", config: SandboxConfig, *, input_data: bytes | None = None, timeout: float | None = None, sandbox_mode: str = "auto", ) -> subprocess.CompletedProcess[bytes]: """Execute command without sandboxing.""" env = self._build_env(config) return subprocess.run( list(cmd), input=input_data, capture_output=True, timeout=timeout, env=env, ) class Sandbox: """Main sandbox class that selects and uses the appropriate backend.""" def __init__( self, config: SandboxConfig | None = None, sandbox_mode: str = "auto", ): """Initialize sandbox with configuration. Args: config: Sandbox configuration. If None, uses defaults. sandbox_mode: One of "auto", "disabled", or "required" """ self.config = config or SandboxConfig() self.sandbox_mode = sandbox_mode self._backend: SandboxBackend | None = None def _select_backend(self, sandbox_mode: str = "auto") -> SandboxBackend: """Select the best available sandbox backend. Args: sandbox_mode: One of "auto", "disabled", or "required" """ if sandbox_mode == "disabled": logger.info("Sandbox disabled via configuration") return NoOpBackend() system = platform.system() if system == "Linux": # Landlock preferred (kernel-level, no external dependencies) landlock_backend = LinuxLandlockBackend() if landlock_backend.is_available(): logger.info("Using Landlock sandbox backend") return landlock_backend # Bubblewrap fallback bwrap = LinuxBubblewrapBackend() if bwrap.is_available(): logger.info("Using Bubblewrap sandbox backend") return bwrap logger.warning("No Linux sandbox backend available, running without isolation") elif system == "Darwin": seatbelt = MacOSSeatbeltBackend() if seatbelt.is_available(): logger.info("Using Seatbelt sandbox backend") return seatbelt logger.warning("Seatbelt not available on this macOS version") else: logger.warning(f"No sandbox support for {system}") if sandbox_mode == "required": raise SandboxError(f"Sandbox required but not available on {system}") return NoOpBackend() @property def backend(self) -> SandboxBackend: """Get the sandbox backend, selecting one if needed.""" if self._backend is None: self._backend = self._select_backend(self.sandbox_mode) return self._backend def execute( self, cmd: "Sequence[str]", *, input_data: bytes | None = None, timeout: float | None = None, ) -> subprocess.CompletedProcess[bytes]: """Execute a command in the sandbox. Args: cmd: Command and arguments to execute input_data: Optional input to pass to the process stdin timeout: Optional timeout in seconds Returns: CompletedProcess with stdout and stderr Raises: SandboxError: If sandbox execution fails subprocess.TimeoutExpired: If command times out """ return self.backend.execute( cmd, self.config, input_data=input_data, timeout=timeout, sandbox_mode=self.sandbox_mode, ) def is_sandboxed(self) -> bool: """Check if commands will actually be sandboxed.""" return not isinstance(self.backend, NoOpBackend) # Global sandbox instance with default configuration _default_sandbox: Sandbox | None = None def get_sandbox() -> Sandbox: """Get the default sandbox instance.""" global _default_sandbox if _default_sandbox is None: # Avoid circular imports from aws_mcp_server.config import SANDBOX_CREDENTIAL_MODE, SANDBOX_MODE config = SandboxConfig( allow_aws_config=(SANDBOX_CREDENTIAL_MODE in ("aws_config", "both")), pass_aws_env=(SANDBOX_CREDENTIAL_MODE in ("env", "both")), ) _default_sandbox = Sandbox(config, sandbox_mode=SANDBOX_MODE) return _default_sandbox def reset_sandbox() -> None: """Reset the global sandbox instance. Useful for testing.""" global _default_sandbox _default_sandbox = None def execute_sandboxed( cmd: "Sequence[str]", *, input_data: bytes | None = None, timeout: float | None = None, ) -> subprocess.CompletedProcess[bytes]: """Execute a command in the default sandbox. This is a convenience function that uses the global sandbox instance. Args: cmd: Command and arguments to execute input_data: Optional input to pass to the process stdin timeout: Optional timeout in seconds Returns: CompletedProcess with stdout and stderr """ return get_sandbox().execute(cmd, input_data=input_data, timeout=timeout) async def execute_sandboxed_async( cmd: "Sequence[str]", *, input_data: bytes | None = None, timeout: float | None = None, ) -> tuple[bytes, bytes, int]: """Execute a command in the sandbox asynchronously. This runs the sandboxed command in a thread pool to avoid blocking the event loop. Args: cmd: Command and arguments to execute input_data: Optional input to pass to the process stdin timeout: Optional timeout in seconds Returns: Tuple of (stdout, stderr, return_code) Raises: SandboxError: If sandbox execution fails asyncio.TimeoutError: If command times out """ import asyncio loop = asyncio.get_running_loop() def run_sandboxed(): try: result = get_sandbox().execute(cmd, input_data=input_data, timeout=timeout) return result.stdout, result.stderr, result.returncode except subprocess.TimeoutExpired as e: raise asyncio.TimeoutError(f"Command timed out after {e.timeout} seconds") from e # Run in thread pool to avoid blocking return await loop.run_in_executor(None, run_sandboxed) async def execute_piped_sandboxed_async( commands: "Sequence[Sequence[str]]", *, timeout: float | None = None, ) -> tuple[bytes, bytes, int]: """Execute a pipeline of commands in the sandbox asynchronously. This runs commands in sequence, passing stdout of each to stdin of the next. Args: commands: List of commands, each being a sequence of command + args timeout: Optional timeout in seconds for the entire pipeline Returns: Tuple of (stdout, stderr, return_code) Raises: SandboxError: If sandbox execution fails asyncio.TimeoutError: If command times out """ import asyncio import time if not commands: return b"", b"Empty command", 1 loop = asyncio.get_running_loop() def run_pipeline(): sandbox = get_sandbox() current_input: bytes | None = None stderr_parts: list[bytes] = [] start_time = time.monotonic() if timeout is not None else None for i, cmd in enumerate(commands): # Calculate remaining time for this stage stage_timeout: float | None = None if timeout is not None: elapsed = time.monotonic() - start_time remaining = timeout - elapsed if remaining <= 0: raise asyncio.TimeoutError(f"Pipeline timed out after {timeout} seconds (at stage {i})") stage_timeout = remaining try: result = sandbox.execute( cmd, input_data=current_input, timeout=stage_timeout, ) if result.stderr: stderr_parts.append(result.stderr) if result.returncode != 0: return result.stdout, b"\n".join(stderr_parts), result.returncode current_input = result.stdout except subprocess.TimeoutExpired as e: elapsed = time.monotonic() - start_time if start_time else timeout raise asyncio.TimeoutError(f"Pipeline timed out after {elapsed:.1f} seconds (at stage {i})") from e return current_input or b"", b"\n".join(stderr_parts), 0 return await loop.run_in_executor(None, run_pipeline) def sandbox_available() -> bool: """Check if sandboxing is available on the current system. Returns False if sandbox is unavailable or if initialization fails (e.g., when AWS_MCP_SANDBOX=required but no backend is available). """ try: return get_sandbox().is_sandboxed() except SandboxError: return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/alexei-led/aws-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server