# test_phase3_integration.py
"""
Integration test for Phase 3 components with main system
"""
import pytest
import sys
import os
import asyncio
from datetime import datetime
# Add the project root to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
# Import Phase 3 components
from katamari_mcp.acp.workflow_optimizer import WorkflowOptimizer
from katamari_mcp.acp.predictive_engine import PredictiveEngine
from katamari_mcp.acp.knowledge_transfer import KnowledgeTransferEngine
from katamari_mcp.acp.self_healing import SelfHealingEngine
class TestPhase3SystemIntegration:
    """Integration tests for the Phase 3 components operating as one system.

    Each test wires together two or more of the Phase 3 engines (workflow
    optimization, prediction, knowledge transfer, self-healing) and verifies
    that data fed into one engine is observable where the integration
    expects it.
    """

    @pytest.fixture
    def phase3_components(self):
        """Create a fresh instance of every Phase 3 component.

        Returns:
            dict: Mapping of component name -> newly constructed engine.
            Fresh instances per test keep state isolated between tests.
        """
        return {
            'workflow_optimizer': WorkflowOptimizer(),
            'predictive_engine': PredictiveEngine(),
            'knowledge_transfer': KnowledgeTransferEngine(),
            'self_healing': SelfHealingEngine()
        }

    def test_components_initialization(self, phase3_components):
        """All Phase 3 components can be initialized without errors."""
        for name, component in phase3_components.items():
            assert component is not None, f"{name} should be initialized"

    @pytest.mark.asyncio
    async def test_cross_component_data_flow(self, phase3_components):
        """Data fed to each engine is retained in that engine's store."""
        # NOTE(review): the workflow optimizer is not exercised by this test,
        # so it is not pulled out of the fixture (previous version bound an
        # unused `optimizer` local).
        predictive = phase3_components['predictive_engine']
        knowledge = phase3_components['knowledge_transfer']
        healing = phase3_components['self_healing']

        # Simulate a workflow execution that generates data
        capability_id = "test_capability"

        # 1. Add performance data to predictive engine
        await predictive.add_performance_data(
            capability_id=capability_id,
            value=1.5,
            timestamp=datetime.now()
        )

        # 2. Register capability profile with knowledge transfer
        await knowledge.register_capability_profile(
            capability_id=capability_id,
            profile={
                "capability_id": capability_id,
                "description": "Test capability for integration",
                "tags": ["test", "integration"],
                "performance_metrics": {"avg_execution_time": 1.5},
                "learning_patterns": ["sequential", "batch"]
            }
        )

        # 3. Report error to self-healing
        await healing.report_error(
            capability_id=capability_id,
            error_type="TestError",
            error_message="Integration test error",
            context={"test": True}
        )

        # Verify all components have data
        assert capability_id in predictive.performance_history
        assert capability_id in knowledge.capability_profiles
        assert capability_id in healing.error_history

    @pytest.mark.asyncio
    async def test_predictive_to_workflow_optimization(self, phase3_components):
        """Degrading performance data feeds the predictive analysis pipeline."""
        predictive = phase3_components['predictive_engine']

        capability_id = "slow_capability"

        # Add performance data showing degradation
        for i in range(5):
            await predictive.add_performance_data(
                capability_id=capability_id,
                value=1.0 + (i * 0.2),  # Increasing execution time
                timestamp=datetime.now()
            )

        # Trigger analysis (private hook used deliberately so the test does
        # not depend on the engine's internal scheduling).
        await predictive._analyze_performance_data(capability_id)

        # The algorithm may or may not emit predictions for this small data
        # set, so assert the container is well-formed rather than its size.
        # (The previous `len(predictions) >= 0` check was a tautology and
        # could never fail.)
        predictions = predictive.predictions.get(capability_id, [])
        assert isinstance(predictions, list)

    @pytest.mark.asyncio
    async def test_knowledge_transfer_to_self_healing(self, phase3_components):
        """Knowledge profiles and error reports coexist for related capabilities."""
        knowledge = phase3_components['knowledge_transfer']
        healing = phase3_components['self_healing']

        source_capability = "resilient_capability"
        target_capability = "vulnerable_capability"

        # Register profiles for both capabilities
        await knowledge.register_capability_profile(
            capability_id=source_capability,
            profile={
                "capability_id": source_capability,
                "description": "Resilient capability with good error handling",
                "tags": ["resilient", "error_handling"],
                "performance_metrics": {"error_rate": 0.01},
                "learning_patterns": ["retry_logic", "circuit_breaker"]
            }
        )
        await knowledge.register_capability_profile(
            capability_id=target_capability,
            profile={
                "capability_id": target_capability,
                "description": "Vulnerable capability",
                "tags": ["vulnerable"],
                "performance_metrics": {"error_rate": 0.2},
                "learning_patterns": ["basic"]
            }
        )

        # Report errors for vulnerable capability
        await healing.report_error(
            capability_id=target_capability,
            error_type="NetworkError",
            error_message="Connection failed",
            context={"retry_count": 0}
        )

        # Verify both systems have relevant data
        assert source_capability in knowledge.capability_profiles
        assert target_capability in knowledge.capability_profiles
        assert target_capability in healing.error_history

    def test_component_configuration_compatibility(self, phase3_components):
        """All components follow a compatible configuration pattern."""
        # All components should have similar configuration patterns
        for name, component in phase3_components.items():
            # Should have initialization without errors
            assert component is not None, f"{name} failed to initialize"
            # Should have some form of data storage
            if hasattr(component, '__dict__'):
                # Check for common attributes like data storage
                component_dict = component.__dict__
                assert len(component_dict) > 0, f"{name} has no attributes"

    @pytest.mark.asyncio
    async def test_system_resilience(self, phase3_components):
        """Independent engines keep functioning regardless of each other's state."""
        predictive = phase3_components['predictive_engine']
        healing = phase3_components['self_healing']

        capability_id = "resilient_test_capability"

        # Add data to predictive engine
        await predictive.add_performance_data(
            capability_id=capability_id,
            value=1.0,
            timestamp=datetime.now()
        )

        # Report error to healing engine
        await healing.report_error(
            capability_id=capability_id,
            error_type="TestError",
            error_message="Resilience test",
            context={"test": True}
        )

        # Both should have data independently
        assert capability_id in predictive.performance_history
        assert capability_id in healing.error_history

        # System should continue to function
        assert len(predictive.performance_history) > 0
        assert len(healing.error_history) > 0
if __name__ == "__main__":
pytest.main([__file__, "-v"])