Skip to main content
Glama
execution_context.py36.5 kB
""" Enhanced execution context with persistence and performance optimizations. """ import io import os import sys import json import uuid import time import pickle import tempfile import threading from pathlib import Path from typing import Dict, Any, Optional, Set, List import logging from contextlib import contextmanager from collections import OrderedDict import sqlite3 import base64 logger = logging.getLogger(__name__) class DirectoryChangeMonitor: def __init__(self, default_working_dir: Path, home_dir: Path): self.current_dir = default_working_dir self.default_dir = default_working_dir self.home_dir = home_dir def change_directory(self, new_dir: Path): if new_dir.resolve() != self.default_dir.resolve() and not new_dir.resolve().is_relative_to(self.home_dir): logger.warning(f"Unauthorized attempt to change directory: {new_dir}") raise PermissionError(f"Cannot change to directory outside home: {new_dir}") logger.info(f"Changing directory from {self.current_dir} to {new_dir}") self.current_dir = new_dir def reset_to_default(self): logger.info(f"Resetting to default directory: {self.default_dir}") self.current_dir = self.default_dir class PersistentExecutionContext: """ Enhanced execution context with state persistence and performance optimizations. Features: - Persistent variable storage across sessions - Optimized execution with caching - Complete rendering support for AI viewing - Enhanced artifact management - Performance monitoring """ def __init__(self, session_id: Optional[str] = None): self.session_id = session_id or str(uuid.uuid4()) self.project_root = self._detect_project_root() self.venv_path = self.project_root / ".venv" self.session_dir = self.project_root / "sessions" / self.session_id self.artifacts_dir = self.session_dir / "artifacts" self.state_file = self.session_dir / "state.db" # Directory monitoring and security self.home_dir = Path.home() self.directory_monitor = DirectoryChangeMonitor( default_working_dir=self.artifacts_dir, home_dir=self.home_dir ) # Performance tracking self.execution_times = [] self.memory_usage = [] self.cache_hits = 0 self.cache_misses = 0 # Execution state self.globals_dict = {} self.imports_cache = {} self.compilation_cache = {} self._lock = threading.RLock() # Initialize directories and database self._setup_directories() self._setup_database() self._setup_environment() self._load_persistent_state() logger.info(f"Initialized persistent execution context for session {self.session_id}") def _detect_project_root(self) -> Path: """Detect project root with improved logic.""" current_file = Path(__file__).resolve() # Walk up the directory tree to find project root for parent in current_file.parents: if any((parent / marker).exists() for marker in [ 'pyproject.toml', 'setup.py', '.git', 'README.md' ]): return parent # Fallback to current directory return Path.cwd() def _setup_directories(self): """Create necessary directories with proper permissions.""" self.session_dir.mkdir(parents=True, exist_ok=True) self.artifacts_dir.mkdir(parents=True, exist_ok=True) # Create subdirectories for organized artifacts for subdir in ['plots', 'images', 'videos', 'data', 'code', 'logs', 'manim']: (self.artifacts_dir / subdir).mkdir(exist_ok=True) def _setup_database(self): """Initialize SQLite database for state persistence.""" with sqlite3.connect(self.state_file) as conn: conn.execute(''' CREATE TABLE IF NOT EXISTS execution_state ( key TEXT PRIMARY KEY, value TEXT, type TEXT, timestamp REAL ) ''') conn.execute(''' CREATE TABLE IF NOT EXISTS execution_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, code TEXT, result TEXT, execution_time REAL, memory_usage INTEGER, artifacts TEXT, timestamp REAL ) ''') conn.execute(''' CREATE TABLE IF NOT EXISTS artifacts ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, path TEXT, type TEXT, size INTEGER, metadata TEXT, timestamp REAL ) ''') def _setup_environment(self): """Enhanced environment setup with performance optimizations.""" # Compute absolute paths project_root_str = str(self.project_root) project_parent_str = str(self.project_root.parent) # Detect virtual environment venv_site_packages = None if self.venv_path.exists(): for py_version in ['python3.12', 'python3.11', 'python3.10', 'python3.9']: candidate = self.venv_path / "lib" / py_version / "site-packages" if candidate.exists(): venv_site_packages = candidate break # Optimize sys.path with deduplication current_paths = OrderedDict.fromkeys(sys.path) paths_to_add = [project_parent_str, project_root_str] if venv_site_packages: paths_to_add.append(str(venv_site_packages)) # Add new paths at the beginning new_sys_path = [] for path in paths_to_add: if path not in current_paths: new_sys_path.append(path) current_paths[path] = None sys.path[:] = new_sys_path + list(current_paths.keys()) # Virtual environment activation if self.venv_path.exists(): venv_python = self.venv_path / "bin" / "python" venv_bin = self.venv_path / "bin" if venv_python.exists(): os.environ['VIRTUAL_ENV'] = str(self.venv_path) current_path = os.environ.get('PATH', '') venv_bin_str = str(venv_bin) if venv_bin_str not in current_path.split(os.pathsep): os.environ['PATH'] = f"{venv_bin_str}{os.pathsep}{current_path}" sys.executable = str(venv_python) logger.info(f"Environment setup complete: {self.project_root}") def _load_persistent_state(self): """Load persistent execution state from database.""" try: with sqlite3.connect(self.state_file) as conn: cursor = conn.execute( 'SELECT key, value, type FROM execution_state ORDER BY timestamp DESC' ) for key, value_str, type_str in cursor: try: if type_str == 'pickle': value = pickle.loads(base64.b64decode(value_str)) else: value = json.loads(value_str) self.globals_dict[key] = value except Exception as e: logger.warning(f"Failed to load state for {key}: {e}") except Exception as e: logger.warning(f"Failed to load persistent state: {e}") def save_persistent_state(self): """Save current execution state to database.""" with self._lock: try: with sqlite3.connect(self.state_file) as conn: # Clear existing state conn.execute('DELETE FROM execution_state') # Save current state for key, value in self.globals_dict.items(): if key.startswith('_'): # Skip internal variables continue try: # Try JSON serialization first value_str = json.dumps(value) type_str = 'json' except (TypeError, ValueError): # Fall back to pickle for complex objects try: value_str = base64.b64encode(pickle.dumps(value)).decode() type_str = 'pickle' except Exception: continue # Skip non-serializable objects conn.execute( 'INSERT OR REPLACE INTO execution_state (key, value, type, timestamp) VALUES (?, ?, ?, ?)', (key, value_str, type_str, time.time()) ) except Exception as e: logger.error(f"Failed to save persistent state: {e}") @contextmanager def capture_output(self): """Context manager for capturing stdout/stderr with performance tracking.""" old_stdout = sys.stdout old_stderr = sys.stderr stdout_capture = io.StringIO() stderr_capture = io.StringIO() start_time = time.time() try: sys.stdout = stdout_capture sys.stderr = stderr_capture yield stdout_capture, stderr_capture finally: sys.stdout = old_stdout sys.stderr = old_stderr execution_time = time.time() - start_time self.execution_times.append(execution_time) # Keep only last 1000 execution times for memory efficiency if len(self.execution_times) > 1000: self.execution_times = self.execution_times[-1000:] def execute_code(self, code: str, cache_key: Optional[str] = None, validate: bool = True) -> Dict[str, Any]: """ Execute code with enhanced performance, caching, and validation. Args: code: Python code to execute cache_key: Optional cache key for compilation caching validate: Whether to validate code before execution Returns: Dictionary containing execution results """ with self._lock: start_time = time.time() result = { 'success': False, 'error': None, 'error_type': None, 'stdout': '', 'stderr': '', 'execution_time': 0, 'artifacts': [], 'cache_hit': False, 'validation_result': None, 'formatted_code': code } # Step 1: Validate code if requested if validate: from .code_validator import CodeValidator validator = CodeValidator() validation_result = validator.validate_and_format(code) result['validation_result'] = validation_result if not validation_result['valid']: result['error'] = '; '.join(validation_result['issues']) result['error_type'] = 'ValidationError' result['execution_time'] = time.time() - start_time return result # Use formatted code if validation passed code = validation_result['formatted_code'] result['formatted_code'] = code # Step 2: Check compilation cache if cache_key and cache_key in self.compilation_cache: compiled_code = self.compilation_cache[cache_key] self.cache_hits += 1 result['cache_hit'] = True else: try: compiled_code = compile(code, '<sandbox>', 'exec') if cache_key: self.compilation_cache[cache_key] = compiled_code self.cache_misses += 1 except SyntaxError as e: result.update({ 'error': f"Syntax error at line {e.lineno}: {e.msg}", 'error_type': 'SyntaxError', 'stderr': str(e), 'execution_time': time.time() - start_time }) return result except Exception as e: result.update({ 'error': f"Compilation error: {str(e)}", 'error_type': type(e).__name__, 'stderr': str(e), 'execution_time': time.time() - start_time }) return result # Step 3: Track artifacts before execution artifacts_before = self._get_current_artifacts() # Step 4: Execute with output capture and enhanced error reporting with self.capture_output() as (stdout, stderr): try: # Print execution info print(f"🚀 Executing code (session: {self.session_id[:8]}...)") print(f"📁 Artifacts directory: {self.artifacts_dir}") print("-" * 50) exec(compiled_code, self.globals_dict) print("-" * 50) print("✅ Execution completed successfully!") result['success'] = True except KeyboardInterrupt: result.update({ 'error': "Execution interrupted by user", 'error_type': 'KeyboardInterrupt' }) print("\n⚠️ Execution interrupted!") except MemoryError: result.update({ 'error': "Memory limit exceeded", 'error_type': 'MemoryError' }) print("\n💾 Memory limit exceeded!") except Exception as e: result.update({ 'error': str(e), 'error_type': type(e).__name__ }) print(f"\n❌ Execution failed: {str(e)}") # Print enhanced traceback import traceback traceback.print_exc() # Save error details for debugging self._save_error_details(e, code, traceback.format_exc()) # Step 5: Track artifacts after execution artifacts_after = self._get_current_artifacts() new_artifacts = artifacts_after - artifacts_before execution_time = time.time() - start_time result['execution_time'] = execution_time # Step 6: Process artifacts result['artifacts'] = list(new_artifacts) if new_artifacts: print(f"📁 Generated {len(new_artifacts)} artifacts:") for artifact in sorted(new_artifacts)[:5]: # Show first 5 print(f" - {artifact}") if len(new_artifacts) > 5: print(f" ... and {len(new_artifacts) - 5} more") # Step 7: Capture output result['stdout'] = stdout.getvalue() result['stderr'] = stderr.getvalue() # Step 8: Store execution in history self._store_execution_history( code=code, success=result['success'], error=result['error'], execution_time=execution_time, artifacts=list(new_artifacts) ) # Step 9: Save state periodically if len(self.execution_times) % 10 == 0: # Every 10 executions self.save_persistent_state() return result def _get_current_artifacts(self) -> Set[str]: """Get current set of artifact files.""" artifacts = set() if self.artifacts_dir.exists(): for file_path in self.artifacts_dir.rglob('*'): if file_path.is_file(): artifacts.add(str(file_path.relative_to(self.artifacts_dir))) return artifacts def categorize_artifacts(self) -> Dict[str, List[Dict[str, Any]]]: """Categorize artifacts by type with detailed metadata.""" categories = { 'images': [], 'videos': [], 'plots': [], 'data': [], 'code': [], 'documents': [], 'audio': [], 'manim': [], 'other': [] } if not self.artifacts_dir.exists(): return categories # File type mappings type_mappings = { 'images': {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff', '.svg', '.webp'}, 'videos': {'.mp4', '.avi', '.mov', '.wmv', '.flv', '.webm', '.mkv'}, 'plots': {'.png', '.jpg', '.jpeg', '.pdf', '.svg'}, # When in plots directory 'data': {'.csv', '.json', '.xml', '.yaml', '.yml', '.pkl', '.pickle', '.h5', '.hdf5'}, 'code': {'.py', '.js', '.html', '.css', '.sql', '.sh', '.bat'}, 'documents': {'.pdf', '.docx', '.doc', '.txt', '.md', '.rtf'}, 'audio': {'.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a'}, 'manim': {'.mp4', '.png', '.gif'} # When in manim-related directories } for file_path in self.artifacts_dir.rglob('*'): if not file_path.is_file(): continue relative_path = file_path.relative_to(self.artifacts_dir) suffix = file_path.suffix.lower() # Get file info try: stat = file_path.stat() file_info = { 'path': str(relative_path), 'full_path': str(file_path), 'size': stat.st_size, 'created': stat.st_ctime, 'modified': stat.st_mtime, 'extension': suffix, 'name': file_path.name } except Exception as e: logger.warning(f"Failed to get file info for {file_path}: {e}") continue # Categorize based on location and extension categorized = False # Check if it's in a specific subdirectory parts = relative_path.parts if len(parts) > 1: subdir = parts[0] if subdir in categories: categories[subdir].append(file_info) categorized = True # Enhanced Manim detection - check for various Manim output patterns if not categorized: path_str = str(relative_path).lower() if any(pattern in path_str for pattern in [ 'manim', 'scene', 'media', 'videos', 'images', 'tex', 'text' ]) and any(pattern in path_str for pattern in [ 'manim_', 'scene_', 'media/', 'videos/', 'images/' ]): categories['manim'].append(file_info) categorized = True # If not categorized by directory, use extension if not categorized: for category, extensions in type_mappings.items(): if suffix in extensions: # Additional Manim detection by content and path patterns if category in ['videos', 'images'] and any(pattern in str(relative_path).lower() for pattern in [ 'manim', 'scene', 'media/', 'videos/', 'images/', 'tex/', 'text/' ]): categories['manim'].append(file_info) else: categories[category].append(file_info) categorized = True break # If still not categorized, put in 'other' if not categorized: categories['other'].append(file_info) return categories def get_artifact_report(self) -> Dict[str, Any]: """Generate comprehensive artifact report.""" categorized = self.categorize_artifacts() report = { 'total_artifacts': sum(len(files) for files in categorized.values()), 'categories': {}, 'recent_artifacts': [], 'largest_artifacts': [], 'total_size': 0 } all_artifacts = [] for category, files in categorized.items(): if files: category_size = sum(f['size'] for f in files) report['categories'][category] = { 'count': len(files), 'size': category_size, 'files': files } report['total_size'] += category_size all_artifacts.extend(files) # Sort by modification time for recent artifacts if all_artifacts: all_artifacts.sort(key=lambda x: x['modified'], reverse=True) report['recent_artifacts'] = all_artifacts[:10] # Sort by size for largest artifacts all_artifacts.sort(key=lambda x: x['size'], reverse=True) report['largest_artifacts'] = all_artifacts[:10] return report def _store_execution_history(self, code: str, success: bool, error: Optional[str], execution_time: float, artifacts: List[str]): """Store execution in history database.""" try: with sqlite3.connect(self.state_file) as conn: conn.execute(''' INSERT INTO execution_history (code, result, execution_time, memory_usage, artifacts, timestamp) VALUES (?, ?, ?, ?, ?, ?) ''', ( code, json.dumps({'success': success, 'error': error}), execution_time, 0, # Memory usage tracking can be added later json.dumps(artifacts), time.time() )) except Exception as e: logger.error(f"Failed to store execution history: {e}") def get_execution_stats(self) -> Dict[str, Any]: """Get performance statistics.""" return { 'total_executions': len(self.execution_times), 'average_execution_time': sum(self.execution_times) / len(self.execution_times) if self.execution_times else 0, 'cache_hit_ratio': self.cache_hits / (self.cache_hits + self.cache_misses) if (self.cache_hits + self.cache_misses) > 0 else 0, 'cache_hits': self.cache_hits, 'cache_misses': self.cache_misses, 'cached_compilations': len(self.compilation_cache), 'session_id': self.session_id, 'artifacts_count': len(self._get_current_artifacts()) } def get_execution_history(self, limit: int = 100) -> List[Dict[str, Any]]: """Get execution history.""" try: with sqlite3.connect(self.state_file) as conn: cursor = conn.execute(''' SELECT code, result, execution_time, artifacts, timestamp FROM execution_history ORDER BY timestamp DESC LIMIT ? ''', (limit,)) history = [] for row in cursor: code, result_str, exec_time, artifacts_str, timestamp = row try: result = json.loads(result_str) artifacts = json.loads(artifacts_str) if artifacts_str else [] except: continue history.append({ 'code': code, 'result': result, 'execution_time': exec_time, 'artifacts': artifacts, 'timestamp': timestamp }) return history except Exception as e: logger.error(f"Failed to get execution history: {e}") return [] def clear_cache(self): """Clear compilation cache.""" with self._lock: self.compilation_cache.clear() self.cache_hits = 0 self.cache_misses = 0 def cleanup_artifacts(self): """Clean up artifacts directory and all its contents.""" if self.artifacts_dir and self.artifacts_dir.exists(): import shutil shutil.rmtree(self.artifacts_dir, ignore_errors=True) logger.info(f"Cleaned up artifacts directory: {self.artifacts_dir}") def _save_error_details(self, error: Exception, code: str, traceback_str: str): """Save detailed error information for debugging.""" try: error_dir = self.artifacts_dir / "logs" error_dir.mkdir(exist_ok=True) error_file = error_dir / f"error_{int(time.time())}.log" with open(error_file, 'w') as f: f.write(f"Error occurred at: {time.ctime()}\n") f.write(f"Error type: {type(error).__name__}\n") f.write(f"Error message: {str(error)}\n") f.write(f"Session ID: {self.session_id}\n") f.write("\n" + "="*50 + "\n") f.write("Code that caused the error:\n") f.write("="*50 + "\n") f.write(code) f.write("\n" + "="*50 + "\n") f.write("Full traceback:\n") f.write("="*50 + "\n") f.write(traceback_str) logger.info(f"Error details saved to: {error_file}") except Exception as e: logger.error(f"Failed to save error details: {e}") def change_working_directory(self, path: str, temporary: bool = False) -> Dict[str, Any]: """ Change the working directory with security checks and logging. Args: path: The new directory path temporary: Whether this is a temporary change (returns to default after operation) Returns: Dictionary containing operation result and current directory info """ try: new_path = Path(path).resolve() # Security check through directory monitor self.directory_monitor.change_directory(new_path) # Change actual working directory original_cwd = os.getcwd() os.chdir(new_path) logger.info(f"Changed working directory to: {new_path}") return { 'success': True, 'current_directory': str(new_path), 'previous_directory': original_cwd, 'is_default': new_path == self.directory_monitor.default_dir, 'is_temporary': temporary } except PermissionError as e: logger.error(f"Permission denied changing directory: {e}") return { 'success': False, 'error': str(e), 'current_directory': os.getcwd() } except Exception as e: logger.error(f"Error changing directory: {e}") return { 'success': False, 'error': str(e), 'current_directory': os.getcwd() } def list_directory(self, path: Optional[str] = None, include_hidden: bool = False) -> Dict[str, Any]: """ List contents of a directory with security checks. Args: path: Directory to list (defaults to current directory) include_hidden: Whether to include hidden files Returns: Dictionary containing directory contents and metadata """ try: target_path = Path(path) if path else Path.cwd() target_path = target_path.resolve() # Security check if not target_path.is_relative_to(self.home_dir): raise PermissionError(f"Cannot access directory outside home: {target_path}") if not target_path.exists(): return { 'success': False, 'error': f"Directory does not exist: {target_path}", 'path': str(target_path) } items = [] for item in target_path.iterdir(): if not include_hidden and item.name.startswith('.'): continue try: stat = item.stat() items.append({ 'name': item.name, 'type': 'directory' if item.is_dir() else 'file', 'size': stat.st_size if item.is_file() else None, 'modified': stat.st_mtime, 'permissions': oct(stat.st_mode)[-3:] }) except Exception as e: logger.warning(f"Failed to get info for {item}: {e}") logger.info(f"Listed directory: {target_path} ({len(items)} items)") return { 'success': True, 'path': str(target_path), 'items': sorted(items, key=lambda x: (x['type'], x['name'])), 'total_items': len(items) } except PermissionError as e: logger.error(f"Permission denied listing directory: {e}") return { 'success': False, 'error': str(e), 'path': str(target_path) if 'target_path' in locals() else path } except Exception as e: logger.error(f"Error listing directory: {e}") return { 'success': False, 'error': str(e), 'path': str(target_path) if 'target_path' in locals() else path } def find_files(self, pattern: str, search_path: Optional[str] = None, max_results: int = 100) -> Dict[str, Any]: """ Find files matching a pattern with security checks. Args: pattern: Glob pattern to search for search_path: Directory to search in (defaults to current directory) max_results: Maximum number of results to return Returns: Dictionary containing search results """ try: base_path = Path(search_path) if search_path else Path.cwd() base_path = base_path.resolve() # Security check if not base_path.is_relative_to(self.home_dir): raise PermissionError(f"Cannot search directory outside home: {base_path}") matches = [] for file_path in base_path.glob(pattern): if len(matches) >= max_results: break try: stat = file_path.stat() matches.append({ 'path': str(file_path), 'relative_path': str(file_path.relative_to(base_path)), 'name': file_path.name, 'type': 'directory' if file_path.is_dir() else 'file', 'size': stat.st_size if file_path.is_file() else None, 'modified': stat.st_mtime }) except Exception as e: logger.warning(f"Failed to get info for {file_path}: {e}") logger.info(f"Found {len(matches)} files matching pattern '{pattern}' in {base_path}") return { 'success': True, 'pattern': pattern, 'search_path': str(base_path), 'matches': matches, 'total_matches': len(matches), 'truncated': len(matches) >= max_results } except PermissionError as e: logger.error(f"Permission denied searching files: {e}") return { 'success': False, 'error': str(e), 'pattern': pattern, 'search_path': str(base_path) if 'base_path' in locals() else search_path } except Exception as e: logger.error(f"Error searching files: {e}") return { 'success': False, 'error': str(e), 'pattern': pattern, 'search_path': str(base_path) if 'base_path' in locals() else search_path } def reset_to_default_directory(self) -> Dict[str, Any]: """ Reset working directory to the default sandbox area. Returns: Dictionary containing operation result """ try: self.directory_monitor.reset_to_default() os.chdir(self.directory_monitor.default_dir) logger.info(f"Reset to default directory: {self.directory_monitor.default_dir}") return { 'success': True, 'current_directory': str(self.directory_monitor.default_dir), 'message': 'Reset to default sandbox directory' } except Exception as e: logger.error(f"Error resetting directory: {e}") return { 'success': False, 'error': str(e), 'current_directory': os.getcwd() } def get_current_directory_info(self) -> Dict[str, Any]: """ Get information about the current working directory. Returns: Dictionary containing current directory information """ try: current_dir = Path.cwd() return { 'current_directory': str(current_dir), 'default_directory': str(self.directory_monitor.default_dir), 'home_directory': str(self.home_dir), 'is_default': current_dir == self.directory_monitor.default_dir, 'is_in_home': current_dir.is_relative_to(self.home_dir), 'artifacts_directory': str(self.artifacts_dir) } except Exception as e: logger.error(f"Error getting directory info: {e}") return { 'error': str(e), 'current_directory': os.getcwd() } def cleanup(self): """Clean up resources and save state.""" self.save_persistent_state() logger.info(f"Cleaned up execution context for session {self.session_id}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/scooter-lacroix/sandbox-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server