# orchestration_engine.py
"""
Orchestration engine for the Sectional MCP Panel.
Handles server lifecycle management and group operations.
"""
import asyncio
import logging
import uuid
from typing import Dict, List, Any, Optional, Tuple
from sqlalchemy.ext.asyncio import AsyncSession
from src.database import crud
from src.config.config_manager import ConfigManager
from src.runtime.runtime_engine import RuntimeEngine
# Module-level logger; OrchestrationEngine uses it to report errors raised
# while executing background section operations.
logger = logging.getLogger("mcp_panel.orchestration")
class OrchestrationEngine:
    """
    Orchestration engine for managing server lifecycle and group operations.

    Coordinates between configuration, database, and runtime components:
    effective settings are resolved through ``ConfigManager``, processes are
    started and stopped through ``RuntimeEngine``, and server status, task
    progress and audit entries are persisted through ``crud``.

    Section-wide operations (``start_section``, ``stop_section`` and
    ``restart_section``) create a task record, schedule the actual work as a
    background asyncio task and return the task ID immediately.
    """

    def __init__(self, config_manager: ConfigManager, runtime_engine: RuntimeEngine):
        """
        Initialize the orchestration engine.

        Args:
            config_manager: Resolves effective server settings from the
                global, section and server layers.
            runtime_engine: Starts and stops the server processes.
        """
        self.config_manager = config_manager
        self.runtime_engine = runtime_engine
        # Not used by the methods of this class; retained because external
        # code may reference it.
        self.task_semaphore = asyncio.Semaphore(10)  # Limit concurrent tasks
        # Strong references to in-flight background tasks. The event loop only
        # keeps weak references to tasks, so a task nobody references can be
        # garbage collected before it finishes.
        self._background_tasks = set()

    # ------------------------------------------------------------------
    # Single-server operations
    # ------------------------------------------------------------------

    async def start_server(
        self,
        db: AsyncSession,
        server_id: int,
        user: Optional[str] = None
    ) -> Tuple[bool, str]:
        """
        Start a server.

        Resolves the effective settings (global -> section -> server), asks the
        runtime engine to start the process, then records the new status
        ("Running" or "Error") and an audit log entry.

        Args:
            db: Database session
            server_id: ID of the server to start
            user: Username for audit logging

        Returns:
            Tuple of (success, message). A server whose status is already
            "Running" is reported as success without being started again.

        Note:
            Exceptions raised by the runtime engine or the database layer are
            not caught here and propagate to the caller.
        """
        server = await crud.get_server_by_id(db, server_id)
        if not server:
            return False, f"Server with ID {server_id} not found"

        if server.status == "Running":
            return True, f"Server {server.name} is already running"

        section = await crud.get_section_by_id(db, server.section_id)
        if not section:
            return False, f"Section for server {server.name} not found"

        panel = await crud.get_panel_config(db)
        if not panel:
            return False, "Panel configuration not found"

        # Tolerate a missing global_settings blob (or a null "settings" entry)
        # instead of failing with AttributeError on ``None.get``.
        global_settings = (panel.global_settings or {}).get("settings") or {}
        effective_settings = self.config_manager.resolve_server_settings(
            global_settings,
            section.settings,
            server.settings,
        )

        success, message, process_id = await self.runtime_engine.start_server(
            server.name,
            server.runtime_definition,
            effective_settings,
        )

        if success:
            await crud.update_server_status(db, server_id, "Running", process_id)
            await crud.add_audit_log(
                db,
                action="start_server",
                entity_type="server",
                entity_id=str(server_id),
                user=user,
                details={"server_name": server.name, "process_id": process_id}
            )
            return True, f"Server {server.name} started successfully"

        await crud.update_server_status(db, server_id, "Error")
        await crud.add_audit_log(
            db,
            action="start_server_failed",
            entity_type="server",
            entity_id=str(server_id),
            user=user,
            details={"server_name": server.name, "error": message}
        )
        return False, f"Failed to start server {server.name}: {message}"

    async def stop_server(
        self,
        db: AsyncSession,
        server_id: int,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None
    ) -> Tuple[bool, str]:
        """
        Stop a server.

        Args:
            db: Database session
            server_id: ID of the server to stop
            force: Whether to force stop
            timeout: Timeout in seconds, passed through to the runtime engine
            user: Username for audit logging

        Returns:
            Tuple of (success, message). A server whose status is already
            "Stopped" is reported as success. A server without a recorded
            process ID is marked "Stopped" without contacting the runtime
            engine and without an audit entry. If the runtime engine fails to
            stop the process, the stored status is left unchanged.
        """
        server = await crud.get_server_by_id(db, server_id)
        if not server:
            return False, f"Server with ID {server_id} not found"

        if server.status == "Stopped":
            return True, f"Server {server.name} is already stopped"

        # Nothing to signal without a process ID; just reconcile the status.
        if not server.process_id:
            await crud.update_server_status(db, server_id, "Stopped")
            return True, f"Server {server.name} has no process ID, marked as stopped"

        success, message = await self.runtime_engine.stop_server(
            server.name,
            server.process_id,
            force,
            timeout,
        )

        if success:
            await crud.update_server_status(db, server_id, "Stopped", None)
            await crud.add_audit_log(
                db,
                action="stop_server",
                entity_type="server",
                entity_id=str(server_id),
                user=user,
                details={"server_name": server.name, "force": force}
            )
            return True, f"Server {server.name} stopped successfully"

        await crud.add_audit_log(
            db,
            action="stop_server_failed",
            entity_type="server",
            entity_id=str(server_id),
            user=user,
            details={"server_name": server.name, "error": message}
        )
        return False, f"Failed to stop server {server.name}: {message}"

    async def restart_server(
        self,
        db: AsyncSession,
        server_id: int,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None
    ) -> Tuple[bool, str]:
        """
        Restart a server by stopping and then starting it with the same session.

        Args:
            db: Database session
            server_id: ID of the server to restart
            force: Whether to force stop
            timeout: Timeout in seconds for the stop phase
            user: Username for audit logging

        Returns:
            Tuple of (success, message). A failed stop aborts the restart,
            unless the stop message contains "already stopped". On success a
            "restart_server" audit entry is added, in addition to the entries
            written by the stop and start phases.
        """
        stop_success, stop_message = await self.stop_server(db, server_id, force, timeout, user)
        if not stop_success and "already stopped" not in stop_message:
            return False, f"Failed to stop server during restart: {stop_message}"

        start_success, start_message = await self.start_server(db, server_id, user)
        if not start_success:
            return False, f"Failed to start server during restart: {start_message}"

        server = await crud.get_server_by_id(db, server_id)
        await crud.add_audit_log(
            db,
            action="restart_server",
            entity_type="server",
            entity_id=str(server_id),
            user=user,
            details={"server_name": server.name if server else "Unknown"}
        )
        return True, "Server restarted successfully"

    # ------------------------------------------------------------------
    # Section (group) operations
    # ------------------------------------------------------------------

    async def start_section(
        self,
        db: AsyncSession,
        section_id: int,
        concurrency: int = 5,
        user: Optional[str] = None
    ) -> str:
        """
        Start all servers in a section.

        This is an asynchronous operation that returns a task ID.

        Args:
            db: Database session (used only to create the task record)
            section_id: ID of the section
            concurrency: Maximum number of concurrent server operations
            user: Username for audit logging

        Returns:
            Task ID for tracking the operation
        """
        return await self._schedule_section_operation(
            db,
            "start_section",
            {"section_id": section_id, "concurrency": concurrency},
            section_id,
            "start",
            concurrency,
            user,
        )

    async def stop_section(
        self,
        db: AsyncSession,
        section_id: int,
        concurrency: int = 5,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None
    ) -> str:
        """
        Stop all servers in a section.

        This is an asynchronous operation that returns a task ID.

        Args:
            db: Database session (used only to create the task record)
            section_id: ID of the section
            concurrency: Maximum number of concurrent server operations
            force: Whether to force stop
            timeout: Timeout in seconds
            user: Username for audit logging

        Returns:
            Task ID for tracking the operation
        """
        return await self._schedule_section_operation(
            db,
            "stop_section",
            {
                "section_id": section_id,
                "concurrency": concurrency,
                "force": force,
                "timeout": timeout
            },
            section_id,
            "stop",
            concurrency,
            user,
            force=force,
            timeout=timeout,
        )

    async def restart_section(
        self,
        db: AsyncSession,
        section_id: int,
        concurrency: int = 5,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None
    ) -> str:
        """
        Restart all servers in a section.

        This is an asynchronous operation that returns a task ID.

        Args:
            db: Database session (used only to create the task record)
            section_id: ID of the section
            concurrency: Maximum number of concurrent server operations
            force: Whether to force stop
            timeout: Timeout in seconds
            user: Username for audit logging

        Returns:
            Task ID for tracking the operation
        """
        return await self._schedule_section_operation(
            db,
            "restart_section",
            {
                "section_id": section_id,
                "concurrency": concurrency,
                "force": force,
                "timeout": timeout
            },
            section_id,
            "restart",
            concurrency,
            user,
            force=force,
            timeout=timeout,
        )

    async def _schedule_section_operation(
        self,
        db: AsyncSession,
        task_type: str,
        task_params: Dict[str, Any],
        section_id: int,
        operation: str,
        concurrency: int,
        user: Optional[str],
        force: bool = False,
        timeout: int = 30,
    ) -> str:
        """Create a task record and run the section operation in the background."""
        task = await crud.create_task(db, task_type, task_params)
        self._spawn_background(
            self._execute_section_operation(
                task.task_id,
                section_id,
                operation,
                concurrency,
                force=force,
                timeout=timeout,
                user=user
            )
        )
        return task.task_id

    def _spawn_background(self, coro: Any) -> "asyncio.Task":
        """Schedule *coro* as a task and keep a strong reference until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_task_done)
        return task

    def _on_background_task_done(self, task: "asyncio.Task") -> None:
        """Drop the reference to a finished background task and log any escaped error."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            # Without this, the error would only surface as an unretrieved
            # task exception, if at all.
            logger.error("Background section operation failed", exc_info=exc)

    async def _execute_section_operation(
        self,
        task_id: str,
        section_id: int,
        operation: str,
        concurrency: int = 5,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None
    ):
        """
        Execute an operation on all servers in a section.

        This is a background task that updates the task status as it
        progresses: "Running", then "Completed", "Completed with errors" or
        "Failed".

        Args:
            task_id: ID of the task
            section_id: ID of the section
            operation: Operation to perform ("start", "stop", or "restart")
            concurrency: Maximum number of concurrent server operations;
                values below 1 are treated as 1
            force: Whether to force stop (for stop and restart operations)
            timeout: Timeout in seconds (for stop and restart operations)
            user: Username for audit logging
        """
        # Imported lazily (presumably to avoid an import cycle). The background
        # task cannot reuse the request's session, so it opens its own.
        from src.database.database import get_db

        # Semaphore(0) would block every server operation forever, and a
        # negative value raises ValueError before the task is even marked
        # as running, so always allow at least one operation at a time.
        semaphore = asyncio.Semaphore(max(1, concurrency))

        async with get_db() as db:
            try:
                await crud.update_task_status(db, task_id, "Running")

                section = await crud.get_section_by_id(db, section_id)
                if not section:
                    await crud.update_task_status(
                        db,
                        task_id,
                        "Failed",
                        error=f"Section with ID {section_id} not found"
                    )
                    return

                servers = await crud.get_servers(db, section_id)
                if not servers:
                    await crud.update_task_status(
                        db,
                        task_id,
                        "Completed",
                        result={"message": f"No servers found in section {section.name}"}
                    )
                    return

                await crud.add_audit_log(
                    db,
                    action=f"{operation}_section",
                    entity_type="section",
                    entity_id=str(section_id),
                    user=user,
                    details={
                        "section_name": section.name,
                        "server_count": len(servers),
                        "concurrency": concurrency
                    }
                )

                # Each server operation gets its own session: an AsyncSession
                # must not be used by several concurrently running tasks.
                results = await asyncio.gather(
                    *(
                        self._execute_server_operation_in_own_session(
                            get_db,
                            semaphore,
                            server.id,
                            operation,
                            force,
                            timeout,
                            user
                        )
                        for server in servers
                    ),
                    return_exceptions=True
                )

                success_count = 0
                failures = []
                for server, result in zip(servers, results):
                    if isinstance(result, BaseException):
                        # BaseException also covers CancelledError, which
                        # gather() returns for cancelled operations.
                        error = str(result) or type(result).__name__
                    elif isinstance(result, tuple) and len(result) == 2:
                        ok, message = result
                        if ok:
                            success_count += 1
                            continue
                        error = message
                    else:
                        # Unexpected result shape: counted as neither success
                        # nor failure.
                        continue
                    failures.append({
                        "server_id": server.id,
                        "server_name": server.name,
                        "error": error
                    })
                failure_count = len(failures)

                status = "Completed" if failure_count == 0 else "Completed with errors"
                await crud.update_task_status(
                    db,
                    task_id,
                    status,
                    result={
                        "section_id": section_id,
                        "section_name": section.name,
                        "operation": operation,
                        "total_servers": len(servers),
                        "success_count": success_count,
                        "failure_count": failure_count,
                        "failures": failures
                    }
                )
            except Exception as e:
                logger.exception("Error executing section operation: %s", e)
                try:
                    await crud.update_task_status(
                        db,
                        task_id,
                        "Failed",
                        error=f"Error executing section operation: {str(e)}"
                    )
                except Exception:
                    # Recording the failure itself failed (e.g. the session is
                    # unusable); nothing else to do from a background task.
                    logger.exception("Could not mark task %s as failed", task_id)

    async def _execute_server_operation_in_own_session(
        self,
        session_factory: Any,
        semaphore: asyncio.Semaphore,
        server_id: int,
        operation: str,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None
    ) -> Tuple[bool, str]:
        """
        Run one server operation under *semaphore* in a freshly opened session.

        The session is opened only after the semaphore is acquired, so at most
        ``concurrency`` sessions are open for a section operation at a time.

        Args:
            session_factory: Callable returning an async context manager that
                yields a database session (``get_db``)
            semaphore: Semaphore for concurrency control
            server_id: ID of the server
            operation: Operation to perform ("start", "stop", or "restart")
            force: Whether to force stop (for stop and restart operations)
            timeout: Timeout in seconds (for stop and restart operations)
            user: Username for audit logging

        Returns:
            Tuple of (success, message)
        """
        async with semaphore:
            async with session_factory() as server_db:
                return await self._dispatch_server_operation(
                    server_db, server_id, operation, force, timeout, user
                )

    async def _execute_server_operation_with_semaphore(
        self,
        db: AsyncSession,
        semaphore: asyncio.Semaphore,
        server_id: int,
        operation: str,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None
    ) -> Tuple[bool, str]:
        """
        Execute an operation on a server with concurrency control.

        The caller's *db* session is used as-is, so callers running several of
        these concurrently must give each one its own session.

        Args:
            db: Database session
            semaphore: Semaphore for concurrency control
            server_id: ID of the server
            operation: Operation to perform ("start", "stop", or "restart")
            force: Whether to force stop (for stop and restart operations)
            timeout: Timeout in seconds (for stop and restart operations)
            user: Username for audit logging

        Returns:
            Tuple of (success, message)
        """
        async with semaphore:
            return await self._dispatch_server_operation(
                db, server_id, operation, force, timeout, user
            )

    async def _dispatch_server_operation(
        self,
        db: AsyncSession,
        server_id: int,
        operation: str,
        force: bool,
        timeout: int,
        user: Optional[str]
    ) -> Tuple[bool, str]:
        """Route *operation* to the matching single-server method."""
        if operation == "start":
            return await self.start_server(db, server_id, user)
        if operation == "stop":
            return await self.stop_server(db, server_id, force, timeout, user)
        if operation == "restart":
            return await self.restart_server(db, server_id, force, timeout, user)
        return False, f"Unknown operation: {operation}"