Skip to main content
Glama

Katamari MCP Server

by ciphernaut
sandbox.py • 18.5 kB
""" Sandboxed execution environment for generated capabilities. Provides resource limits, isolation, and security boundaries. """ import asyncio import logging import resource import tempfile import shutil import sys import traceback import signal from pathlib import Path from typing import Dict, Any, Optional, List from contextlib import asynccontextmanager import multiprocessing as mp from concurrent.futures import ProcessPoolExecutor, TimeoutError logger = logging.getLogger(__name__) class SandboxConfig: """Configuration for sandbox execution.""" def __init__(self): # Resource limits (sensible defaults) self.max_cpu_time = 30 # seconds self.max_memory = 256 * 1024 * 1024 # 256MB in bytes self.max_processes = 1 # single process self.max_file_size = 10 * 1024 * 1024 # 10MB # Security settings self.allow_network = False self.allow_file_write = True self.allowed_imports = self._get_default_allowed_imports() self.temp_dir = None # Directories (relative to workspace) self.readonly_dirs = [] self.workspace_dirs = [] def _get_default_allowed_imports(self) -> List[str]: """Get list of allowed standard library imports.""" return [ # Core Python 'asyncio', 'collections', 'contextlib', 'datetime', 'enum', 'functools', 'itertools', 'json', 'logging', 'math', 'pathlib', 're', 'string', 'time', 'typing', 'uuid', # Data structures 'array', 'bisect', 'heapq', 'queue', 'weakref', # File/IO (limited) 'io', 'os', 'tempfile', 'shutil', 'glob', # Text processing 'textwrap', 'unicodedata', 'difflib', # Numeric 'decimal', 'fractions', 'numbers', 'random', 'statistics', # Functional 'operator', 'collections.abc', # System (limited) 'sys', 'traceback', 'warnings', 'contextvars', # Katamari internal 'katamari_mcp.utils.error_handler' ] class SandboxResult: """Result of sandboxed execution.""" def __init__(self, success: bool = False, data: Any = None, error: Optional[str] = None, execution_time: float = 0.0, resource_usage: Optional[Dict[str, Any]] = None): self.success = success 
self.data = data self.error = error self.execution_time = execution_time self.resource_usage = resource_usage or {} class PythonSandbox: """Python sandbox with resource limits and isolation.""" def __init__(self, config: Optional[SandboxConfig] = None): self.config = config or SandboxConfig() self._temp_dir: Optional[Path] = None self._original_limits: Dict[str, Any] = {} async def execute(self, code: str, globals_dict: Optional[Dict[str, Any]] = None) -> SandboxResult: """Execute code in sandboxed environment.""" import time start_time = time.time() # Setup temporary directory temp_dir = await self._setup_temp_directory() try: # Setup resource limits await self._apply_resource_limits() # Prepare execution environment exec_globals = await self._prepare_execution_globals(globals_dict, temp_dir) # Execute code result = await self._execute_code(code, exec_globals) # Calculate execution time execution_time = time.time() - start_time # Get resource usage resource_usage = await self._get_resource_usage() return SandboxResult( success=True, data=result, execution_time=execution_time, resource_usage=resource_usage ) except Exception as e: execution_time = time.time() - start_time logger.error(f"Sandbox execution failed: {e}") return SandboxResult( success=False, error=str(e), execution_time=execution_time ) finally: # Cleanup await self._cleanup_temp_directory(temp_dir) await self._restore_resource_limits() async def _setup_temp_directory(self) -> Path: """Setup temporary directory for execution.""" temp_dir = Path(tempfile.mkdtemp(prefix="katamari_sandbox_")) # Create subdirectories (temp_dir / "workspace").mkdir(exist_ok=True) (temp_dir / "readonly").mkdir(exist_ok=True) # Setup readonly directories if specified for readonly_dir in self.config.readonly_dirs: src = Path(readonly_dir) if src.exists(): dst = temp_dir / "readonly" / src.name shutil.copytree(src, dst, dirs_exist_ok=True) self._temp_dir = temp_dir return temp_dir async def _cleanup_temp_directory(self, 
temp_dir: Path): """Cleanup temporary directory.""" try: if temp_dir and temp_dir.exists(): shutil.rmtree(temp_dir, ignore_errors=True) except Exception as e: logger.warning(f"Failed to cleanup temp dir {temp_dir}: {e}") async def _apply_resource_limits(self): """Apply resource limits to current process.""" try: # Memory limit try: self._original_limits['memory'] = resource.getrlimit(resource.RLIMIT_AS) resource.setrlimit(resource.RLIMIT_AS, (self.config.max_memory, self.config.max_memory)) except (OSError, ValueError): self._original_limits['memory'] = None # CPU time limit try: self._original_limits['cpu'] = resource.getrlimit(resource.RLIMIT_CPU) resource.setrlimit(resource.RLIMIT_CPU, (self.config.max_cpu_time, self.config.max_cpu_time)) except (OSError, ValueError): self._original_limits['cpu'] = None # Process limit try: self._original_limits['processes'] = resource.getrlimit(resource.RLIMIT_NPROC) resource.setrlimit(resource.RLIMIT_NPROC, (self.config.max_processes, self.config.max_processes)) except (OSError, ValueError): self._original_limits['processes'] = None # File size limit try: self._original_limits['fsize'] = resource.getrlimit(resource.RLIMIT_FSIZE) resource.setrlimit(resource.RLIMIT_FSIZE, (self.config.max_file_size, self.config.max_file_size)) except (OSError, ValueError): self._original_limits['fsize'] = None except (OSError, ValueError) as e: logger.warning(f"Failed to apply resource limits: {e}") async def _restore_resource_limits(self): """Restore original resource limits.""" try: for limit_type, value in self._original_limits.items(): if limit_type == 'memory' and value: resource.setrlimit(resource.RLIMIT_AS, value) elif limit_type == 'cpu' and value: resource.setrlimit(resource.RLIMIT_CPU, value) elif limit_type == 'processes' and value: resource.setrlimit(resource.RLIMIT_NPROC, value) elif limit_type == 'fsize' and value: resource.setrlimit(resource.RLIMIT_FSIZE, value) except (OSError, ValueError) as e: logger.warning(f"Failed to restore 
resource limits: {e}") async def _prepare_execution_globals(self, globals_dict: Optional[Dict[str, Any]] = None, temp_dir: Optional[Path] = None) -> Dict[str, Any]: """Prepare global namespace for execution.""" import builtins # Base globals exec_globals = { '__builtins__': self._create_restricted_builtins(), '__name__': '__sandbox__', '__file__': str(temp_dir / "workspace" / "generated_capability.py") if temp_dir else "generated_capability.py", '__temp_dir__': str(temp_dir) if temp_dir else "", '__workspace_dir__': str(temp_dir / "workspace") if temp_dir else "", '__readonly_dir__': str(temp_dir / "readonly") if temp_dir else "" } # Add user-provided globals if globals_dict: exec_globals.update(globals_dict) return exec_globals def _create_restricted_builtins(self): """Create restricted builtins for sandbox.""" import builtins # Remove dangerous builtins restricted_builtins = {} allowed_builtins = { # Core Python builtins 'abs', 'all', 'any', 'ascii', 'bin', 'bool', 'bytearray', 'bytes', 'callable', 'chr', 'classmethod', 'compile', 'complex', 'delattr', 'dict', 'dir', 'divmod', 'enumerate', 'eval', 'exec', 'filter', 'float', 'format', 'frozenset', 'getattr', 'globals', 'hasattr', 'hash', 'help', 'hex', 'id', 'input', 'int', 'isinstance', 'issubclass', 'iter', 'len', 'list', 'locals', 'map', 'max', 'memoryview', 'min', 'next', 'object', 'oct', 'open', 'ord', 'pow', 'print', 'property', 'range', 'repr', 'reversed', 'round', 'set', 'setattr', 'slice', 'sorted', 'staticmethod', 'str', 'sum', 'super', 'tuple', 'type', 'vars', 'zip', # Essential builtins for class definitions and advanced features '__build_class__', '__name__', '__package__', '__doc__', '__file__', '__annotations__', '__loader__', '__spec__', '__cached__' } for name in allowed_builtins: if hasattr(builtins, name): restricted_builtins[name] = getattr(builtins, name) # Add safe versions of eval/exec if needed (with restrictions) restricted_builtins['__import__'] = self._create_restricted_import() return 
restricted_builtins def _create_restricted_import(self): """Create restricted import function.""" allowed_imports = set(self.config.allowed_imports) def restricted_import(name, globals=None, locals=None, fromlist=(), level=0): """Restricted import function.""" # Check if module is allowed if name in allowed_imports: return __import__(name, globals, locals, fromlist, level) # Check if any fromlist items are allowed for item in fromlist: if f"{name}.{item}" not in allowed_imports: raise ImportError(f"Import of {name}.{item} not allowed in sandbox") # Check submodules for allowed in allowed_imports: if allowed.startswith(f"{name}."): return __import__(name, globals, locals, fromlist, level) raise ImportError(f"Import of {name} not allowed in sandbox") return restricted_import async def _execute_code(self, code: str, exec_globals: Dict[str, Any]) -> Any: """Execute code with timeout and error handling.""" import signal import asyncio def timeout_handler(signum, frame): raise TimeoutError("Sandbox execution timeout") # Set timeout old_handler = signal.signal(signal.SIGXCPU, timeout_handler) try: # Compile code first to check syntax compiled_code = compile(code, '<sandbox>', 'exec') # Check if code contains async functions has_async = 'async def' in code or 'await ' in code if has_async: # For async code, we need to create and run an async function indented_code = chr(10).join(' ' + line for line in code.split(chr(10))) async_wrapper = f""" async def _sandbox_async_wrapper(): {indented_code} # Look for result variable or return last expression if 'result' in locals(): return result elif '__result__' in locals(): return __result__ else: return None # Execute the async wrapper result = await _sandbox_async_wrapper() """ # Execute the wrapper exec(async_wrapper, exec_globals) # Run the async function if '_sandbox_async_wrapper' in exec_globals: # Create a new event loop for this execution if needed try: result = await exec_globals['_sandbox_async_wrapper']() return result 
except Exception as e: raise RuntimeError(f"Async execution error: {e}") else: raise RuntimeError("Failed to create async wrapper") else: # For synchronous code, execute directly exec(compiled_code, exec_globals) # Look for result variable if 'result' in exec_globals: return exec_globals['result'] elif '__result__' in exec_globals: return exec_globals['__result__'] else: return None except TimeoutError: raise TimeoutError("Sandbox execution timed out") except Exception as e: raise RuntimeError(f"Sandbox execution error: {e}") finally: signal.signal(signal.SIGXCPU, old_handler) async def _get_resource_usage(self) -> Dict[str, Any]: """Get current resource usage.""" try: import psutil process = psutil.Process() return { 'memory_rss': process.memory_info().rss, 'memory_vms': process.memory_info().vms, 'cpu_percent': process.cpu_percent(), 'num_threads': process.num_threads(), 'open_files': len(process.open_files()) if hasattr(process, 'open_files') else 0, 'connections': len(process.connections()) if hasattr(process, 'connections') else 0 } except ImportError: # Fallback to basic resource usage try: usage = resource.getrusage(resource.RUSAGE_SELF) return { 'max_rss': usage.ru_maxrss, 'user_time': usage.ru_utime, 'system_time': usage.ru_stime, 'page_faults': usage.ru_majflt } except: return { 'memory_rss': 0, 'cpu_percent': 0.0, 'num_threads': 1, 'open_files': 0, 'connections': 0 } class CapabilitySandbox: """High-level sandbox interface for capability execution.""" def __init__(self, config: Optional[SandboxConfig] = None): self.sandbox = PythonSandbox(config) self.config = config or SandboxConfig() async def execute_capability(self, code: str, capability_name: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None) -> SandboxResult: """Execute a capability in the sandbox.""" logger.info(f"Executing capability {capability_name or 'unnamed'} in sandbox") # Prepare execution globals exec_globals = { 'capability_name': capability_name, 'parameters': parameters 
or {} } # Execute in sandbox result = await self.sandbox.execute(code, exec_globals) return result async def validate_capability(self, code: str) -> List[str]: """Validate capability code before execution.""" issues = [] # Syntax check try: compile(code, '<validation>', 'exec') except SyntaxError as e: issues.append(f"Syntax error: {e}") return issues # Security checks dangerous_patterns = [ # System access r'import\s+os\s*\.\s*system', r'import\s+subprocess', r'import\s+socket', r'import\s+threading', r'import\s+multiprocessing', r'import\s+ctypes', # Code execution r'eval\s*\(', r'exec\s*\(', r'__import__\s*\(', r'compile\s*\(', # File system operations r'open\s*\([^)]*\.\.\.', r'shutil\.rmtree\s*\([^)]*\.\.\.', r'os\.remove\s*\(', r'os\.rmdir\s*\(', r'os\.mkdir\s*\(', # Network operations r'urllib\.', r'requests\.', r'http\.', r'socket\.', # Dangerous builtins r'globals\s*\(', r'locals\s*\(', r'vars\s*\(', r'dir\s*\(', ] import re for pattern in dangerous_patterns: if re.search(pattern, code): issues.append(f"Potentially dangerous code detected: {pattern}") return issues def get_sandbox_status(self) -> Dict[str, Any]: """Get sandbox configuration and status.""" return { 'config': { 'max_cpu_time': self.config.max_cpu_time, 'max_memory': self.config.max_memory, 'max_processes': self.config.max_processes, 'allow_network': self.config.allow_network, 'allowed_imports': len(self.config.allowed_imports) }, 'temp_dir': str(self.sandbox._temp_dir) if self.sandbox._temp_dir else None }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server