runtime_engine.py•13.8 kB
"""
Runtime engine for the Sectional MCP Panel.
Handles container management and process execution.
"""
import os
import logging
import docker
import asyncio
import json
from typing import Dict, List, Any, Optional, Tuple
from docker.errors import DockerException, ImageNotFound, APIError
logger = logging.getLogger("mcp_panel.runtime")
class RuntimeEngine:
"""
Runtime engine for managing server execution.
Primarily uses Docker containers, with fallback to subprocess for non-containerized execution.
"""
def __init__(self):
"""Initialize the runtime engine."""
self.docker_client = None
try:
self.docker_client = docker.from_env()
logger.info("Docker client initialized successfully")
except DockerException as e:
logger.warning(f"Failed to initialize Docker client: {e}")
logger.warning("Container-based execution will not be available")
async def start_server(
self,
server_name: str,
runtime_definition: Dict[str, Any],
effective_settings: Dict[str, Any]
) -> Tuple[bool, str, Optional[str]]:
"""
Start a server based on its runtime definition and settings.
Args:
server_name: Name of the server
runtime_definition: Runtime definition from the server configuration
effective_settings: Resolved settings (global + section + server)
Returns:
Tuple of (success, status_message, process_id)
"""
runtime_type = runtime_definition.get("type", "")
if runtime_type == "docker_image" and self.docker_client:
return await self._start_docker_container(server_name, runtime_definition, effective_settings)
elif runtime_type == "npx_command" and self.docker_client:
return await self._start_npx_container(server_name, runtime_definition, effective_settings)
else:
# Fallback to subprocess for non-containerized execution
return await self._start_process(server_name, runtime_definition, effective_settings)
async def stop_server(self, server_name: str, process_id: str, force: bool = False, timeout: int = 30) -> Tuple[bool, str]:
"""
Stop a running server.
Args:
server_name: Name of the server
process_id: Process ID or container ID
force: Whether to force stop
timeout: Timeout in seconds
Returns:
Tuple of (success, status_message)
"""
if self.docker_client and self._is_container_id(process_id):
return await self._stop_container(server_name, process_id, force, timeout)
else:
return await self._stop_process(server_name, process_id, force)
async def get_server_status(self, process_id: Optional[str]) -> str:
"""
Get the status of a server.
Args:
process_id: Process ID or container ID
Returns:
Status string: "Running", "Stopped", "Error", or "Unknown"
"""
if not process_id:
return "Stopped"
if self.docker_client and self._is_container_id(process_id):
return await self._get_container_status(process_id)
else:
return await self._get_process_status(process_id)
async def _start_docker_container(
self,
server_name: str,
runtime_definition: Dict[str, Any],
effective_settings: Dict[str, Any]
) -> Tuple[bool, str, Optional[str]]:
"""Start a server using a Docker image."""
if not self.docker_client:
return False, "Docker client not available", None
try:
image = runtime_definition.get("command", "")
if not image:
return False, "No Docker image specified", None
# Prepare environment variables
env_vars = effective_settings.get("environmentVars", {})
# Prepare resource limits
resource_limits = effective_settings.get("resourceLimits", {})
cpu_limit = resource_limits.get("cpuLimit")
memory_limit = resource_limits.get("memoryLimitMB")
# Prepare runtime options
runtime_options = effective_settings.get("runtimeOptions", {})
restart_policy = runtime_options.get("restartPolicy", "no")
# Map restart policy to Docker restart policy
docker_restart_policy = {
"no": {"Name": "no"},
"on_failure": {"Name": "on-failure", "MaximumRetryCount": runtime_options.get("maxRestarts", 3)},
"always": {"Name": "always"}
}.get(restart_policy, {"Name": "no"})
# Prepare ports
ports = {}
for port_config in runtime_definition.get("ports", []):
container_port = port_config.get("containerPort")
protocol = port_config.get("protocol", "tcp").lower()
if container_port:
ports[f"{container_port}/{protocol}"] = None # Let Docker assign a host port
# Prepare volumes if working directory is specified
volumes = {}
working_dir = runtime_definition.get("workingDirectory")
# Create and start the container
container = self.docker_client.containers.run(
image=image,
name=f"mcp-{server_name}",
detach=True,
environment=env_vars,
cpu_quota=int(cpu_limit * 100000) if cpu_limit else None,
mem_limit=f"{memory_limit}m" if memory_limit else None,
restart_policy=docker_restart_policy,
ports=ports,
working_dir=working_dir,
command=runtime_definition.get("args", [])
)
logger.info(f"Started Docker container for server {server_name}: {container.id}")
return True, "Container started successfully", container.id
except ImageNotFound:
error_msg = f"Docker image not found: {image}"
logger.error(error_msg)
return False, error_msg, None
except APIError as e:
error_msg = f"Docker API error: {str(e)}"
logger.error(error_msg)
return False, error_msg, None
except Exception as e:
error_msg = f"Failed to start Docker container: {str(e)}"
logger.error(error_msg)
return False, error_msg, None
async def _start_npx_container(
self,
server_name: str,
runtime_definition: Dict[str, Any],
effective_settings: Dict[str, Any]
) -> Tuple[bool, str, Optional[str]]:
"""Start an NPX command in a Node.js container."""
if not self.docker_client:
return False, "Docker client not available", None
try:
# Use a Node.js base image
image = "node:latest"
# Get the NPX command and arguments
npx_command = runtime_definition.get("command", "npx")
args = runtime_definition.get("args", [])
# Combine into a full command
command = [npx_command] + args
# Prepare environment variables
env_vars = effective_settings.get("environmentVars", {})
# Prepare resource limits
resource_limits = effective_settings.get("resourceLimits", {})
cpu_limit = resource_limits.get("cpuLimit")
memory_limit = resource_limits.get("memoryLimitMB")
# Prepare runtime options
runtime_options = effective_settings.get("runtimeOptions", {})
restart_policy = runtime_options.get("restartPolicy", "no")
# Map restart policy to Docker restart policy
docker_restart_policy = {
"no": {"Name": "no"},
"on_failure": {"Name": "on-failure", "MaximumRetryCount": runtime_options.get("maxRestarts", 3)},
"always": {"Name": "always"}
}.get(restart_policy, {"Name": "no"})
# Prepare ports
ports = {}
for port_config in runtime_definition.get("ports", []):
container_port = port_config.get("containerPort")
protocol = port_config.get("protocol", "tcp").lower()
if container_port:
ports[f"{container_port}/{protocol}"] = None # Let Docker assign a host port
# Create and start the container
container = self.docker_client.containers.run(
image=image,
name=f"mcp-{server_name}",
detach=True,
environment=env_vars,
cpu_quota=int(cpu_limit * 100000) if cpu_limit else None,
mem_limit=f"{memory_limit}m" if memory_limit else None,
restart_policy=docker_restart_policy,
ports=ports,
working_dir=runtime_definition.get("workingDirectory", "/app"),
command=command
)
logger.info(f"Started NPX container for server {server_name}: {container.id}")
return True, "Container started successfully", container.id
except ImageNotFound:
error_msg = f"Docker image not found: {image}"
logger.error(error_msg)
return False, error_msg, None
except APIError as e:
error_msg = f"Docker API error: {str(e)}"
logger.error(error_msg)
return False, error_msg, None
except Exception as e:
error_msg = f"Failed to start NPX container: {str(e)}"
logger.error(error_msg)
return False, error_msg, None
async def _start_process(
self,
server_name: str,
runtime_definition: Dict[str, Any],
effective_settings: Dict[str, Any]
) -> Tuple[bool, str, Optional[str]]:
"""
Start a server using subprocess (fallback method).
Note: This is a placeholder for subprocess implementation.
In a real implementation, this would use asyncio.create_subprocess_exec.
"""
logger.warning(f"Subprocess execution not fully implemented. Server {server_name} cannot be started.")
return False, "Subprocess execution not implemented", None
async def _stop_container(
self,
server_name: str,
container_id: str,
force: bool = False,
timeout: int = 30
) -> Tuple[bool, str]:
"""Stop a Docker container."""
if not self.docker_client:
return False, "Docker client not available"
try:
container = self.docker_client.containers.get(container_id)
if force:
container.kill()
logger.info(f"Forcefully stopped container for server {server_name}: {container_id}")
return True, "Container forcefully stopped"
else:
container.stop(timeout=timeout)
logger.info(f"Gracefully stopped container for server {server_name}: {container_id}")
return True, "Container gracefully stopped"
except docker.errors.NotFound:
logger.warning(f"Container not found for server {server_name}: {container_id}")
return True, "Container not found (already stopped)"
except Exception as e:
error_msg = f"Failed to stop container: {str(e)}"
logger.error(error_msg)
return False, error_msg
async def _stop_process(self, server_name: str, process_id: str, force: bool = False) -> Tuple[bool, str]:
"""
Stop a process (fallback method).
Note: This is a placeholder for process termination implementation.
"""
logger.warning(f"Process termination not fully implemented. Server {server_name} cannot be stopped.")
return False, "Process termination not implemented"
async def _get_container_status(self, container_id: str) -> str:
"""Get the status of a Docker container."""
if not self.docker_client:
return "Unknown"
try:
container = self.docker_client.containers.get(container_id)
status = container.status
# Map Docker status to our status
if status == "running":
return "Running"
elif status in ["exited", "dead", "created", "paused", "restarting"]:
return "Stopped"
else:
return "Unknown"
except docker.errors.NotFound:
return "Stopped"
except Exception as e:
logger.error(f"Failed to get container status: {str(e)}")
return "Error"
async def _get_process_status(self, process_id: str) -> str:
"""
Get the status of a process (fallback method).
Note: This is a placeholder for process status checking implementation.
"""
return "Unknown"
def _is_container_id(self, process_id: str) -> bool:
"""Check if a process ID is likely a Docker container ID."""
# Docker container IDs are 64-character hex strings, but often abbreviated to 12 chars
return len(process_id) >= 12 and all(c in "0123456789abcdef" for c in process_id.lower())