Skip to main content
Glama
test_orchestration.py (7.53 kB)
"""
Test script for the Sectional MCP Panel orchestration engine.

Run directly (``python test_orchestration.py``). The script creates a panel
configuration, one section and up to two servers through ``crud``, drives the
orchestration engine's start/stop calls when a Docker client is present, and
deletes the section and servers it created before exiting.
"""

import asyncio
import sys
import os
import json
from pathlib import Path

# Add the project root to the Python path
project_root = Path(__file__).parent.parent
sys.path.append(str(project_root))

# These imports must come after the sys.path tweak above so that the ``src``
# package resolves when the script is launched from its own directory.
from src.database.database import init_db, get_db
from src.database import crud
from src.config.config_manager import ConfigManager
from src.runtime.runtime_engine import RuntimeEngine
from src.orchestration.orchestration_engine import OrchestrationEngine

# Seconds to pause after submitting a section task before reading its status.
TASK_WAIT_SECONDS = 5


def _panel_config():
    """Return the panel configuration payload used by this test."""
    return {
        "name": "Orchestration Test Panel",
        "version": "0.1.0",
        "config_schema_version": "1.0.0",
        "global_settings": {
            "settings": {
                "environmentVars": {
                    "GLOBAL_ENV": "orchestration_test",
                },
                "resourceLimits": {
                    "cpuRequest": 0.1,
                    "cpuLimit": 0.5,
                    "memoryRequestMB": 64,
                    "memoryLimitMB": 128,
                },
            },
        },
    }


def _nginx_server_data(name, section_id, description, container_port, settings=None):
    """Build a ``docker_image`` server payload that runs ``nginx:latest``.

    Args:
        name: Value for the payload's ``name`` key.
        section_id: ID of the section the server belongs to.
        description: Value for the payload's ``description`` key.
        container_port: TCP ``containerPort`` listed in the runtime definition.
        settings: Optional ``settings`` dict; the key is omitted when ``None``.

    Returns:
        A dict suitable for ``crud.create_server``.
    """
    payload = {
        "name": name,
        "section_id": section_id,
        "description": description,
        "runtime_definition": {
            "type": "docker_image",
            "command": "nginx:latest",
            "args": [],
            "ports": [
                {
                    "containerPort": container_port,
                    "protocol": "TCP",
                },
            ],
        },
    }
    if settings is not None:
        payload["settings"] = settings
    return payload


async def _report_section_task(db, task_id, server_ids):
    """Wait for a section task, then print its status and each server's status.

    Args:
        db: Database session obtained from ``get_db()``.
        task_id: ID returned by ``start_section`` / ``stop_section``.
        server_ids: Server IDs to re-read, in the order they should be reported.
    """
    # Wait a moment for the task to process
    print("Waiting for task to process...")
    await asyncio.sleep(TASK_WAIT_SECONDS)

    # Check task status
    task = await crud.get_task(db, task_id)
    print(f"Task status: {task.status}")
    if task.result:
        print(f"Task result: {task.result}")

    # Re-read every server first, then print, so the query order is stable.
    refreshed = [await crud.get_server_by_id(db, server_id) for server_id in server_ids]
    for position, current in enumerate(refreshed, start=1):
        print(f"Server {position} status: {current.status}")


async def test_orchestration_engine():
    """Test the orchestration engine functionality.

    Creates test data, exercises single-server and section-level start/stop
    when ``runtime_engine.docker_client`` is truthy, and always attempts to
    delete the created servers and section afterwards, even if a step raises.
    """
    print("Testing orchestration engine...")

    # Initialize the database
    await init_db()
    print("Database initialized")

    # Initialize components
    config_manager = ConfigManager()
    runtime_engine = RuntimeEngine()
    orchestration_engine = OrchestrationEngine(config_manager, runtime_engine)
    print("Orchestration engine initialized")

    async with get_db() as db:
        # Track what was actually created so cleanup deletes exactly that.
        section_id = None
        created_server_ids = []

        try:
            # Create panel config
            panel = await crud.update_panel_config(db, _panel_config())
            print(f"Created panel configuration: {panel.name}")

            # Create section
            section_data = {
                "name": "Orchestration Test Section",
                "description": "A test section for orchestration",
                "settings": {
                    "environmentVars": {
                        "SECTION_ENV": "orchestration_test",
                    },
                },
            }
            section = await crud.create_section(db, section_data)
            section_id = section.id
            print(f"Created section: {section.name} (ID: {section.id})")

            # Create server
            server_data = _nginx_server_data(
                "Orchestration Test Server",
                section.id,
                "A test server for orchestration",
                80,
                settings={
                    "environmentVars": {
                        "SERVER_ENV": "orchestration_test",
                    },
                },
            )
            server = await crud.create_server(db, server_data)
            created_server_ids.append(server.id)
            print(f"Created server: {server.name} (ID: {server.id})")

            if runtime_engine.docker_client:
                # Test server start
                print("\nTesting server start:")
                success, message = await orchestration_engine.start_server(
                    db, server.id, "test_user"
                )
                print(f"Start result: {success}, Message: {message}")

                # Get updated server status
                updated_server = await crud.get_server_by_id(db, server.id)
                print(f"Server status after start: {updated_server.status}")

                # Test server stop (only meaningful if the start took effect)
                if updated_server.status == "Running":
                    print("\nTesting server stop:")
                    # NOTE(review): positional False / 10 are presumably a
                    # force flag and a timeout - confirm against stop_server.
                    success, message = await orchestration_engine.stop_server(
                        db, server.id, False, 10, "test_user"
                    )
                    print(f"Stop result: {success}, Message: {message}")

                    # Get updated server status
                    updated_server = await crud.get_server_by_id(db, server.id)
                    print(f"Server status after stop: {updated_server.status}")

                # Test section operations
                print("\nTesting section operations:")
                print("Creating a second test server in the section...")
                server_data2 = _nginx_server_data(
                    "Orchestration Test Server 2",
                    section.id,
                    "A second test server for orchestration",
                    8080,
                )
                server2 = await crud.create_server(db, server_data2)
                created_server_ids.append(server2.id)
                print(f"Created second server: {server2.name} (ID: {server2.id})")

                section_server_ids = [server.id, server2.id]

                # Test section start
                print("\nTesting section start:")
                task_id = await orchestration_engine.start_section(
                    db, section.id, 2, "test_user"
                )
                print(f"Section start task ID: {task_id}")
                await _report_section_task(db, task_id, section_server_ids)

                # Test section stop
                print("\nTesting section stop:")
                task_id = await orchestration_engine.stop_section(
                    db, section.id, 2, False, 10, "test_user"
                )
                print(f"Section stop task ID: {task_id}")
                await _report_section_task(db, task_id, section_server_ids)
            else:
                print("Docker is not available. Skipping orchestration tests that require Docker.")
        finally:
            # Clean up in a finally block so a failing step does not leave
            # test rows behind. Servers go first, then their section.
            print("\nCleaning up test data...")
            for server_id in created_server_ids:
                await crud.delete_server(db, server_id)
            if section_id is not None:
                await crud.delete_section(db, section_id)
            print("Test data cleaned up")

    print("\nOrchestration engine test completed")


if __name__ == "__main__":
    asyncio.run(test_orchestration_engine())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rblake2320/sectional-mcp-panel'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.