"""Python code execution engine with output capture."""
import ast
import builtins
import code
import contextlib
import inspect
import io
import logging
import sys
import time
import traceback
from typing import Any, Dict, Optional

from .models import ExecutionResult, InspectionResult

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class CodeExecutor:
    """Execute Python source in a persistent namespace and capture its output.

    State is kept in two dictionaries that are passed to ``eval``/``exec`` as
    the globals and locals mappings, so names defined by one call remain
    available to later calls.

    Attributes:
        interpreter: A ``code.InteractiveInterpreter`` instance. No method of
            this class uses it; it is kept as a public attribute.
        globals: Mapping passed as the globals of executed code. It is seeded
            with every name from the ``builtins`` module.
        locals: Mapping passed as the locals of executed code. Top-level
            assignments made by executed code are stored here.

    NOTE(review): because separate globals and locals mappings are used, code
    runs as if it were inside a class body: a function defined through
    ``execute`` resolves free names via ``globals`` and therefore does not see
    top-level names that were stored in ``locals``.
    """

    def __init__(self):
        self.interpreter = code.InteractiveInterpreter()
        self.globals: Dict[str, Any] = {}
        self.locals: Dict[str, Any] = {}
        # Seed with the builtin names. ``vars(builtins)`` is used rather than
        # ``__builtins__`` because the latter is an implementation detail that
        # may be either a module or a dict depending on how this file is run.
        self.globals.update(vars(builtins))

    @staticmethod
    def _compile_input(code_str: str):
        """Compile *code_str*, preferring expression mode.

        Returns:
            A ``(code_object, is_expression)`` tuple.

        Raises:
            SyntaxError: If the source is valid neither as an expression nor
                as a sequence of statements.
        """
        try:
            tree = ast.parse(code_str.strip(), mode='eval')
        except SyntaxError:
            tree = None
        if tree is not None:
            return compile(tree, '<input>', 'eval'), True
        # The statement fallback happens outside the ``except`` clause so
        # later errors are not chained to the expression-mode SyntaxError.
        tree = ast.parse(code_str, mode='exec')
        return compile(tree, '<input>', 'exec'), False

    async def execute(self, code_str: str) -> ExecutionResult:
        """Execute Python code and capture output.

        The source is evaluated as an expression when it parses as one (a
        non-``None`` result is echoed to the captured stdout as its ``repr``);
        otherwise it is executed as statements. The code is run exactly once.
        This coroutine never awaits: the code runs synchronously.

        NOTE(security): this runs arbitrary code via ``eval``/``exec`` with no
        sandboxing; only pass trusted input.

        Args:
            code_str: Python source to run.

        Returns:
            An ``ExecutionResult`` built from the captured stdout (``output``),
            captured stderr (``error``), the ``repr`` of a non-``None``
            expression result (``return_value``, else ``None``), the elapsed
            seconds (``execution_time``) and ``success``, which is ``False``
            if any ``Exception`` was raised.
        """
        # perf_counter is monotonic, so the duration cannot go negative when
        # the wall clock is adjusted.
        start_time = time.perf_counter()

        stdout_capture = io.StringIO()
        stderr_capture = io.StringIO()

        success = True
        return_value = None

        with contextlib.redirect_stdout(stdout_capture), \
                contextlib.redirect_stderr(stderr_capture):
            try:
                compiled, is_expression = self._compile_input(code_str)
                if is_expression:
                    result = eval(compiled, self.globals, self.locals)
                    if result is not None:
                        return_value = repr(result)
                        print(return_value)
                else:
                    exec(compiled, self.globals, self.locals)
            except SyntaxError as e:
                # Syntax errors are reported as a single line, no traceback.
                success = False
                print(f"SyntaxError: {e.msg} (line {e.lineno})", file=sys.stderr)
            except Exception:
                # Any other failure: the traceback goes to the captured stderr.
                success = False
                traceback.print_exc()

        execution_time = time.perf_counter() - start_time

        return ExecutionResult(
            output=stdout_capture.getvalue(),
            error=stderr_capture.getvalue(),
            return_value=return_value,
            execution_time=execution_time,
            success=success
        )

    async def inspect_object(self, object_name: str) -> InspectionResult:
        """Inspect a Python object.

        The object is looked up by name in ``locals``, then ``globals``; if it
        is in neither, *object_name* is evaluated as an expression.

        NOTE(security): the fallback uses ``eval`` on *object_name*; only pass
        trusted input.

        Args:
            object_name: A variable name or an expression yielding the object.

        Returns:
            An ``InspectionResult`` with the type name, ``str()`` value
            (truncated to 500 characters plus ``"..."``), docstring, public
            non-callable attribute names, public callable names, and source
            code when ``inspect.getsource`` can provide it.

        Raises:
            ValueError: If the object cannot be resolved or inspection fails.
        """
        try:
            if object_name in self.locals:
                obj = self.locals[object_name]
            elif object_name in self.globals:
                obj = self.globals[object_name]
            else:
                try:
                    obj = eval(object_name, self.globals, self.locals)
                except Exception as e:
                    raise ValueError(f"Object '{object_name}' not found") from e

            obj_type = type(obj).__name__

            # str() can run arbitrary user code; fall back to a placeholder.
            try:
                obj_value = str(obj)
                if len(obj_value) > 500:  # Truncate long values
                    obj_value = obj_value[:500] + "..."
            except Exception:
                obj_value = f"<{obj_type} object>"

            try:
                docstring = inspect.getdoc(obj)
            except Exception:
                docstring = None

            # Split public names into callables and plain attributes.
            attributes = []
            methods = []
            try:
                for attr_name in dir(obj):
                    if attr_name.startswith('_'):  # Skip private attributes
                        continue
                    try:
                        attr = getattr(obj, attr_name)
                    except Exception:
                        # Some attributes might not be accessible
                        continue
                    if callable(attr):
                        methods.append(attr_name)
                    else:
                        attributes.append(attr_name)
            except Exception:
                # Best effort: keep whatever was collected before the failure.
                pass

            # Source is best effort; it is left as None when unavailable.
            try:
                source = inspect.getsource(obj)
            except Exception:
                source = None

            return InspectionResult(
                name=object_name,
                type=obj_type,
                docstring=docstring,
                attributes=attributes,
                methods=methods,
                value=obj_value,
                source=source
            )

        except Exception as e:
            logger.error("Error inspecting object %s: %s", object_name, e)
            raise ValueError(f"Failed to inspect object '{object_name}': {e}") from e

    def get_globals(self) -> Dict[str, Any]:
        """Get current global variables.

        Returns:
            A new dict of the entries in ``globals`` whose names neither start
            with ``__`` nor match a name in the ``builtins`` module.
        """
        builtin_names = vars(builtins)
        return {k: v for k, v in self.globals.items()
                if not k.startswith('__') and k not in builtin_names}

    def get_locals(self) -> Dict[str, Any]:
        """Get current local variables as a shallow copy."""
        return self.locals.copy()

    def clear_variables(self):
        """Clear all variables except builtins."""
        self.globals.clear()
        self.locals.clear()
        self.globals.update(vars(builtins))
        logger.info("Cleared all variables")

    def set_variable(self, name: str, value: Any):
        """Set a variable in the global scope."""
        self.globals[name] = value
        # Lazy %-formatting: repr(value) is only computed if DEBUG is enabled.
        logger.debug("Set variable %s = %r", name, value)

    def get_variable(self, name: str) -> Any:
        """Get a variable value, checking ``locals`` before ``globals``.

        Raises:
            KeyError: If *name* is in neither mapping.
        """
        if name in self.locals:
            return self.locals[name]
        if name in self.globals:
            return self.globals[name]
        raise KeyError(f"Variable '{name}' not found")