Skip to main content
Glama
code_executor.py16.8 kB
"""Code Executor Service Module. Provides secure code execution with timeout control, sandboxing, and comprehensive error handling. """ import ast import sys import io import contextlib import time import threading import traceback from typing import Dict, Any, Optional, List, Tuple, Union from datetime import datetime import pandas as pd import numpy as np try: from core.config import get_config from core.exceptions import ( CodeExecutionError, SecurityError, TimeoutError, ValidationError ) from core.types import ExecutionResult, ExecutionStatus CORE_AVAILABLE = True except ImportError: CORE_AVAILABLE = False # Fallback types class ExecutionResult: def __init__(self, **kwargs): for k, v in kwargs.items(): setattr(self, k, v) class ExecutionStatus: SUCCESS = "success" ERROR = "error" TIMEOUT = "timeout" SECURITY_VIOLATION = "security_violation" class CodeExecutor: """Secure code executor with advanced features.""" def __init__(self): """Initialize code executor.""" self.config = get_config() if CORE_AVAILABLE else None self.execution_history = [] self._setup_safe_environment() def _setup_safe_environment(self): """Setup safe execution environment.""" # Define allowed modules and functions self.allowed_modules = { 'pandas', 'numpy', 'math', 'datetime', 'json', 're', 'collections', 'itertools', 'functools', 'operator' } # Define forbidden functions and attributes self.forbidden_names = { '__import__', 'eval', 'exec', 'compile', 'open', 'file', 'input', 'raw_input', 'reload', 'vars', 'dir', 'globals', 'locals', 'hasattr', 'getattr', 'setattr', 'delattr', 'isinstance', 'issubclass', 'callable', '__builtins__' } # Define safe built-ins self.safe_builtins = { 'abs', 'all', 'any', 'bin', 'bool', 'chr', 'dict', 'enumerate', 'filter', 'float', 'format', 'frozenset', 'hex', 'int', 'len', 'list', 'map', 'max', 'min', 'oct', 'ord', 'pow', 'range', 'reversed', 'round', 'set', 'slice', 'sorted', 'str', 'sum', 'tuple', 'type', 'zip', 'print' } def validate_code(self, code: str) -> Tuple[bool, List[str]]: """Validate code for security issues. Args: code: Python code to validate Returns: Tuple of (is_valid, list_of_issues) """ issues = [] try: # Parse the code into AST tree = ast.parse(code) # Check for forbidden constructs for node in ast.walk(tree): # Check for imports if isinstance(node, (ast.Import, ast.ImportFrom)): if isinstance(node, ast.Import): for alias in node.names: if alias.name.split('.')[0] not in self.allowed_modules: issues.append(f"不允许导入模块: {alias.name}") elif isinstance(node, ast.ImportFrom): if node.module and node.module.split('.')[0] not in self.allowed_modules: issues.append(f"不允许从模块导入: {node.module}") # Check for function calls elif isinstance(node, ast.Call): if isinstance(node.func, ast.Name): if node.func.id in self.forbidden_names: issues.append(f"不允许调用函数: {node.func.id}") # Check for attribute access elif isinstance(node, ast.Attribute): if node.attr.startswith('_'): issues.append(f"不允许访问私有属性: {node.attr}") # Check for name access elif isinstance(node, ast.Name): if node.id in self.forbidden_names: issues.append(f"不允许使用名称: {node.id}") except SyntaxError as e: issues.append(f"语法错误: {e}") except Exception as e: issues.append(f"代码分析错误: {e}") return len(issues) == 0, issues def execute_code(self, code: str, context: Optional[Dict[str, Any]] = None, timeout: Optional[int] = None) -> ExecutionResult: """Execute Python code safely. Args: code: Python code to execute context: Execution context with variables timeout: Execution timeout in seconds Returns: ExecutionResult with execution details """ start_time = datetime.now() # Set timeout if timeout is None: timeout = self.config.performance.execution_timeout if self.config else 30 # Validate code first is_valid, issues = self.validate_code(code) if not is_valid: error_msg = "代码安全检查失败:\n" + "\n".join(issues) if CORE_AVAILABLE: raise SecurityError(operation="代码安全检查", reason=error_msg) else: raise Exception(error_msg) # Prepare execution context exec_context = self._prepare_context(context) # Execute with timeout try: result = self._execute_with_timeout(code, exec_context, timeout) execution_result = ExecutionResult( status=ExecutionStatus.SUCCESS, result=result['result'], output=result['output'], error=None, execution_time=(datetime.now() - start_time).total_seconds(), context_variables=self._extract_variables(result['namespace']), code=code ) except TimeoutError as e: execution_result = ExecutionResult( status=ExecutionStatus.TIMEOUT, result=None, output="", error=str(e), execution_time=timeout, context_variables={}, code=code ) except Exception as e: error_details = traceback.format_exc() if CORE_AVAILABLE: # 确保将详细错误信息作为 error_details 传递 raise CodeExecutionError(code=code, error_details=error_details) from e else: execution_result = ExecutionResult( status=ExecutionStatus.ERROR, result=None, output="", error=error_details, execution_time=(datetime.now() - start_time).total_seconds(), context_variables={}, code=code ) # Store in history self.execution_history.append(execution_result) return execution_result def _prepare_context(self, context: Optional[Dict[str, Any]]) -> Dict[str, Any]: """Prepare safe execution context.""" # Start with safe built-ins safe_context = { '__builtins__': {name: getattr(__builtins__, name) for name in self.safe_builtins if hasattr(__builtins__, name)} } # Add common libraries safe_context.update({ 'pd': pd, 'pandas': pd, 'np': np, 'numpy': np }) # Add user context if provided if context: for key, value in context.items(): if not key.startswith('_') and key not in self.forbidden_names: safe_context[key] = value return safe_context def _execute_with_timeout(self, code: str, context: Dict[str, Any], timeout: int) -> Dict[str, Any]: """Execute code with timeout control.""" result = {'result': None, 'output': '', 'namespace': context.copy()} exception_holder = [] def target(): try: # Capture stdout old_stdout = sys.stdout sys.stdout = captured_output = io.StringIO() try: # Execute the code exec_result = None # Try to evaluate as expression first try: exec_result = eval(code, result['namespace']) except SyntaxError: # If not an expression, execute as statement exec(code, result['namespace']) result['result'] = exec_result result['output'] = captured_output.getvalue() finally: sys.stdout = old_stdout except Exception as e: exception_holder.append(e) # Start execution thread thread = threading.Thread(target=target) thread.daemon = True thread.start() thread.join(timeout) if thread.is_alive(): # Timeout occurred if CORE_AVAILABLE: raise TimeoutError(f"代码执行超时 ({timeout}秒)") else: raise Exception(f"代码执行超时 ({timeout}秒)") if exception_holder: raise exception_holder[0] return result def _extract_variables(self, namespace: Dict[str, Any]) -> Dict[str, Any]: """Extract user-defined variables from namespace.""" variables = {} for name, value in namespace.items(): # Skip built-ins and modules if (not name.startswith('_') and name not in self.safe_builtins and name not in ['pd', 'pandas', 'np', 'numpy'] and not callable(value) or isinstance(value, (pd.DataFrame, pd.Series))): # Convert to serializable format try: if isinstance(value, (pd.DataFrame, pd.Series)): variables[name] = { 'type': type(value).__name__, 'shape': getattr(value, 'shape', None), 'dtype': str(getattr(value, 'dtype', None)), 'preview': str(value.head() if hasattr(value, 'head') else value) } elif isinstance(value, (list, dict, str, int, float, bool)): variables[name] = value else: variables[name] = str(value) except: variables[name] = f"<{type(value).__name__} object>" return variables def analyze_code_dependencies(self, code: str) -> Dict[str, Any]: """Analyze code dependencies and complexity. Args: code: Python code to analyze Returns: Analysis results """ try: tree = ast.parse(code) analysis = { 'imports': [], 'functions_called': [], 'variables_assigned': [], 'complexity_score': 0, 'line_count': len(code.split('\n')), 'has_loops': False, 'has_conditions': False } for node in ast.walk(tree): # Track imports if isinstance(node, ast.Import): for alias in node.names: analysis['imports'].append(alias.name) elif isinstance(node, ast.ImportFrom): if node.module: analysis['imports'].append(node.module) # Track function calls elif isinstance(node, ast.Call): if isinstance(node.func, ast.Name): analysis['functions_called'].append(node.func.id) elif isinstance(node.func, ast.Attribute): analysis['functions_called'].append(node.func.attr) # Track variable assignments elif isinstance(node, ast.Assign): for target in node.targets: if isinstance(target, ast.Name): analysis['variables_assigned'].append(target.id) # Track control structures elif isinstance(node, (ast.For, ast.While)): analysis['has_loops'] = True analysis['complexity_score'] += 2 elif isinstance(node, (ast.If, ast.IfExp)): analysis['has_conditions'] = True analysis['complexity_score'] += 1 # Remove duplicates analysis['imports'] = list(set(analysis['imports'])) analysis['functions_called'] = list(set(analysis['functions_called'])) analysis['variables_assigned'] = list(set(analysis['variables_assigned'])) return analysis except Exception as e: return {'error': f"代码分析失败: {e}"} def get_execution_history(self, limit: int = 10) -> List[ExecutionResult]: """Get recent execution history. Args: limit: Maximum number of results to return Returns: List of recent execution results """ return self.execution_history[-limit:] def clear_history(self): """Clear execution history.""" self.execution_history.clear() def get_suggestions_for_error(self, error: str, code: str) -> List[str]: """Get suggestions for fixing code errors. Args: error: Error message code: Original code Returns: List of suggestions """ suggestions = [] error_lower = error.lower() # Common error patterns and suggestions if 'nameError' in error or 'not defined' in error_lower: suggestions.append("检查变量名是否正确拼写") suggestions.append("确保变量在使用前已定义") suggestions.append("检查是否需要导入相关模块") elif 'syntaxError' in error or 'invalid syntax' in error_lower: suggestions.append("检查代码语法,特别是括号、引号的匹配") suggestions.append("确保缩进正确") suggestions.append("检查是否有多余的逗号或分号") elif 'keyError' in error: suggestions.append("检查字典键名是否存在") suggestions.append("使用 .get() 方法安全访问字典") suggestions.append("检查数据框列名是否正确") elif 'indexError' in error: suggestions.append("检查列表或数组索引是否超出范围") suggestions.append("使用 len() 检查序列长度") elif 'typeError' in error: suggestions.append("检查数据类型是否匹配") suggestions.append("确保函数参数类型正确") suggestions.append("检查是否需要类型转换") elif 'attributeError' in error: suggestions.append("检查对象是否有该属性或方法") suggestions.append("确保对象类型正确") elif 'fileNotFoundError' in error or 'no such file' in error_lower: suggestions.append("检查文件路径是否正确") suggestions.append("确保文件存在") suggestions.append("使用绝对路径或检查工作目录") # Add general suggestions if not suggestions: suggestions.extend([ "检查代码逻辑是否正确", "尝试分步执行代码以定位问题", "查看完整的错误堆栈信息" ]) return suggestions

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Lillard01/chatExcel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server