# test_orchestration.py (7.53 kB)
"""
Test script for the Sectional MCP Panel orchestration engine.
"""
import asyncio
import sys
import os
import json
from pathlib import Path
# Add the project root (the parent of the directory containing this file) to
# the Python path. Presumably this is what lets the `src.` imports that follow
# resolve when the script is run directly -- confirm against the repo layout.
project_root = Path(__file__).parent.parent
sys.path.append(str(project_root))
from src.database.database import init_db, get_db
from src.database import crud
from src.config.config_manager import ConfigManager
from src.runtime.runtime_engine import RuntimeEngine
from src.orchestration.orchestration_engine import OrchestrationEngine
# Seconds to wait after submitting a section-level task before inspecting it.
_TASK_SETTLE_SECONDS = 5


async def _report_section_task(db, task_id, server_ids) -> None:
    """Wait for a section task, then print its status and the server statuses.

    Args:
        db: Database session handed out by ``get_db()``.
        task_id: Identifier returned by ``start_section`` / ``stop_section``.
        server_ids: IDs of the servers to report on, in display order; they
            are printed as "Server 1", "Server 2", ...
    """
    # Wait a moment for the task to process
    print("Waiting for task to process...")
    await asyncio.sleep(_TASK_SETTLE_SECONDS)

    # Check task status
    task = await crud.get_task(db, task_id)
    print(f"Task status: {task.status}")
    if task.result:
        print(f"Task result: {task.result}")

    # Fetch every server before printing so the output lines stay together.
    servers = [await crud.get_server_by_id(db, sid) for sid in server_ids]
    for position, current in enumerate(servers, start=1):
        print(f"Server {position} status: {current.status}")


async def test_orchestration_engine() -> None:
    """Exercise the orchestration engine end to end and print the outcome.

    The routine initializes the database, builds an ``OrchestrationEngine``,
    writes a panel configuration, and creates one section with one server.
    When ``runtime_engine.docker_client`` is truthy it starts/stops that
    server, adds a second server, and starts/stops the whole section;
    otherwise those steps are skipped with a message.

    Results are reported with ``print`` only; nothing is asserted. The created
    servers and section are deleted in a ``finally`` block, so they are removed
    even when one of the steps raises. The panel configuration written at the
    start is not reverted.
    """
    print("Testing orchestration engine...")

    # Initialize the database
    await init_db()
    print("Database initialized")

    # Initialize components
    config_manager = ConfigManager()
    runtime_engine = RuntimeEngine()
    orchestration_engine = OrchestrationEngine(config_manager, runtime_engine)
    print("Orchestration engine initialized")

    # Create test data
    async with get_db() as db:
        # Create panel config
        panel_config = {
            "name": "Orchestration Test Panel",
            "version": "0.1.0",
            "config_schema_version": "1.0.0",
            "global_settings": {
                "settings": {
                    "environmentVars": {
                        "GLOBAL_ENV": "orchestration_test"
                    },
                    "resourceLimits": {
                        "cpuRequest": 0.1,
                        "cpuLimit": 0.5,
                        "memoryRequestMB": 64,
                        "memoryLimitMB": 128
                    }
                }
            }
        }
        panel = await crud.update_panel_config(db, panel_config)
        print(f"Created panel configuration: {panel.name}")

        # Handles for everything that must be deleted afterwards. They start
        # as None so the cleanup can tell what was actually created.
        section = None
        server = None
        server2 = None

        try:
            # Create section
            section_data = {
                "name": "Orchestration Test Section",
                "description": "A test section for orchestration",
                "settings": {
                    "environmentVars": {
                        "SECTION_ENV": "orchestration_test"
                    }
                }
            }
            section = await crud.create_section(db, section_data)
            print(f"Created section: {section.name} (ID: {section.id})")

            # Create server
            server_data = {
                "name": "Orchestration Test Server",
                "section_id": section.id,
                "description": "A test server for orchestration",
                "runtime_definition": {
                    "type": "docker_image",
                    "command": "nginx:latest",
                    "args": [],
                    "ports": [
                        {
                            "containerPort": 80,
                            "protocol": "TCP"
                        }
                    ]
                },
                "settings": {
                    "environmentVars": {
                        "SERVER_ENV": "orchestration_test"
                    }
                }
            }
            server = await crud.create_server(db, server_data)
            print(f"Created server: {server.name} (ID: {server.id})")

            if runtime_engine.docker_client:
                # Test server start
                print("\nTesting server start:")
                success, message = await orchestration_engine.start_server(
                    db, server.id, "test_user"
                )
                print(f"Start result: {success}, Message: {message}")

                # Get updated server status
                updated_server = await crud.get_server_by_id(db, server.id)
                print(f"Server status after start: {updated_server.status}")

                # Test server stop (only meaningful if the start succeeded)
                if updated_server.status == "Running":
                    print("\nTesting server stop:")
                    # NOTE(review): the positional False / 10 are presumably
                    # a force flag and a timeout -- confirm against
                    # OrchestrationEngine.stop_server.
                    success, message = await orchestration_engine.stop_server(
                        db, server.id, False, 10, "test_user"
                    )
                    print(f"Stop result: {success}, Message: {message}")

                    # Get updated server status
                    updated_server = await crud.get_server_by_id(db, server.id)
                    print(f"Server status after stop: {updated_server.status}")

                # Test section operations
                print("\nTesting section operations:")
                print("Creating a second test server in the section...")
                server_data2 = {
                    "name": "Orchestration Test Server 2",
                    "section_id": section.id,
                    "description": "A second test server for orchestration",
                    "runtime_definition": {
                        "type": "docker_image",
                        "command": "nginx:latest",
                        "args": [],
                        "ports": [
                            {
                                "containerPort": 8080,
                                "protocol": "TCP"
                            }
                        ]
                    }
                }
                server2 = await crud.create_server(db, server_data2)
                print(f"Created second server: {server2.name} (ID: {server2.id})")

                # Test section start
                print("\nTesting section start:")
                # NOTE(review): the positional 2 is presumably a concurrency
                # limit -- confirm against OrchestrationEngine.start_section.
                task_id = await orchestration_engine.start_section(
                    db, section.id, 2, "test_user"
                )
                print(f"Section start task ID: {task_id}")
                await _report_section_task(db, task_id, (server.id, server2.id))

                # Test section stop
                print("\nTesting section stop:")
                task_id = await orchestration_engine.stop_section(
                    db, section.id, 2, False, 10, "test_user"
                )
                print(f"Section stop task ID: {task_id}")
                await _report_section_task(db, task_id, (server.id, server2.id))
            else:
                print("Docker is not available. Skipping orchestration tests that require Docker.")
        finally:
            # Clean up whatever was created, even if a step above raised.
            # Servers are removed before the section that contains them.
            # NOTE(review): if the failure left the session unusable, these
            # calls may raise too; the original error then remains visible as
            # the exception context.
            print("\nCleaning up test data...")
            if server is not None:
                await crud.delete_server(db, server.id)
            if server2 is not None:
                await crud.delete_server(db, server2.id)
            if section is not None:
                await crud.delete_section(db, section.id)
            print("Test data cleaned up")

    print("\nOrchestration engine test completed")
if __name__ == "__main__":
    # Script entry point: create the coroutine for the smoke test, then hand
    # it to asyncio to run to completion.
    smoke_test = test_orchestration_engine()
    asyncio.run(smoke_test)