#!/usr/bin/env python3
"""
Comprehensive System Test
Tests all aspects of the Agent Orchestration Platform to ensure
everything is working correctly and all imports are resolved.
"""
import sys
from pathlib import Path
# Add the project root to Python path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
def test_basic_imports():
"""Test that all core types can be imported."""
print("π Testing basic imports...")
try:
# Test core ID types
from src.models import AgentId, SessionId, StateId, BackupId
from src.models.ids import create_session_id, validate_agent_name
print(" β
Core ID types imported")
# Test agent types including the newly added AgentCreationRequest
from src.models import AgentState, AgentStatus, AgentCreationRequest
print(" β
Agent types imported (including AgentCreationRequest)")
# Test session types
from src.models import SessionState, SessionStatus, GitState
print(" β
Session types imported")
# Test communication types
from src.models import MessageContent, MessageRole, OperationStatus
print(" β
Communication types imported")
# Test MCP result types
from src.models import AgentCreationResult, MCPToolResult
print(" β
MCP result types imported")
# Test iTerm2 types
from src.models import TabState, TabStatus, ITermError
print(" β
iTerm2 types imported")
return True
except Exception as e:
print(f" β Import failed: {e}")
return False
def test_type_functionality():
"""Test basic type system functionality."""
print("\nπ§ Testing type functionality...")
try:
from src.models.ids import create_session_id, validate_agent_name
from src.models.agent import AgentCreationRequest, AgentSpecialization
from src.models.session import GitState
from pathlib import Path
# Test session ID creation
session_id = create_session_id()
print(f" β
Created session ID: {session_id}")
# Test agent name validation
agent_id = validate_agent_name("Agent_1")
print(f" β
Validated agent ID: {agent_id}")
# Test AgentCreationRequest
request = AgentCreationRequest(
name="Agent_1",
session_id=session_id,
specialization=AgentSpecialization.GENERAL
)
print(f" β
Created AgentCreationRequest: {request.name}")
# Test GitState
git_state = GitState(
repository_path=Path("/test/repo"),
is_git_repo=True,
current_branch="main",
current_commit="a" * 40
)
print(f" β
Created GitState: {git_state.current_branch}")
return True
except Exception as e:
print(f" β Functionality test failed: {e}")
return False
def test_manager_imports():
"""Test that core managers can be imported."""
print("\nποΈ Testing manager imports...")
managers = [
('src.core.agent_manager', 'AgentManager'),
('src.core.session_manager', 'SessionManager'),
('src.core.coordinator', 'OrchestrationCoordinator')
]
success_count = 0
for module_path, class_name in managers:
try:
module = __import__(module_path, fromlist=[class_name])
manager_class = getattr(module, class_name)
print(f" β
{class_name} imported successfully")
success_count += 1
except Exception as e:
print(f" β οΈ {class_name} import failed: {e}")
return success_count >= len(managers) * 0.75 # 75% success rate
def test_mcp_tools():
"""Test that MCP tools can be imported."""
print("\nπ οΈ Testing MCP tools...")
try:
from src.interfaces.mcp_tools import AgentOrchestrationTools
print(" β
AgentOrchestrationTools imported successfully")
# Check if it has the expected methods
methods = [
'create_agent', 'delete_agent', 'create_session',
'get_session_status', 'delete_session', 'send_message_to_agent'
]
found_methods = 0
for method in methods:
if hasattr(AgentOrchestrationTools, method):
print(f" β
Method '{method}' found")
found_methods += 1
else:
print(f" β οΈ Method '{method}' not found")
return found_methods >= len(methods) * 0.75 # 75% success rate
except Exception as e:
print(f" β MCP tools import failed: {e}")
return False
def test_fastmcp_integration():
"""Test FastMCP integration."""
print("\nπ Testing FastMCP integration...")
try:
from fastmcp import FastMCP, Context
print(" β
FastMCP imported successfully")
# Test that we can create a basic FastMCP instance
mcp = FastMCP("test-server")
print(" β
FastMCP instance created")
return True
except ImportError as e:
print(f" β FastMCP not available: {e}")
print(" βΉοΈ This is expected if FastMCP is not installed")
return False
def test_validation_framework():
"""Test that validation framework works."""
print("\nπ Testing validation framework...")
try:
from src.models.validation import ValidationError, ValidationResult
from src.models.ids import validate_agent_name
# Test successful validation
result = ValidationResult.success("Test passed")
assert result.is_valid == True
print(" β
ValidationResult.success works")
# Test failed validation
result = ValidationResult.failure("Test failed", "test_field")
assert result.is_valid == False
print(" β
ValidationResult.failure works")
# Test validation exception for invalid agent name
try:
validate_agent_name("InvalidAgent")
print(" β οΈ Expected validation to fail for invalid agent name")
return False
except Exception:
print(" β
Agent name validation correctly rejects invalid names")
return True
except Exception as e:
print(f" β Validation framework test failed: {e}")
return False
def run_existing_tests():
"""Run any existing test suites."""
print("\nπ§ͺ Running existing tests...")
try:
# Try to run a simple pytest if available
import subprocess
result = subprocess.run([
sys.executable, "-m", "pytest",
"tests/integration/test_import_chain.py",
"-v"
], capture_output=True, text=True, timeout=30)
if result.returncode == 0:
print(" β
Import chain tests passed")
return True
else:
print(f" β οΈ Some tests failed: {result.stderr}")
return False
except Exception as e:
print(f" βΉοΈ Could not run pytest: {e}")
return True # Don't fail if pytest isn't available
def main():
"""Run comprehensive system validation."""
print("π Comprehensive Agent Orchestration Platform Test")
print("=" * 60)
tests = [
("Basic Imports", test_basic_imports),
("Type Functionality", test_type_functionality),
("Manager Imports", test_manager_imports),
("MCP Tools", test_mcp_tools),
("FastMCP Integration", test_fastmcp_integration),
("Validation Framework", test_validation_framework),
("Existing Tests", run_existing_tests)
]
results = []
for test_name, test_func in tests:
print(f"\nπ {test_name}")
print("-" * 30)
try:
result = test_func()
results.append((test_name, result))
except Exception as e:
print(f"β Test failed with exception: {e}")
results.append((test_name, False))
print("\n" + "=" * 60)
print("π COMPREHENSIVE TEST SUMMARY")
print("=" * 60)
passed = 0
for test_name, result in results:
status = "β
PASS" if result else "β FAIL"
print(f"{status} - {test_name}")
if result:
passed += 1
success_rate = passed / len(results)
print(f"\nResults: {passed}/{len(results)} tests passed ({success_rate:.1%})")
if success_rate >= 0.85: # 85% pass rate
print("\nπ SYSTEM VALIDATION SUCCESSFUL!")
print("π‘ All critical components are working correctly")
print("π Ready for deployment and integration testing")
return 0
elif success_rate >= 0.70: # 70% pass rate
print("\nβ οΈ SYSTEM MOSTLY WORKING")
print("π‘ Some minor issues remain but core functionality works")
print("π§ Consider addressing remaining issues")
return 0
else:
print("\nβ SYSTEM VALIDATION FAILED")
print("π‘ Critical issues need to be resolved")
print("π§ Fix failing tests before proceeding")
return 1
if __name__ == "__main__":
sys.exit(main())