Skip to main content
Glama
mcp_sandbox_server.py22.1 kB
from fastmcp import FastMCP import io import sys import os import traceback import json import uuid import tempfile import shutil import subprocess from .core.resource_manager import get_resource_manager from .core.security import get_security_manager, SecurityLevel import threading import time import socket import base64 from pathlib import Path from typing import Dict, List, Any, Optional import logging # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Create FastMCP server named "python-sandbox" mcp = FastMCP("python-sandbox") # Global state for execution context class ExecutionContext: def __init__(self): # Compute project_root dynamically from current file location # When installed as package, __file__ is in src/sandbox/, so go up 2 levels current_file = Path(__file__).resolve() if 'src/sandbox' in str(current_file): # Installed package: go from src/sandbox/mcp_*.py to project root self.project_root = current_file.parent.parent.parent else: # Development: assume file is in project root self.project_root = current_file.parent self.venv_path = self.project_root / ".venv" self.artifacts_dir = None self.web_servers = {} # Track running web servers self.execution_globals = {} # Persistent globals across executions self._setup_environment() def _setup_environment(self): """Setup sys.path and virtual environment with robust path detection.""" # Compute absolute paths project_root_str = str(self.project_root) project_parent_str = str(self.project_root.parent) # Add parent to find 'sandbox' package # Detect venv site-packages dynamically venv_site_packages = None if self.venv_path.exists(): # Try multiple Python versions for py_version in ['python3.11', 'python3.12', 'python3.10', 'python3.9']: candidate = self.venv_path / "lib" / py_version / "site-packages" if candidate.exists(): venv_site_packages = candidate break # De-duplicate sys.path using OrderedDict to preserve order from collections import OrderedDict current_paths = OrderedDict.fromkeys(sys.path) # Paths to add (parent first for package imports, then project root) paths_to_add = [project_parent_str, project_root_str] if venv_site_packages: paths_to_add.append(str(venv_site_packages)) # Add new paths at the beginning, preserving order and avoiding duplicates new_sys_path = [] for path in paths_to_add: if path not in current_paths: new_sys_path.append(path) current_paths[path] = None # Mark as added # Rebuild sys.path with new paths first sys.path[:] = new_sys_path + list(current_paths.keys()) # Set up virtual environment activation if self.venv_path.exists(): venv_python = self.venv_path / "bin" / "python" venv_bin = self.venv_path / "bin" if venv_python.exists(): # Set environment variables for venv activation os.environ['VIRTUAL_ENV'] = str(self.venv_path) # Prepend venv/bin to PATH if not already present current_path = os.environ.get('PATH', '') venv_bin_str = str(venv_bin) if venv_bin_str not in current_path.split(os.pathsep): os.environ['PATH'] = f"{venv_bin_str}{os.pathsep}{current_path}" # Update sys.executable to point to venv python sys.executable = str(venv_python) logger.info(f"Project root: {self.project_root}") logger.info(f"Virtual env: {self.venv_path if self.venv_path.exists() else 'Not found'}") logger.info(f"sys.executable: {sys.executable}") logger.info(f"sys.path (first 5): {sys.path[:5]}") logger.info(f"VIRTUAL_ENV: {os.environ.get('VIRTUAL_ENV', 'Not set')}") def create_artifacts_dir(self) -> str: """Create a temporary directory for execution artifacts.""" execution_id = str(uuid.uuid4())[:8] self.artifacts_dir = Path(tempfile.gettempdir()) / f"sandbox_artifacts_{execution_id}" self.artifacts_dir.mkdir(exist_ok=True) return str(self.artifacts_dir) def cleanup_artifacts(self): """Clean up artifacts directory.""" if self.artifacts_dir and self.artifacts_dir.exists(): shutil.rmtree(self.artifacts_dir, ignore_errors=True) # Global execution context ctx = ExecutionContext() resource_manager = get_resource_manager() security_manager = get_security_manager(SecurityLevel.MEDIUM) def monkey_patch_matplotlib(): """Monkey patch matplotlib to save plots to artifacts directory.""" try: import matplotlib matplotlib.use('Agg') # Use non-interactive backend import matplotlib.pyplot as plt original_show = plt.show def patched_show(*args, **kwargs): if ctx.artifacts_dir: figure_path = ctx.artifacts_dir / f"plot_{uuid.uuid4().hex[:8]}.png" plt.savefig(figure_path, dpi=150, bbox_inches='tight') logger.info(f"Plot saved to: {figure_path}") return original_show(*args, **kwargs) plt.show = patched_show return True except ImportError: return False def monkey_patch_pil(): """Monkey patch PIL to save images to artifacts directory.""" try: from PIL import Image original_show = Image.Image.show original_save = Image.Image.save def patched_show(self, title=None, command=None): if ctx.artifacts_dir: image_path = ctx.artifacts_dir / f"image_{uuid.uuid4().hex[:8]}.png" self.save(image_path) logger.info(f"Image saved to: {image_path}") return original_show(self, title, command) def patched_save(self, fp, format=None, **params): result = original_save(self, fp, format, **params) # If saving to artifacts dir, log it if ctx.artifacts_dir and str(fp).startswith(str(ctx.artifacts_dir)): logger.info(f"Image saved to artifacts: {fp}") return result Image.Image.show = patched_show Image.Image.save = patched_save return True except ImportError: return False def find_free_port(start_port=8000): """Find a free port starting from start_port.""" for port in range(start_port, start_port + 100): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('127.0.0.1', port)) return port except OSError: continue raise RuntimeError("No free ports available") def launch_web_app(code: str, app_type: str) -> Optional[str]: """Launch a web application and return the URL.""" try: resource_manager.check_resource_limits() port = find_free_port() resource_manager.process_manager.cleanup_finished() if app_type == 'flask': # Modify Flask code to run on specific port modified_code = code + f"\nif __name__ == '__main__': app.run(host='127.0.0.1', port={port}, debug=False)" elif app_type == 'streamlit': # For Streamlit, we need to create a temporary file and run it script_path = ctx.artifacts_dir / f"streamlit_app_{uuid.uuid4().hex[:8]}.py" with open(script_path, 'w') as f: f.write(code) # Launch Streamlit in subprocess cmd = [sys.executable, '-m', 'streamlit', 'run', str(script_path), '--server.port', str(port), '--server.headless', 'true'] process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Register process with resource manager process_id = resource_manager.process_manager.add_process( process, name=f"streamlit_{port}", metadata={'type': 'streamlit', 'port': port} ) # Give it time to start time.sleep(2) if process.poll() is None: # Still running url = f"http://127.0.0.1:{port}" ctx.web_servers[url] = process_id return url else: return None else: return None if app_type == 'flask': # Execute the modified Flask code in a separate thread def run_flask(): exec(modified_code, ctx.execution_globals) # Use resource manager for thread management future = resource_manager.thread_pool.submit(run_flask) time.sleep(1) # Give Flask time to start url = f"http://127.0.0.1:{port}" return url except Exception as e: logger.error(f"Failed to launch web app: {e}") return None def collect_artifacts() -> List[Dict[str, Any]]: """Collect all artifacts from the artifacts directory.""" artifacts = [] if not ctx.artifacts_dir or not ctx.artifacts_dir.exists(): return artifacts for file_path in ctx.artifacts_dir.iterdir(): if file_path.is_file(): try: # Read file as base64 for embedding with open(file_path, 'rb') as f: content = base64.b64encode(f.read()).decode('utf-8') artifacts.append({ 'name': file_path.name, 'path': str(file_path), 'type': file_path.suffix.lower(), 'content_base64': content, 'size': file_path.stat().st_size }) except Exception as e: logger.error(f"Error reading artifact {file_path}: {e}") return artifacts @mcp.tool def execute(code: str, interactive: bool = False, web_app_type: Optional[str] = None) -> str: """ Execute Python code with enhanced features: - Robust sys.path and venv activation - Module import error handling with full traceback - Artifact interception and storage - Web app launch support - Interactive REPL mode Args: code: Python code to execute interactive: If True, drop into interactive REPL after execution web_app_type: Type of web app to launch ('flask' or 'streamlit') Returns: JSON string containing execution results, artifacts, and metadata """ # Create artifacts directory for this execution artifacts_dir = ctx.create_artifacts_dir() # Set up monkey patches matplotlib_patched = monkey_patch_matplotlib() pil_patched = monkey_patch_pil() # Capture stdout and stderr old_stdout, old_stderr = sys.stdout, sys.stderr stdout_capture = io.StringIO() stderr_capture = io.StringIO() result = { 'stdout': '', 'stderr': '', 'error': None, 'artifacts': [], 'web_url': None, 'execution_info': { 'sys_executable': sys.executable, 'sys_path_first_3': sys.path[:3], 'project_root': str(ctx.project_root), 'artifacts_dir': artifacts_dir, 'matplotlib_patched': matplotlib_patched, 'pil_patched': pil_patched } } try: sys.stdout = stdout_capture sys.stderr = stderr_capture # Check if this is a web app if web_app_type in ['flask', 'streamlit']: url = launch_web_app(code, web_app_type) if url: result['web_url'] = url result['stdout'] = f"Web application launched at: {url}" else: result['stderr'] = f"Failed to launch {web_app_type} application" else: # Regular code execution exec(code, ctx.execution_globals) # Interactive REPL mode if interactive: result['stdout'] += "\n[Interactive mode enabled - code executed successfully]\n" result['stdout'] += "Note: REPL mode would be available in a real terminal session\n" except ImportError as e: # Enhanced import error handling error_trace = traceback.format_exc() module_name = str(e).split("'")[1] if "'" in str(e) else "unknown" error_details = { 'type': 'ImportError', 'message': str(e), 'module': module_name, 'traceback': error_trace, 'sys_path': sys.path[:5], # First 5 paths for debugging 'attempted_paths': [p for p in sys.path if Path(p).exists()] } result['error'] = error_details result['stderr'] = f"Import Error: {e}\n\nFull traceback:\n{error_trace}" except Exception as e: # General exception handling error_trace = traceback.format_exc() result['error'] = { 'type': type(e).__name__, 'message': str(e), 'traceback': error_trace } result['stderr'] = f"Error: {e}\n\nFull traceback:\n{error_trace}" finally: # Restore stdout/stderr sys.stdout = old_stdout sys.stderr = old_stderr # Capture output result['stdout'] += stdout_capture.getvalue() result['stderr'] += stderr_capture.getvalue() # Collect artifacts result['artifacts'] = collect_artifacts() return json.dumps(result, indent=2) @mcp.tool def list_artifacts() -> str: """List all current artifacts.""" artifacts = collect_artifacts() if not artifacts: return "No artifacts found." result = "Current artifacts:\n" for artifact in artifacts: result += f"- {artifact['name']} ({artifact['size']} bytes) - {artifact['type']}\n" return result @mcp.tool def cleanup_artifacts() -> str: """Clean up all artifacts and temporary files.""" ctx.cleanup_artifacts() # Also cleanup web servers for url, process in ctx.web_servers.items(): try: process.terminate() except: pass ctx.web_servers.clear() return "Artifacts and web servers cleaned up." @mcp.tool def start_repl() -> str: """Start an interactive REPL session (simulated for MCP).""" # In a real implementation, this would stream stdin/stdout over MCP # For now, we provide a simulation return json.dumps({ 'status': 'repl_started', 'message': 'Interactive REPL session started (simulated)', 'note': 'In a full implementation, this would provide streaming I/O over MCP', 'globals_available': list(ctx.execution_globals.keys()), 'sys_path_active': sys.path[:3] }, indent=2) @mcp.tool def start_web_app(code: str, app_type: str = 'flask') -> str: """Launch a web application and return connection details.""" url = launch_web_app(code, app_type) if url: return json.dumps({ 'status': 'success', 'url': url, 'app_type': app_type, 'message': f'{app_type.title()} application launched successfully' }, indent=2) else: return json.dumps({ 'status': 'error', 'app_type': app_type, 'message': f'Failed to launch {app_type} application' }, indent=2) @mcp.tool def cleanup_temp_artifacts(max_age_hours: int = 24) -> str: """Clean up old temporary artifact directories.""" cleaned = 0 temp_dir = Path(tempfile.gettempdir()) try: for item in temp_dir.glob('sandbox_artifacts_*'): if item.is_dir(): # Check age age_hours = (time.time() - item.stat().st_mtime) / 3600 if age_hours > max_age_hours: shutil.rmtree(item, ignore_errors=True) cleaned += 1 except Exception as e: logger.error(f"Error during temp cleanup: {e}") return json.dumps({ 'cleaned_directories': cleaned, 'max_age_hours': max_age_hours, 'message': f'Cleaned {cleaned} old artifact directories' }, indent=2) @mcp.tool def shell_execute(command: str, working_directory: Optional[str] = None, timeout: int = 30) -> str: """ Execute a shell command safely in a controlled environment. Args: command: The shell command to execute working_directory: Directory to run the command in (defaults to project root) timeout: Maximum execution time in seconds Returns: JSON string containing execution results, stdout, stderr, and metadata """ # Set working directory if working_directory is None: working_directory = str(ctx.project_root) # Enhanced security checks using security manager is_safe, violation = security_manager.check_command_security(command) if not is_safe: return json.dumps({ 'stdout': '', 'stderr': f'Command blocked for security: {violation.message}', 'return_code': -1, 'error': { 'type': 'SecurityError', 'message': violation.message, 'level': violation.level.value, 'command': command }, 'execution_info': { 'working_directory': working_directory, 'timeout': timeout, 'command_blocked': True, 'security_violation': True } }, indent=2) result = { 'stdout': '', 'stderr': '', 'return_code': 0, 'error': None, 'execution_info': { 'working_directory': working_directory, 'timeout': timeout, 'command': command, 'command_blocked': False } } try: # Execute the command with timeout process = subprocess.run( command, shell=True, cwd=working_directory, timeout=timeout, capture_output=True, text=True, env=os.environ.copy() # Use current environment including VIRTUAL_ENV ) result['stdout'] = process.stdout result['stderr'] = process.stderr result['return_code'] = process.returncode if process.returncode != 0: result['error'] = { 'type': 'CommandError', 'message': f'Command failed with return code {process.returncode}', 'return_code': process.returncode } except subprocess.TimeoutExpired: result['error'] = { 'type': 'TimeoutError', 'message': f'Command timed out after {timeout} seconds', 'timeout': timeout } result['stderr'] = f'Command timed out after {timeout} seconds' result['return_code'] = -2 except Exception as e: result['error'] = { 'type': type(e).__name__, 'message': str(e), 'traceback': traceback.format_exc() } result['stderr'] = f'Error executing command: {e}' result['return_code'] = -3 return json.dumps(result, indent=2) @mcp.tool def get_execution_info() -> str: """Get information about the current execution environment.""" info = { 'project_root': str(ctx.project_root), 'venv_path': str(ctx.venv_path), 'venv_active': ctx.venv_path.exists(), 'sys_executable': sys.executable, 'sys_path_length': len(sys.path), 'sys_path_first_5': sys.path[:5], 'artifacts_dir': str(ctx.artifacts_dir) if ctx.artifacts_dir else None, 'web_servers': list(ctx.web_servers.keys()), 'global_variables': list(ctx.execution_globals.keys()), 'virtual_env': os.environ.get('VIRTUAL_ENV'), 'path_contains_venv': str(ctx.venv_path / 'bin') in os.environ.get('PATH', ''), 'current_working_directory': os.getcwd(), 'shell_available': True } return json.dumps(info, indent=2) @mcp.tool def get_resource_stats() -> str: """Get comprehensive resource usage statistics.""" stats = resource_manager.get_resource_stats() return json.dumps(stats, indent=2) @mcp.tool def emergency_cleanup() -> str: """Perform emergency cleanup of all resources.""" try: # Clean up finished processes finished = resource_manager.process_manager.cleanup_finished() # Clean up web servers for url, process_id in ctx.web_servers.items(): resource_manager.process_manager.remove_process(process_id) ctx.web_servers.clear() # Clean up artifacts ctx.cleanup_artifacts() # Force garbage collection import gc gc.collect() return json.dumps({ 'status': 'success', 'message': 'Emergency cleanup completed', 'finished_processes': finished, 'web_servers_cleaned': len(ctx.web_servers) }, indent=2) except Exception as e: return json.dumps({ 'status': 'error', 'message': f'Emergency cleanup failed: {str(e)}' }, indent=2) def main(): """Entry point for the HTTP MCP server.""" mcp.run(transport="http", host="0.0.0.0", port=8765) if __name__ == "__main__": # Run FastMCP server over HTTP (Streamable HTTP transport) on all interfaces port 8765 main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/scooter-lacroix/sandbox-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server