Skip to main content
Glama
bash_manager.py8.75 kB
# cmcp/managers/bash_manager.py # container-mcp © 2025 by Martin Bukowski is licensed under Apache 2.0 """Bash Manager for securely executing bash commands.""" import asyncio import os import shutil import subprocess from dataclasses import dataclass from typing import List, Optional from cmcp.utils.logging import get_logger logger = get_logger(__name__) @dataclass class BashResult: """Result of a bash command execution.""" stdout: str stderr: str exit_code: int class BashManager: """Manager for secure bash command execution.""" def __init__( self, sandbox_dir: str, allowed_commands: List[str], timeout_default: int = 30, timeout_max: int = 120, command_restricted: bool = True ): """Initialize the BashManager. Args: sandbox_dir: Directory for sandbox operations allowed_commands: List of allowed bash commands timeout_default: Default timeout in seconds timeout_max: Maximum allowed timeout in seconds command_restricted: Whether to restrict commands to allowed list """ self.sandbox_dir = sandbox_dir self.allowed_commands = allowed_commands self.timeout_default = timeout_default self.timeout_max = timeout_max self.command_restricted = command_restricted # Ensure sandbox directory exists os.makedirs(self.sandbox_dir, exist_ok=True) logger.debug(f"BashManager initialized with sandbox at {self.sandbox_dir}") logger.debug(f"Command restriction {'enabled' if command_restricted else 'disabled'}") if command_restricted: logger.debug(f"Allowed commands: {', '.join(allowed_commands) if allowed_commands else 'none'}") @classmethod def from_env(cls, config=None): """Create a BashManager from environment configuration. Args: config: Optional configuration object, loads from environment if not provided Returns: Configured BashManager instance """ if config is None: from cmcp.config import load_config config = load_config() logger.debug("Creating BashManager from environment configuration") return cls( sandbox_dir=config.bash_config.sandbox_dir, allowed_commands=config.bash_config.allowed_commands, timeout_default=config.bash_config.timeout_default, timeout_max=config.bash_config.timeout_max, command_restricted=config.bash_config.command_restricted ) async def execute(self, command: str, timeout: Optional[int] = None) -> BashResult: """Execute a bash command in sandbox. Args: command: The bash command to execute timeout: Optional timeout in seconds, defaults to timeout_default Returns: BashResult with stdout, stderr, and exit code """ # Apply timeout limit if timeout is None: timeout = self.timeout_default timeout = min(timeout, self.timeout_max) # Parse the command to check against allowed commands cmd_parts = command.split() if not cmd_parts: logger.warning("Empty command received") return BashResult(stdout="", stderr="Empty command", exit_code=1) # Check command against allowed list if restriction is enabled base_cmd = os.path.basename(cmd_parts[0]) if self.command_restricted: if not self.allowed_commands: logger.warning("No allowed commands configured") return BashResult( stdout="", stderr="Command restrictions enabled but no commands are allowed. Add commands to BASH_ALLOWED_COMMANDS or set COMMAND_RESTRICTED=false", exit_code=1 ) if base_cmd not in self.allowed_commands: logger.warning(f"Command not allowed: {base_cmd}") return BashResult( stdout="", stderr=f"Command not allowed: {base_cmd}. Allowed commands: {', '.join(self.allowed_commands)}", exit_code=1 ) else: logger.debug(f"Command restrictions disabled, allowing command: {base_cmd}") # Use direct subprocess with shell=True when restrictions are off for proper wildcard expansion if not self.command_restricted: logger.debug(f"Using direct subprocess execution with shell=True for command: {command}") try: # Use synchronous subprocess for simplicity with shell=True process = subprocess.run( command, shell=True, capture_output=True, text=True, timeout=timeout, cwd=self.sandbox_dir ) return BashResult( stdout=process.stdout, stderr=process.stderr, exit_code=process.returncode ) except subprocess.TimeoutExpired: logger.warning(f"Command execution timed out after {timeout} seconds") return BashResult( stdout="", stderr=f"Command execution timed out after {timeout} seconds", exit_code=124 ) # For restricted mode, use the sandboxed environment (original implementation) # Use environment-aware sandbox command sandbox_cmd = self._get_sandbox_command(command) logger.debug(f"Executing command: {command}") logger.debug(f"Sandbox command: {' '.join(sandbox_cmd)}") # Execute with asyncio subprocess proc = await asyncio.create_subprocess_exec( *sandbox_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) try: stdout, stderr = await asyncio.wait_for( proc.communicate(), timeout=timeout ) result = BashResult( stdout=stdout.decode(), stderr=stderr.decode(), exit_code=proc.returncode ) logger.debug(f"Command completed with exit code: {result.exit_code}") return result except asyncio.TimeoutError: proc.kill() logger.warning(f"Command execution timed out after {timeout} seconds") return BashResult( stdout="", stderr=f"Command execution timed out after {timeout} seconds", exit_code=124 ) def _get_sandbox_command(self, command: str) -> List[str]: """Get appropriate sandboxing command based on environment. Args: command: The user command to execute Returns: Command list with appropriate sandboxing wrappers """ if self._is_container(): # Full firejail with all security options in container return [ "firejail", "--noprofile", "--quiet", f"--private={self.sandbox_dir}", "--private-dev", "--private-tmp", "--caps.drop=all", "--nonewprivs", "--noroot", "--seccomp", "bash", "-c", command ] else: # Simplified sandbox for local development if self._is_firejail_available(): return [ "firejail", "--quiet", f"--private={self.sandbox_dir}", "bash", "-c", command ] else: # Fallback without sandboxing logger.warning("Running without firejail sandboxing - FOR DEVELOPMENT ONLY") return ["bash", "-c", command] def _is_container(self) -> bool: """Check if we're running in a container. Returns: True if running in a container environment """ return os.path.exists('/run/.containerenv') or os.path.exists('/.dockerenv') def _is_firejail_available(self) -> bool: """Check if firejail is available. Returns: True if firejail is installed and available """ return shutil.which("firejail") is not None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/54rt1n/container-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server