Skip to main content
Glama
orchestration_engine.py (18.5 kB)
"""
Orchestration engine for the Sectional MCP Panel.

Handles server lifecycle management and group operations.
"""

import asyncio
import logging
import uuid
from typing import Dict, List, Any, Optional, Tuple

from sqlalchemy.ext.asyncio import AsyncSession

from src.database import crud
from src.config.config_manager import ConfigManager
from src.runtime.runtime_engine import RuntimeEngine

logger = logging.getLogger("mcp_panel.orchestration")


class OrchestrationEngine:
    """
    Orchestration engine for managing server lifecycle and group operations.

    Coordinates between configuration, database, and runtime components.
    """

    def __init__(self, config_manager: ConfigManager, runtime_engine: RuntimeEngine):
        """Initialize the orchestration engine.

        Args:
            config_manager: Used to resolve a server's effective settings
                from the global, section and server settings layers.
            runtime_engine: Used to start and stop server processes.
        """
        self.config_manager = config_manager
        self.runtime_engine = runtime_engine
        self.task_semaphore = asyncio.Semaphore(10)  # Limit concurrent tasks
        # Strong references to fire-and-forget section tasks. The event loop
        # only keeps weak references to tasks, so an unreferenced background
        # task can be garbage-collected before it finishes.
        self._background_tasks: set = set()

    def _spawn_background(self, coro) -> "asyncio.Task":
        """Schedule *coro* as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def start_server(
        self, db: AsyncSession, server_id: int, user: Optional[str] = None
    ) -> Tuple[bool, str]:
        """
        Start a server.

        Args:
            db: Database session
            server_id: ID of the server to start
            user: Username for audit logging

        Returns:
            Tuple of (success, message)
        """
        # Get server details
        server = await crud.get_server_by_id(db, server_id)
        if not server:
            return False, f"Server with ID {server_id} not found"
        # Snapshot the name up front: later crud calls may commit the session
        # (not visible here), after which ORM attributes could be expired.
        server_name = server.name

        # Check if server is already running
        if server.status == "Running":
            return True, f"Server {server_name} is already running"

        # Get section and global settings
        section = await crud.get_section_by_id(db, server.section_id)
        if not section:
            return False, f"Section for server {server_name} not found"

        panel = await crud.get_panel_config(db)
        if not panel:
            return False, "Panel configuration not found"

        # Resolve effective settings (global -> section -> server)
        global_settings = (panel.global_settings or {}).get("settings", {})
        effective_settings = self.config_manager.resolve_server_settings(
            global_settings,
            section.settings,
            server.settings,
        )

        # Start the server
        success, message, process_id = await self.runtime_engine.start_server(
            server_name,
            server.runtime_definition,
            effective_settings,
        )

        # Update server status in database and record an audit entry
        if success:
            await crud.update_server_status(db, server_id, "Running", process_id)
            await crud.add_audit_log(
                db,
                action="start_server",
                entity_type="server",
                entity_id=str(server_id),
                user=user,
                details={"server_name": server_name, "process_id": process_id},
            )
            return True, f"Server {server_name} started successfully"

        await crud.update_server_status(db, server_id, "Error")
        await crud.add_audit_log(
            db,
            action="start_server_failed",
            entity_type="server",
            entity_id=str(server_id),
            user=user,
            details={"server_name": server_name, "error": message},
        )
        return False, f"Failed to start server {server_name}: {message}"

    async def stop_server(
        self,
        db: AsyncSession,
        server_id: int,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None,
    ) -> Tuple[bool, str]:
        """
        Stop a server.

        Args:
            db: Database session
            server_id: ID of the server to stop
            force: Whether to force stop
            timeout: Timeout in seconds
            user: Username for audit logging

        Returns:
            Tuple of (success, message)
        """
        # Get server details
        server = await crud.get_server_by_id(db, server_id)
        if not server:
            return False, f"Server with ID {server_id} not found"
        # Snapshot before any crud call that may commit and expire attributes.
        server_name = server.name

        # Check if server is already stopped
        if server.status == "Stopped":
            return True, f"Server {server_name} is already stopped"

        # Without a process ID there is nothing to signal; just fix the status.
        if not server.process_id:
            await crud.update_server_status(db, server_id, "Stopped")
            return True, f"Server {server_name} has no process ID, marked as stopped"

        # Stop the server
        success, message = await self.runtime_engine.stop_server(
            server_name, server.process_id, force, timeout
        )

        # Update server status in database and record an audit entry
        if success:
            await crud.update_server_status(db, server_id, "Stopped", None)
            await crud.add_audit_log(
                db,
                action="stop_server",
                entity_type="server",
                entity_id=str(server_id),
                user=user,
                details={"server_name": server_name, "force": force},
            )
            return True, f"Server {server_name} stopped successfully"

        # On failure the stored status is left unchanged; only the attempt is audited.
        await crud.add_audit_log(
            db,
            action="stop_server_failed",
            entity_type="server",
            entity_id=str(server_id),
            user=user,
            details={"server_name": server_name, "error": message},
        )
        return False, f"Failed to stop server {server_name}: {message}"

    async def restart_server(
        self,
        db: AsyncSession,
        server_id: int,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None,
    ) -> Tuple[bool, str]:
        """
        Restart a server (stop followed by start).

        Args:
            db: Database session
            server_id: ID of the server to restart
            force: Whether to force stop
            timeout: Timeout in seconds
            user: Username for audit logging

        Returns:
            Tuple of (success, message)
        """
        # Stop the server
        stop_success, stop_message = await self.stop_server(db, server_id, force, timeout, user)
        if not stop_success and "already stopped" not in stop_message:
            return False, f"Failed to stop server during restart: {stop_message}"

        # Start the server
        start_success, start_message = await self.start_server(db, server_id, user)
        if not start_success:
            return False, f"Failed to start server during restart: {start_message}"

        # Add audit log (re-fetch the server for its name)
        server = await crud.get_server_by_id(db, server_id)
        await crud.add_audit_log(
            db,
            action="restart_server",
            entity_type="server",
            entity_id=str(server_id),
            user=user,
            details={"server_name": server.name if server else "Unknown"},
        )

        return True, "Server restarted successfully"

    async def start_section(
        self,
        db: AsyncSession,
        section_id: int,
        concurrency: int = 5,
        user: Optional[str] = None,
    ) -> str:
        """
        Start all servers in a section.

        This is an asynchronous operation that returns a task ID.

        Args:
            db: Database session
            section_id: ID of the section
            concurrency: Maximum number of concurrent server operations
            user: Username for audit logging

        Returns:
            Task ID for tracking the operation
        """
        # Create a task record
        task = await crud.create_task(
            db,
            "start_section",
            {"section_id": section_id, "concurrency": concurrency},
        )

        # Start the background task (kept referenced until it completes)
        self._spawn_background(
            self._execute_section_operation(
                task.task_id, section_id, "start", concurrency, user=user
            )
        )

        return task.task_id

    async def stop_section(
        self,
        db: AsyncSession,
        section_id: int,
        concurrency: int = 5,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None,
    ) -> str:
        """
        Stop all servers in a section.

        This is an asynchronous operation that returns a task ID.

        Args:
            db: Database session
            section_id: ID of the section
            concurrency: Maximum number of concurrent server operations
            force: Whether to force stop
            timeout: Timeout in seconds
            user: Username for audit logging

        Returns:
            Task ID for tracking the operation
        """
        # Create a task record
        task = await crud.create_task(
            db,
            "stop_section",
            {
                "section_id": section_id,
                "concurrency": concurrency,
                "force": force,
                "timeout": timeout,
            },
        )

        # Start the background task (kept referenced until it completes)
        self._spawn_background(
            self._execute_section_operation(
                task.task_id, section_id, "stop", concurrency,
                force=force, timeout=timeout, user=user,
            )
        )

        return task.task_id

    async def restart_section(
        self,
        db: AsyncSession,
        section_id: int,
        concurrency: int = 5,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None,
    ) -> str:
        """
        Restart all servers in a section.

        This is an asynchronous operation that returns a task ID.

        Args:
            db: Database session
            section_id: ID of the section
            concurrency: Maximum number of concurrent server operations
            force: Whether to force stop
            timeout: Timeout in seconds
            user: Username for audit logging

        Returns:
            Task ID for tracking the operation
        """
        # Create a task record
        task = await crud.create_task(
            db,
            "restart_section",
            {
                "section_id": section_id,
                "concurrency": concurrency,
                "force": force,
                "timeout": timeout,
            },
        )

        # Start the background task (kept referenced until it completes)
        self._spawn_background(
            self._execute_section_operation(
                task.task_id, section_id, "restart", concurrency,
                force=force, timeout=timeout, user=user,
            )
        )

        return task.task_id

    async def _execute_section_operation(
        self,
        task_id: str,
        section_id: int,
        operation: str,
        concurrency: int = 5,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None,
    ):
        """
        Execute an operation on all servers in a section.

        This is a background task that updates the task status as it progresses.
        Each server operation runs in its own database session, because a single
        AsyncSession must not be used by several concurrently running tasks.

        Args:
            task_id: ID of the task
            section_id: ID of the section
            operation: Operation to perform ("start", "stop", or "restart")
            concurrency: Maximum number of concurrent server operations
            force: Whether to force stop (for stop and restart operations)
            timeout: Timeout in seconds (for stop and restart operations)
            user: Username for audit logging
        """
        # Create new database sessions for this background task (local import
        # kept from the original; presumably avoids an import cycle — unverified).
        from src.database.database import get_db

        # Set up concurrency control. Semaphore(0) would block forever and a
        # negative value raises, so clamp to at least one slot.
        semaphore = asyncio.Semaphore(max(1, concurrency))

        async with get_db() as db:
            try:
                # Update task status to running
                await crud.update_task_status(db, task_id, "Running")

                # Get section details
                section = await crud.get_section_by_id(db, section_id)
                if not section:
                    await crud.update_task_status(
                        db, task_id, "Failed",
                        error=f"Section with ID {section_id} not found",
                    )
                    return
                section_name = section.name

                # Get all servers in the section
                servers = await crud.get_servers(db, section_id)
                if not servers:
                    await crud.update_task_status(
                        db, task_id, "Completed",
                        result={"message": f"No servers found in section {section_name}"},
                    )
                    return
                # Plain (id, name) snapshots so later commits cannot expire them.
                server_refs = [(server.id, server.name) for server in servers]

                # Add audit log
                await crud.add_audit_log(
                    db,
                    action=f"{operation}_section",
                    entity_type="section",
                    entity_id=str(section_id),
                    user=user,
                    details={
                        "section_name": section_name,
                        "server_count": len(server_refs),
                        "concurrency": concurrency,
                    },
                )

                # Execute the operation on each server with concurrency control
                results = await asyncio.gather(
                    *(
                        self._execute_server_operation_in_own_session(
                            get_db, semaphore, server_id, operation, force, timeout, user
                        )
                        for server_id, _ in server_refs
                    ),
                    return_exceptions=True,
                )

                # Process results (gather preserves input order)
                success_count = 0
                failures: List[Dict[str, Any]] = []
                for (server_id, server_name), result in zip(server_refs, results):
                    # BaseException, not Exception: a cancelled server operation
                    # comes back as CancelledError and must count as a failure.
                    if isinstance(result, BaseException):
                        error = str(result)
                    elif isinstance(result, tuple) and len(result) == 2:
                        success, message = result
                        if success:
                            success_count += 1
                            continue
                        error = message
                    else:
                        error = f"Unexpected result: {result!r}"
                    failures.append({
                        "server_id": server_id,
                        "server_name": server_name,
                        "error": error,
                    })
                failure_count = len(failures)

                # Update task status to completed
                status = "Completed" if failure_count == 0 else "Completed with errors"
                await crud.update_task_status(
                    db, task_id, status,
                    result={
                        "section_id": section_id,
                        "section_name": section_name,
                        "operation": operation,
                        "total_servers": len(server_refs),
                        "success_count": success_count,
                        "failure_count": failure_count,
                        "failures": failures,
                    },
                )

            except Exception as e:
                logger.exception("Error executing section operation: %s", e)
                # This runs as a detached task: if recording the failure itself
                # fails, log it rather than leave an unretrieved task exception.
                try:
                    await crud.update_task_status(
                        db, task_id, "Failed",
                        error=f"Error executing section operation: {str(e)}",
                    )
                except Exception:
                    logger.exception("Could not record failure status for task %s", task_id)

    async def _dispatch_server_operation(
        self,
        db: AsyncSession,
        server_id: int,
        operation: str,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None,
    ) -> Tuple[bool, str]:
        """Run one named lifecycle operation ("start"/"stop"/"restart") on a server."""
        if operation == "start":
            return await self.start_server(db, server_id, user)
        if operation == "stop":
            return await self.stop_server(db, server_id, force, timeout, user)
        if operation == "restart":
            return await self.restart_server(db, server_id, force, timeout, user)
        return False, f"Unknown operation: {operation}"

    async def _execute_server_operation_in_own_session(
        self,
        session_factory,
        semaphore: asyncio.Semaphore,
        server_id: int,
        operation: str,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None,
    ) -> Tuple[bool, str]:
        """
        Execute an operation on a server with concurrency control and a dedicated session.

        Args:
            session_factory: Zero-argument callable returning an async context
                manager that yields a database session (e.g. ``get_db``)
            semaphore: Semaphore for concurrency control
            server_id: ID of the server
            operation: Operation to perform ("start", "stop", or "restart")
            force: Whether to force stop (for stop and restart operations)
            timeout: Timeout in seconds (for stop and restart operations)
            user: Username for audit logging

        Returns:
            Tuple of (success, message)
        """
        # The session is opened inside the semaphore, so at most `concurrency`
        # sessions are open at once.
        async with semaphore:
            async with session_factory() as db:
                return await self._dispatch_server_operation(
                    db, server_id, operation, force, timeout, user
                )

    async def _execute_server_operation_with_semaphore(
        self,
        db: AsyncSession,
        semaphore: asyncio.Semaphore,
        server_id: int,
        operation: str,
        force: bool = False,
        timeout: int = 30,
        user: Optional[str] = None,
    ) -> Tuple[bool, str]:
        """
        Execute an operation on a server with concurrency control.

        Uses the caller's session; do not share that session between
        concurrently running calls (AsyncSession is not concurrency-safe).

        Args:
            db: Database session
            semaphore: Semaphore for concurrency control
            server_id: ID of the server
            operation: Operation to perform ("start", "stop", or "restart")
            force: Whether to force stop (for stop and restart operations)
            timeout: Timeout in seconds (for stop and restart operations)
            user: Username for audit logging

        Returns:
            Tuple of (success, message)
        """
        async with semaphore:
            return await self._dispatch_server_operation(
                db, server_id, operation, force, timeout, user
            )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rblake2320/sectional-mcp-panel'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.