Skip to main content
Glama
local_sandbox.py14.9 kB
""" Local sandbox implementation for the enhanced Sandbox SDK. """ import io import sys import os import traceback import uuid import tempfile import shutil import logging from pathlib import Path from typing import Any, Dict, List, Optional from .base_sandbox import BaseSandbox from .execution import Execution from ..mcp_sandbox_server_stdio import ExecutionContext, monkey_patch_matplotlib, monkey_patch_pil from ..core.execution_context import PersistentExecutionContext logger = logging.getLogger(__name__) class LocalSandbox(BaseSandbox): """ Local sandbox implementation that uses the existing MCP server functionality. This provides secure local execution with artifact capture and virtual environment support. """ def __init__(self, **kwargs): """ Initialize a local sandbox instance. """ # Force remote=False for local sandboxes kwargs["remote"] = False super().__init__(**kwargs) # Initialize local execution context with persistence self._execution_context = PersistentExecutionContext() self._execution_globals = self._execution_context.globals_dict # Apply monkey patches for artifact capture monkey_patch_matplotlib() monkey_patch_pil() async def get_default_image(self) -> str: """ Get the default Docker image for local sandbox (not used in local execution). """ return "local-python" async def start( self, image: Optional[str] = None, memory: int = 512, cpus: float = 1.0, timeout: float = 180.0, ) -> None: """ Start the local sandbox. For local sandboxes, this primarily sets up the execution environment. """ if self._is_started: return # Already set up in PersistentExecutionContext # No additional setup needed for persistent context self._is_started = True async def stop(self) -> None: """ Stop the local sandbox and clean up resources. """ if not self._is_started: return # Clean up artifacts if needed # Note: We might want to preserve artifacts for user access # self._execution_context.cleanup_artifacts() self._is_started = False async def run(self, code: str, validate: bool = True) -> Execution: """ Execute Python code in the local sandbox with enhanced error handling. Args: code: Python code to execute validate: Whether to validate code before execution Returns: An Execution object representing the executed code Raises: RuntimeError: If the sandbox is not started or execution fails """ if not self._is_started: raise RuntimeError("Sandbox is not started. Call start() first.") # Use the enhanced persistent execution context with validation import hashlib cache_key = hashlib.md5(code.encode()).hexdigest() result = self._execution_context.execute_code( code, cache_key=cache_key, validate=validate ) # Create and return execution result with enhanced information execution = Execution( stdout=result.get('stdout', ''), stderr=result.get('stderr', ''), return_value=None, # Will be enhanced in future versions exception=Exception(result['error']) if result.get('error') else None, artifacts=result.get('artifacts', []), ) # Add validation result to execution if available if result.get('validation_result'): execution._validation_result = result['validation_result'] return execution @property def artifacts_dir(self) -> Optional[str]: """ Get the artifacts directory path. """ return str(self._execution_context.artifacts_dir) if self._execution_context.artifacts_dir else None def list_artifacts(self, format_type: str = 'list', recursive: bool = True) -> Any: """ List all artifacts created during execution with recursive scanning. Args: format_type: Output format ('list', 'json', 'csv', 'detailed') recursive: Whether to scan subdirectories recursively Returns: Artifacts in the requested format """ if not self._execution_context.artifacts_dir: return [] if format_type == 'list' else self._format_empty_artifacts(format_type) artifacts_dir = Path(self._execution_context.artifacts_dir) if not artifacts_dir.exists(): return [] if format_type == 'list' else self._format_empty_artifacts(format_type) # Get artifacts with full details artifacts = [] pattern = "**/*" if recursive else "*" for file_path in artifacts_dir.glob(pattern): if file_path.is_file(): try: stat = file_path.stat() artifact_info = { 'name': file_path.name, 'path': str(file_path.relative_to(artifacts_dir)), 'full_path': str(file_path), 'size': stat.st_size, 'created': stat.st_ctime, 'modified': stat.st_mtime, 'extension': file_path.suffix.lower(), 'type': self._categorize_file(file_path) } artifacts.append(artifact_info) except Exception as e: logger.warning(f"Failed to get info for {file_path}: {e}") return self._format_artifacts_output(artifacts, format_type) def _categorize_file(self, file_path: Path) -> str: """Categorize a file based on its extension and path.""" suffix = file_path.suffix.lower() path_str = str(file_path).lower() # Check for Manim files if any(pattern in path_str for pattern in ['manim', 'media', 'videos', 'images']): return 'manim' # Extension-based categorization type_mappings = { 'images': {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff', '.svg', '.webp'}, 'videos': {'.mp4', '.avi', '.mov', '.wmv', '.flv', '.webm', '.mkv'}, 'data': {'.csv', '.json', '.xml', '.yaml', '.yml', '.pkl', '.pickle', '.h5', '.hdf5'}, 'code': {'.py', '.js', '.html', '.css', '.sql', '.sh', '.bat'}, 'documents': {'.pdf', '.docx', '.doc', '.txt', '.md', '.rtf'}, 'audio': {'.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a'} } for category, extensions in type_mappings.items(): if suffix in extensions: return category return 'other' def _format_empty_artifacts(self, format_type: str) -> Any: """Format empty artifacts response.""" if format_type == 'json': return json.dumps([]) elif format_type == 'csv': return 'name,path,size,created,modified,extension,type\n' elif format_type == 'detailed': return {'total': 0, 'files': [], 'categories': {}} else: return [] def _format_artifacts_output(self, artifacts: List[Dict], format_type: str) -> Any: """Format artifacts output in the requested format.""" if format_type == 'list': return [artifact['path'] for artifact in artifacts] elif format_type == 'json': import json return json.dumps(artifacts, indent=2) elif format_type == 'csv': import csv import io output = io.StringIO() if artifacts: writer = csv.DictWriter(output, fieldnames=artifacts[0].keys()) writer.writeheader() writer.writerows(artifacts) return output.getvalue() elif format_type == 'detailed': # Group by category categories = {} for artifact in artifacts: category = artifact['type'] if category not in categories: categories[category] = [] categories[category].append(artifact) return { 'total': len(artifacts), 'files': artifacts, 'categories': categories, 'summary': { cat: {'count': len(files), 'total_size': sum(f['size'] for f in files)} for cat, files in categories.items() } } else: return artifacts def cleanup_artifacts(self) -> None: """ Clean up all artifacts. """ self._execution_context.cleanup_artifacts() def get_execution_info(self) -> Dict[str, Any]: """ Get information about the execution environment. """ return { "python_version": sys.version, "executable": sys.executable, "virtual_env": os.environ.get("VIRTUAL_ENV"), "project_root": str(self._execution_context.project_root), "artifacts_dir": self.artifacts_dir, "sys_path": sys.path[:10], # First 10 entries } def get_performance_stats(self) -> Dict[str, Any]: """ Get performance statistics from the execution context. """ return self._execution_context.get_execution_stats() def get_execution_history(self, limit: int = 50) -> List[Dict[str, Any]]: """ Get execution history. """ return self._execution_context.get_execution_history(limit=limit) def clear_cache(self) -> None: """ Clear compilation and execution cache. """ self._execution_context.clear_cache() def save_session(self) -> None: """ Manually save the current execution session state. """ self._execution_context.save_persistent_state() @property def session_id(self) -> str: """ Get the current session ID. """ return self._execution_context.session_id def cleanup_session(self) -> None: """ Cleanup the current session. """ self._execution_context.cleanup() def get_artifact_report(self) -> Dict[str, Any]: """ Get comprehensive artifact report with categorization. """ return self._execution_context.get_artifact_report() def categorize_artifacts(self) -> Dict[str, List[Dict[str, Any]]]: """ Categorize artifacts by type with detailed metadata. """ return self._execution_context.categorize_artifacts() def cleanup_artifacts_by_type(self, artifact_type: str) -> int: """ Clean up artifacts of a specific type. Args: artifact_type: Type of artifacts to clean (e.g., 'images', 'videos', 'plots') Returns: Number of artifacts cleaned up """ if not self._execution_context.artifacts_dir: return 0 categorized = self.categorize_artifacts() if artifact_type not in categorized: return 0 cleaned_count = 0 for file_info in categorized[artifact_type]: try: file_path = Path(file_info['full_path']) if file_path.exists(): file_path.unlink() cleaned_count += 1 except Exception as e: logger.warning(f"Failed to delete {file_info['path']}: {e}") return cleaned_count def get_manim_artifacts(self) -> List[Dict[str, Any]]: """ Get all Manim-related artifacts. """ categorized = self.categorize_artifacts() return categorized.get('manim', []) def get_artifact_summary(self) -> str: """ Get a human-readable summary of artifacts. """ report = self.get_artifact_report() if report['total_artifacts'] == 0: return "No artifacts found." def format_size(bytes_size): for unit in ['B', 'KB', 'MB', 'GB']: if bytes_size < 1024.0: return f"{bytes_size:.1f} {unit}" bytes_size /= 1024.0 return f"{bytes_size:.1f} TB" lines = [ f"Total Artifacts: {report['total_artifacts']}", f"Total Size: {format_size(report['total_size'])}", "", "Categories:" ] for category, info in report['categories'].items(): lines.append(f" {category}: {info['count']} files ({format_size(info['size'])})") if report['recent_artifacts']: lines.extend([ "", "Recent Artifacts:" ]) for artifact in report['recent_artifacts'][:5]: lines.append(f" {artifact['name']} ({format_size(artifact['size'])})") return "\n".join(lines) def start_interactive_repl(self) -> None: """Start an enhanced interactive REPL session.""" from ..core.interactive_repl import EnhancedREPL repl = EnhancedREPL(self._execution_context) repl.start_interactive_session() def get_code_template(self, template_type: str) -> str: """Get code templates for common tasks.""" from ..core.code_validator import CodeValidator validator = CodeValidator() return validator.get_code_template(template_type) def get_available_templates(self) -> List[str]: """Get list of available code templates.""" from ..core.code_validator import CodeValidator validator = CodeValidator() return validator.get_available_templates() def validate_code(self, code: str) -> Dict[str, Any]: """Validate code before execution.""" from ..core.code_validator import CodeValidator validator = CodeValidator() return validator.validate_and_format(code) def get_manim_helper(self): """Get Manim helper for animation support.""" from ..core.manim_support import ManIMHelper return ManIMHelper(self._execution_context.artifacts_dir)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/scooter-lacroix/sandbox-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server