"""
Execution utilities for the AgentExecMPC server.
This module contains all the subprocess execution logic, command builders,
and process management utilities.
"""
import asyncio
import os
import re
import signal
import subprocess
import sys
import time
from pathlib import Path
from .models import ExecutionResponse
# Global process tracking for cleanup
running_processes: dict[int, subprocess.Popen] = {}
async def run_subprocess(
cmd: list[str],
timeout: int,
cwd: Path | None = None,
input_data: str | None = None,
workspace_dir: Path = None,
) -> ExecutionResponse:
"""Run a subprocess with timeout and proper cleanup."""
start_time = time.time()
cwd = cwd or workspace_dir
try:
# Set up environment to include user's local site-packages
env = os.environ.copy()
# Get Python version dynamically
python_version = f"python{sys.version_info.major}.{sys.version_info.minor}"
user_site_packages = f"/home/agent/.local/lib/{python_version}/site-packages"
# Ensure Python can find user-installed packages
if "PYTHONPATH" in env:
env["PYTHONPATH"] = f"{user_site_packages}:{env['PYTHONPATH']}"
else:
env["PYTHONPATH"] = user_site_packages
# Also set PYTHONUSERBASE to ensure pip installs to the right location
env["PYTHONUSERBASE"] = "/home/agent/.local"
# Create subprocess with process group for easier cleanup
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE if input_data else None,
cwd=cwd,
env=env,
preexec_fn=os.setsid, # Create new process group
)
# Track the process
running_processes[process.pid] = process
try:
# Run with timeout
stdout, stderr = await asyncio.wait_for(
process.communicate(input=input_data.encode() if input_data else None),
timeout=timeout,
)
duration_ms = int((time.time() - start_time) * 1000)
return ExecutionResponse(
stdout=stdout.decode("utf-8", errors="replace"),
stderr=stderr.decode("utf-8", errors="replace"),
exit_code=process.returncode or 0,
duration_ms=duration_ms,
success=process.returncode == 0,
)
except TimeoutError:
# Kill the entire process group
try:
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
await asyncio.sleep(1) # Give it a moment to terminate gracefully
if process.returncode is None:
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
except (ProcessLookupError, OSError):
pass # Process already terminated
duration_ms = int((time.time() - start_time) * 1000)
return ExecutionResponse(
stdout="",
stderr=f"Process timed out after {timeout} seconds",
exit_code=-1,
duration_ms=duration_ms,
success=False,
error=f"Execution timed out after {timeout} seconds",
)
finally:
# Clean up process tracking
running_processes.pop(process.pid, None)
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
return ExecutionResponse(
stdout="",
stderr=str(e),
exit_code=-1,
duration_ms=duration_ms,
success=False,
error=f"Execution failed: {str(e)}",
)
def get_code_executor(language: str) -> list[str]:
"""Get the appropriate executor command for a language."""
language = language.lower()
if language == "python":
return ["python3", "-u"] # -u for unbuffered output
elif language in ["node", "javascript", "js"]:
return ["node"]
elif language == "go":
# For Go, we'll use a special shell-based approach
return ["go_shell_executor"] # Special marker for shell execution
else:
raise ValueError(f"Unsupported language: {language}")
def get_package_install_command(
package_manager: str, package: str, version: str | None = None
) -> list[str]:
"""Get the appropriate package installation command."""
package_manager = package_manager.lower()
if package_manager == "pip":
# Use uv pip install in the virtual environment
cmd = ["/home/agent/.local/bin/uv", "pip", "install"]
if version:
cmd.append(f"{package}=={version}")
else:
cmd.append(package)
return cmd
elif package_manager == "npm":
cmd = ["npm", "install", "--silent"]
if version:
cmd.append(f"{package}@{version}")
else:
cmd.append(package)
return cmd
elif package_manager == "go":
if version:
cmd = ["go", "install", f"{package}@{version}"]
else:
cmd = ["go", "install", f"{package}@latest"]
return cmd
else:
raise ValueError(f"Unsupported package manager: {package_manager}")
def cleanup_processes():
"""Clean up any running processes on shutdown."""
for pid, _ in running_processes.items():
try:
os.killpg(os.getpgid(pid), signal.SIGTERM)
except (ProcessLookupError, OSError):
pass
def signal_handler(signum, frame):
"""Handle shutdown signals."""
cleanup_processes()
exit(0)
async def execute_go_code_with_shell(
code: str, timeout: int, workspace_dir: Path
) -> ExecutionResponse:
"""Execute Go code using shell with CGO_ENABLED=0 and proper module handling."""
import os
# Generate a unique filename for the Go file
go_filename = f"temp_go_{os.getpid()}_{hash(code) % 10000}.go"
# Extract import statements to detect external dependencies
# Handle both single imports and multi-line import blocks
import_pattern = r'"([^"]+)"'
imports = re.findall(import_pattern, code)
# Filter to only get the imports (remove empty strings)
all_imports = [imp for imp in imports if imp and not imp.startswith("//")]
# Detect external packages (those with domain names like github.com, golang.org, etc.)
external_imports = []
for imp in all_imports:
# Standard library packages typically don't contain dots or are single words
# External packages usually have domain-like paths (github.com, golang.org, etc.)
if "." in imp and "/" in imp:
# This looks like an external package
external_imports.append(imp)
# Build the shell script
shell_script_lines = []
# Initialize Go module if we have external imports
if external_imports:
shell_script_lines.append("go mod init temp_module")
# Add go get commands for external packages
for pkg in external_imports:
shell_script_lines.append(f"go get {pkg}")
# Create the Go file using heredoc
shell_script_lines.append(f"cat > {go_filename} << 'EOF'")
shell_script_lines.append(code)
shell_script_lines.append("EOF")
# Run the Go code with CGO_ENABLED=0
shell_script_lines.append(f"CGO_ENABLED=0 go run {go_filename}")
# Clean up files
cleanup_files = [go_filename]
if external_imports:
cleanup_files.extend(["go.mod", "go.sum"])
shell_script_lines.append(f"rm -f {' '.join(cleanup_files)}")
# Join all lines with newlines to create a proper shell script
shell_command = "\n".join(shell_script_lines)
# Execute using bash
cmd = ["bash", "-c", shell_command]
return await run_subprocess(cmd=cmd, timeout=timeout, workspace_dir=workspace_dir)