"""
Sandboxed execution environment for generated capabilities.
Provides resource limits, isolation, and security boundaries.
"""
import asyncio
import logging
import resource
import tempfile
import shutil
import sys
import traceback
import signal
from pathlib import Path
from typing import Dict, Any, Optional, List
from contextlib import asynccontextmanager
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, TimeoutError

logger = logging.getLogger(__name__)


class SandboxConfig:
"""Configuration for sandbox execution."""
def __init__(self):
# Resource limits (sensible defaults)
self.max_cpu_time = 30 # seconds
self.max_memory = 256 * 1024 * 1024 # 256MB in bytes
self.max_processes = 1 # single process
self.max_file_size = 10 * 1024 * 1024 # 10MB
# Security settings
self.allow_network = False
self.allow_file_write = True
self.allowed_imports = self._get_default_allowed_imports()
self.temp_dir = None
# Directories (relative to workspace)
self.readonly_dirs = []
self.workspace_dirs = []
def _get_default_allowed_imports(self) -> List[str]:
"""Get list of allowed standard library imports."""
return [
# Core Python
'asyncio', 'collections', 'contextlib', 'datetime', 'enum',
'functools', 'itertools', 'json', 'logging', 'math',
'pathlib', 're', 'string', 'time', 'typing', 'uuid',
# Data structures
'array', 'bisect', 'heapq', 'queue', 'weakref',
# File/IO (limited)
'io', 'os', 'tempfile', 'shutil', 'glob',
# Text processing
'textwrap', 'unicodedata', 'difflib',
# Numeric
'decimal', 'fractions', 'numbers', 'random', 'statistics',
# Functional
'operator', 'collections.abc',
# System (limited)
'sys', 'traceback', 'warnings', 'contextvars',
# Katamari internal
'katamari_mcp.utils.error_handler'
]
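
# A minimal configuration sketch (illustrative only, not executed here): the
# attribute names come from SandboxConfig above, while the values are arbitrary
# examples of how a caller might tighten the defaults for untrusted code.
#
#     strict = SandboxConfig()
#     strict.max_cpu_time = 5                # seconds
#     strict.max_memory = 64 * 1024 * 1024   # 64MB
#     strict.allowed_imports = ['json', 'math', 're']
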
class SandboxResult:
    """Result of sandboxed execution."""

    def __init__(self, success: bool = False, data: Any = None,
                 error: Optional[str] = None, execution_time: float = 0.0,
                 resource_usage: Optional[Dict[str, Any]] = None):
        self.success = success
        self.data = data
        self.error = error
        self.execution_time = execution_time
        self.resource_usage = resource_usage or {}

class PythonSandbox:
    """Python sandbox with resource limits and isolation."""

    def __init__(self, config: Optional[SandboxConfig] = None):
        self.config = config or SandboxConfig()
        self._temp_dir: Optional[Path] = None
        self._original_limits: Dict[str, Any] = {}

    async def execute(self, code: str, globals_dict: Optional[Dict[str, Any]] = None) -> SandboxResult:
        """Execute code in sandboxed environment."""
        import time
        start_time = time.time()

        # Setup temporary directory
        temp_dir = await self._setup_temp_directory()
        try:
            # Setup resource limits
            await self._apply_resource_limits()

            # Prepare execution environment
            exec_globals = await self._prepare_execution_globals(globals_dict, temp_dir)

            # Execute code
            result = await self._execute_code(code, exec_globals)

            # Calculate execution time
            execution_time = time.time() - start_time

            # Get resource usage
            resource_usage = await self._get_resource_usage()

            return SandboxResult(
                success=True,
                data=result,
                execution_time=execution_time,
                resource_usage=resource_usage
            )
        except Exception as e:
            execution_time = time.time() - start_time
            logger.error(f"Sandbox execution failed: {e}")
            return SandboxResult(
                success=False,
                error=str(e),
                execution_time=execution_time
            )
        finally:
            # Cleanup
            await self._cleanup_temp_directory(temp_dir)
            await self._restore_resource_limits()
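
    # Typical call pattern (illustrative sketch, using only names defined in
    # this module; not executed here):
    #
    #     sandbox = PythonSandbox()
    #     outcome = await sandbox.execute("result = 2 + 2")
    #     # outcome.success should be True and outcome.data should be 4,
    #     # assuming the resource limits can be applied on this platform.
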
    async def _setup_temp_directory(self) -> Path:
        """Setup temporary directory for execution."""
        temp_dir = Path(tempfile.mkdtemp(prefix="katamari_sandbox_"))

        # Create subdirectories
        (temp_dir / "workspace").mkdir(exist_ok=True)
        (temp_dir / "readonly").mkdir(exist_ok=True)

        # Setup readonly directories if specified
        for readonly_dir in self.config.readonly_dirs:
            src = Path(readonly_dir)
            if src.exists():
                dst = temp_dir / "readonly" / src.name
                shutil.copytree(src, dst, dirs_exist_ok=True)

        self._temp_dir = temp_dir
        return temp_dir

    async def _cleanup_temp_directory(self, temp_dir: Path):
        """Cleanup temporary directory."""
        try:
            if temp_dir and temp_dir.exists():
                shutil.rmtree(temp_dir, ignore_errors=True)
        except Exception as e:
            logger.warning(f"Failed to cleanup temp dir {temp_dir}: {e}")
    async def _apply_resource_limits(self):
        """Apply resource limits to current process."""
        try:
            # Memory limit
            try:
                self._original_limits['memory'] = resource.getrlimit(resource.RLIMIT_AS)
                resource.setrlimit(resource.RLIMIT_AS, (self.config.max_memory, self.config.max_memory))
            except (OSError, ValueError):
                self._original_limits['memory'] = None

            # CPU time limit
            try:
                self._original_limits['cpu'] = resource.getrlimit(resource.RLIMIT_CPU)
                resource.setrlimit(resource.RLIMIT_CPU, (self.config.max_cpu_time, self.config.max_cpu_time))
            except (OSError, ValueError):
                self._original_limits['cpu'] = None

            # Process limit
            try:
                self._original_limits['processes'] = resource.getrlimit(resource.RLIMIT_NPROC)
                resource.setrlimit(resource.RLIMIT_NPROC, (self.config.max_processes, self.config.max_processes))
            except (OSError, ValueError):
                self._original_limits['processes'] = None

            # File size limit
            try:
                self._original_limits['fsize'] = resource.getrlimit(resource.RLIMIT_FSIZE)
                resource.setrlimit(resource.RLIMIT_FSIZE, (self.config.max_file_size, self.config.max_file_size))
            except (OSError, ValueError):
                self._original_limits['fsize'] = None
        except (OSError, ValueError) as e:
            logger.warning(f"Failed to apply resource limits: {e}")

    async def _restore_resource_limits(self):
        """Restore original resource limits."""
        try:
            for limit_type, value in self._original_limits.items():
                if limit_type == 'memory' and value:
                    resource.setrlimit(resource.RLIMIT_AS, value)
                elif limit_type == 'cpu' and value:
                    resource.setrlimit(resource.RLIMIT_CPU, value)
                elif limit_type == 'processes' and value:
                    resource.setrlimit(resource.RLIMIT_NPROC, value)
                elif limit_type == 'fsize' and value:
                    resource.setrlimit(resource.RLIMIT_FSIZE, value)
        except (OSError, ValueError) as e:
            logger.warning(f"Failed to restore resource limits: {e}")
    async def _prepare_execution_globals(self, globals_dict: Optional[Dict[str, Any]] = None, temp_dir: Optional[Path] = None) -> Dict[str, Any]:
        """Prepare global namespace for execution."""
        # Base globals
        exec_globals = {
            '__builtins__': self._create_restricted_builtins(),
            '__name__': '__sandbox__',
            '__file__': str(temp_dir / "workspace" / "generated_capability.py") if temp_dir else "generated_capability.py",
            '__temp_dir__': str(temp_dir) if temp_dir else "",
            '__workspace_dir__': str(temp_dir / "workspace") if temp_dir else "",
            '__readonly_dir__': str(temp_dir / "readonly") if temp_dir else ""
        }

        # Add user-provided globals
        if globals_dict:
            exec_globals.update(globals_dict)

        return exec_globals

    def _create_restricted_builtins(self):
        """Create restricted builtins for sandbox."""
        import builtins

        # Only expose an explicit allow-list of builtins
        restricted_builtins = {}
        allowed_builtins = {
            # Core Python builtins
            'abs', 'all', 'any', 'ascii', 'bin', 'bool', 'bytearray',
            'bytes', 'callable', 'chr', 'classmethod', 'compile', 'complex',
            'delattr', 'dict', 'dir', 'divmod', 'enumerate', 'eval',
            'exec', 'filter', 'float', 'format', 'frozenset', 'getattr',
            'globals', 'hasattr', 'hash', 'help', 'hex', 'id', 'input',
            'int', 'isinstance', 'issubclass', 'iter', 'len', 'list',
            'locals', 'map', 'max', 'memoryview', 'min', 'next',
            'object', 'oct', 'open', 'ord', 'pow', 'print', 'property',
            'range', 'repr', 'reversed', 'round', 'set', 'setattr',
            'slice', 'sorted', 'staticmethod', 'str', 'sum', 'super',
            'tuple', 'type', 'vars', 'zip',
            # Essential builtins for class definitions and advanced features
            '__build_class__', '__name__', '__package__', '__doc__', '__file__',
            '__annotations__', '__loader__', '__spec__', '__cached__'
        }

        for name in allowed_builtins:
            if hasattr(builtins, name):
                restricted_builtins[name] = getattr(builtins, name)

        # Route all imports through the restricted import hook
        restricted_builtins['__import__'] = self._create_restricted_import()

        return restricted_builtins
    def _create_restricted_import(self):
        """Create restricted import function."""
        allowed_imports = set(self.config.allowed_imports)

        def restricted_import(name, globals=None, locals=None, fromlist=(), level=0):
            """Restricted import function."""
            # Check if module is allowed
            if name in allowed_imports:
                return __import__(name, globals, locals, fromlist, level)

            # Check if any fromlist items are allowed
            for item in fromlist:
                if f"{name}.{item}" not in allowed_imports:
                    raise ImportError(f"Import of {name}.{item} not allowed in sandbox")

            # Check submodules
            for allowed in allowed_imports:
                if allowed.startswith(f"{name}."):
                    return __import__(name, globals, locals, fromlist, level)

            raise ImportError(f"Import of {name} not allowed in sandbox")

        return restricted_import
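
    # Behavior sketch for the hook above (illustrative, assuming the default
    # allow-list from SandboxConfig):
    #
    #     restricted_import('json')        -> returns the json module
    #     restricted_import('subprocess')  -> raises ImportError
    #     restricted_import('katamari_mcp.utils', fromlist=('error_handler',))
    #         -> allowed, because 'katamari_mcp.utils.error_handler' is listed
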
    async def _execute_code(self, code: str, exec_globals: Dict[str, Any]) -> Any:
        """Execute code with timeout and error handling."""
        import signal

        def timeout_handler(signum, frame):
            raise TimeoutError("Sandbox execution timeout")

        # SIGXCPU is delivered when the RLIMIT_CPU soft limit set earlier is exceeded
        old_handler = signal.signal(signal.SIGXCPU, timeout_handler)
        try:
            # Compile code first to check syntax
            compiled_code = compile(code, '<sandbox>', 'exec')

            # Check if code contains async functions
            has_async = 'async def' in code or 'await ' in code

            if has_async:
                # For async code, wrap the user code in an async function so it
                # can be awaited from this coroutine. The wrapper returns the
                # 'result' (or '__result__') local if the code defines one.
                indented_code = chr(10).join('    ' + line for line in code.split(chr(10)))
                async_wrapper = f"""
async def _sandbox_async_wrapper():
{indented_code}
    # Look for result variable or return last expression
    if 'result' in locals():
        return result
    elif '__result__' in locals():
        return __result__
    else:
        return None
"""
                # Define the wrapper in the sandbox globals
                exec(async_wrapper, exec_globals)

                # Run the async function
                if '_sandbox_async_wrapper' in exec_globals:
                    try:
                        result = await exec_globals['_sandbox_async_wrapper']()
                        return result
                    except Exception as e:
                        raise RuntimeError(f"Async execution error: {e}")
                else:
                    raise RuntimeError("Failed to create async wrapper")
            else:
                # For synchronous code, execute directly
                exec(compiled_code, exec_globals)

                # Look for result variable
                if 'result' in exec_globals:
                    return exec_globals['result']
                elif '__result__' in exec_globals:
                    return exec_globals['__result__']
                else:
                    return None
        except TimeoutError:
            raise TimeoutError("Sandbox execution timed out")
        except Exception as e:
            raise RuntimeError(f"Sandbox execution error: {e}")
        finally:
            signal.signal(signal.SIGXCPU, old_handler)
    async def _get_resource_usage(self) -> Dict[str, Any]:
        """Get current resource usage."""
        try:
            import psutil
            process = psutil.Process()
            return {
                'memory_rss': process.memory_info().rss,
                'memory_vms': process.memory_info().vms,
                'cpu_percent': process.cpu_percent(),
                'num_threads': process.num_threads(),
                'open_files': len(process.open_files()) if hasattr(process, 'open_files') else 0,
                'connections': len(process.connections()) if hasattr(process, 'connections') else 0
            }
        except ImportError:
            # Fallback to basic resource usage when psutil is unavailable
            try:
                usage = resource.getrusage(resource.RUSAGE_SELF)
                return {
                    'max_rss': usage.ru_maxrss,
                    'user_time': usage.ru_utime,
                    'system_time': usage.ru_stime,
                    'page_faults': usage.ru_majflt
                }
            except Exception:
                return {
                    'memory_rss': 0,
                    'cpu_percent': 0.0,
                    'num_threads': 1,
                    'open_files': 0,
                    'connections': 0
                }

class CapabilitySandbox:
    """High-level sandbox interface for capability execution."""

    def __init__(self, config: Optional[SandboxConfig] = None):
        self.sandbox = PythonSandbox(config)
        self.config = config or SandboxConfig()

    async def execute_capability(self, code: str, capability_name: Optional[str] = None,
                                 parameters: Optional[Dict[str, Any]] = None) -> SandboxResult:
        """Execute a capability in the sandbox."""
        logger.info(f"Executing capability {capability_name or 'unnamed'} in sandbox")

        # Prepare execution globals
        exec_globals = {
            'capability_name': capability_name,
            'parameters': parameters or {}
        }

        # Execute in sandbox
        result = await self.sandbox.execute(code, exec_globals)
        return result

    async def validate_capability(self, code: str) -> List[str]:
        """Validate capability code before execution."""
        issues = []

        # Syntax check
        try:
            compile(code, '<validation>', 'exec')
        except SyntaxError as e:
            issues.append(f"Syntax error: {e}")
            return issues

        # Security checks
        dangerous_patterns = [
            # System access
            r'os\.system\s*\(',
            r'import\s+subprocess',
            r'import\s+socket',
            r'import\s+threading',
            r'import\s+multiprocessing',
            r'import\s+ctypes',
            # Code execution
            r'eval\s*\(',
            r'exec\s*\(',
            r'__import__\s*\(',
            r'compile\s*\(',
            # File system operations ('..' guards against path traversal)
            r'open\s*\([^)]*\.\.',
            r'shutil\.rmtree\s*\([^)]*\.\.',
            r'os\.remove\s*\(',
            r'os\.rmdir\s*\(',
            r'os\.mkdir\s*\(',
            # Network operations
            r'urllib\.',
            r'requests\.',
            r'http\.',
            r'socket\.',
            # Dangerous builtins
            r'globals\s*\(',
            r'locals\s*\(',
            r'vars\s*\(',
            r'dir\s*\(',
        ]

        import re
        for pattern in dangerous_patterns:
            if re.search(pattern, code):
                issues.append(f"Potentially dangerous code detected: {pattern}")

        return issues
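
    # Illustrative check (uses only names from this class): validating a snippet
    # such as "import subprocess" should append an issue mentioning the
    # r'import\s+subprocess' pattern, while plain arithmetic passes cleanly.
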
    def get_sandbox_status(self) -> Dict[str, Any]:
        """Get sandbox configuration and status."""
        return {
            'config': {
                'max_cpu_time': self.config.max_cpu_time,
                'max_memory': self.config.max_memory,
                'max_processes': self.config.max_processes,
                'allow_network': self.config.allow_network,
                'allowed_imports': len(self.config.allowed_imports)
            },
            'temp_dir': str(self.sandbox._temp_dir) if self.sandbox._temp_dir else None
        }
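

# A minimal end-to-end usage sketch, run only when this module is executed
# directly. The capability code string, capability name, and parameter values
# below are hypothetical examples; only the CapabilitySandbox API used here
# comes from the classes defined above.
if __name__ == "__main__":
    async def _demo():
        sandbox = CapabilitySandbox()
        code = "result = sum(parameters.get('values', []))"

        # Static validation first; execute only if nothing was flagged
        issues = await sandbox.validate_capability(code)
        if issues:
            print("Validation issues:", issues)
            return

        outcome = await sandbox.execute_capability(
            code,
            capability_name="sum_values",      # hypothetical capability name
            parameters={"values": [1, 2, 3]},  # hypothetical parameters
        )
        print("success:", outcome.success, "data:", outcome.data, "error:", outcome.error)

    asyncio.run(_demo())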