test_integration_suite.py
""" Integration test suite for Collective Intelligence system. This module provides comprehensive integration tests that verify the interaction between different CI components and end-to-end workflows. """ import asyncio import pytest from datetime import datetime from typing import List, Dict, Any from unittest.mock import AsyncMock, Mock, patch from src.openrouter_mcp.collective_intelligence.consensus_engine import ConsensusEngine from src.openrouter_mcp.collective_intelligence.ensemble_reasoning import EnsembleReasoner from src.openrouter_mcp.collective_intelligence.adaptive_router import AdaptiveRouter from src.openrouter_mcp.collective_intelligence.cross_validator import CrossValidator from src.openrouter_mcp.collective_intelligence.collaborative_solver import CollaborativeSolver from src.openrouter_mcp.collective_intelligence.base import ( TaskContext, ProcessingResult, ModelInfo, TaskType, ModelCapability ) @pytest.mark.integration class TestCollectiveIntelligenceIntegration: """Integration tests for the complete Collective Intelligence system.""" @pytest.fixture def integrated_model_provider(self, sample_models): """Create a comprehensive mock provider for integration testing.""" provider = AsyncMock() provider.get_available_models.return_value = sample_models # Create different response patterns for different models response_patterns = { "openai/gpt-4": "GPT-4 provides detailed and analytical responses with high accuracy.", "anthropic/claude-3-haiku": "Claude offers balanced perspectives with ethical considerations.", "meta-llama/llama-3.1-70b": "Llama generates creative and comprehensive solutions.", "google/gemini-pro": "Gemini delivers structured and well-organized responses.", "mistralai/mixtral-8x7b": "Mixtral provides efficient and practical answers." } async def integrated_process_task(task, model_id, **kwargs): # Simulate different processing times and qualities model_characteristics = { "openai/gpt-4": {"time": 2.5, "confidence": 0.9, "cost": 0.03}, "anthropic/claude-3-haiku": {"time": 1.2, "confidence": 0.85, "cost": 0.015}, "meta-llama/llama-3.1-70b": {"time": 3.1, "confidence": 0.8, "cost": 0.02}, "google/gemini-pro": {"time": 2.8, "confidence": 0.87, "cost": 0.025}, "mistralai/mixtral-8x7b": {"time": 1.8, "confidence": 0.82, "cost": 0.012} } char = model_characteristics.get(model_id, {"time": 2.0, "confidence": 0.8, "cost": 0.02}) base_response = response_patterns.get(model_id, f"Response from {model_id}") # Simulate processing delay await asyncio.sleep(0.01) # Small delay for realism return ProcessingResult( task_id=task.task_id, model_id=model_id, content=f"{base_response} Task: {task.content[:100]}...", confidence=char["confidence"], processing_time=char["time"], tokens_used=len(task.content) // 4, cost=char["cost"] ) provider.process_task.side_effect = integrated_process_task return provider @pytest.fixture def sample_integration_tasks(self): """Create sample tasks for integration testing.""" return [ TaskContext( task_id="integration_simple", task_type=TaskType.FACTUAL, content="What are the main benefits of renewable energy?", requirements={"format": "list", "detail": "concise"} ), TaskContext( task_id="integration_reasoning", task_type=TaskType.REASONING, content="Analyze the potential impacts of artificial intelligence on employment. 
" "Consider both positive and negative effects, and suggest mitigation strategies.", requirements={"analysis_depth": "comprehensive", "include_examples": True} ), TaskContext( task_id="integration_creative", task_type=TaskType.CREATIVE, content="Design a sustainable city planning strategy for a population of 1 million people. " "Include transportation, energy, waste management, and green spaces.", requirements={"innovation": "high", "feasibility": "realistic"} ), TaskContext( task_id="integration_code", task_type=TaskType.CODE_GENERATION, content="Create a Python class for managing a distributed cache system with " "automatic failover and data replication.", requirements={"documentation": True, "error_handling": True, "testing": True} ) ] @pytest.mark.asyncio async def test_consensus_to_validation_pipeline(self, integrated_model_provider, sample_integration_tasks): """Test pipeline from consensus building to validation.""" consensus_engine = ConsensusEngine(integrated_model_provider) cross_validator = CrossValidator(integrated_model_provider) task = sample_integration_tasks[1] # Reasoning task # Step 1: Build consensus consensus_result = await consensus_engine.process(task) assert consensus_result is not None assert consensus_result.consensus_content assert len(consensus_result.participating_models) > 0 # Step 2: Validate the consensus consensus_processing_result = ProcessingResult( task_id=task.task_id, model_id="consensus_ensemble", content=consensus_result.consensus_content, confidence=consensus_result.confidence_score ) validation_result = await cross_validator.process(consensus_processing_result, task) assert validation_result is not None assert isinstance(validation_result.is_valid, bool) assert validation_result.validation_confidence > 0.0 # Verify integration quality assert validation_result.quality_metrics.overall_score() > 0.0 @pytest.mark.asyncio async def test_router_to_ensemble_integration(self, integrated_model_provider, sample_integration_tasks): """Test integration between adaptive router and ensemble reasoning.""" adaptive_router = AdaptiveRouter(integrated_model_provider) ensemble_reasoner = EnsembleReasoner(integrated_model_provider) task = sample_integration_tasks[2] # Creative task # Step 1: Route to find best models routing_decision = await adaptive_router.process(task) assert routing_decision is not None assert routing_decision.selected_model_id assert routing_decision.confidence_score > 0.0 # Step 2: Use ensemble reasoning ensemble_result = await ensemble_reasoner.process(task) assert ensemble_result is not None assert ensemble_result.final_content assert len(ensemble_result.sub_task_results) > 0 # Verify that routing informed ensemble decisions participating_models = [str.model_id for str in ensemble_result.sub_task_results] # The routed model should influence ensemble selection assert len(participating_models) > 0 assert ensemble_result.success_rate > 0.0 @pytest.mark.asyncio async def test_full_ci_pipeline_integration(self, integrated_model_provider, sample_integration_tasks): """Test complete CI pipeline with all components.""" # Initialize all components adaptive_router = AdaptiveRouter(integrated_model_provider) ensemble_reasoner = EnsembleReasoner(integrated_model_provider) consensus_engine = ConsensusEngine(integrated_model_provider) cross_validator = CrossValidator(integrated_model_provider) task = sample_integration_tasks[3] # Code generation task # Step 1: Route to optimal model routing_decision = await adaptive_router.process(task) optimal_model = 
routing_decision.selected_model_id # Step 2: Generate initial solution with ensemble ensemble_result = await ensemble_reasoner.process(task) initial_solution = ensemble_result.final_content # Step 3: Build consensus for improvement consensus_result = await consensus_engine.process(task) consensus_solution = consensus_result.consensus_content # Step 4: Validate both solutions ensemble_pr = ProcessingResult( task_id=task.task_id, model_id="ensemble", content=initial_solution, confidence=ensemble_result.success_rate ) consensus_pr = ProcessingResult( task_id=task.task_id, model_id="consensus", content=consensus_solution, confidence=consensus_result.confidence_score ) ensemble_validation = await cross_validator.process(ensemble_pr, task) consensus_validation = await cross_validator.process(consensus_pr, task) # Verify complete pipeline results assert routing_decision.confidence_score > 0.0 assert ensemble_result.success_rate > 0.0 assert consensus_result.confidence_score > 0.0 assert ensemble_validation.validation_confidence > 0.0 assert consensus_validation.validation_confidence > 0.0 # Compare validation results ensemble_quality = ensemble_validation.quality_metrics.overall_score() consensus_quality = consensus_validation.quality_metrics.overall_score() # At least one approach should produce good quality assert max(ensemble_quality, consensus_quality) > 0.5 @pytest.mark.asyncio async def test_collaborative_solver_integration(self, integrated_model_provider, sample_integration_tasks): """Test collaborative solver integration with all components.""" collaborative_solver = CollaborativeSolver(integrated_model_provider) # Test different solving strategies strategies = [ 'sequential', 'parallel', 'hierarchical', 'adaptive' ] task = sample_integration_tasks[0] # Simple factual task results = {} for strategy in strategies: try: result = await collaborative_solver.process(task, strategy=strategy) results[strategy] = { 'success': True, 'confidence': result.confidence_score, 'content_length': len(result.final_content), 'processing_time': result.total_processing_time, 'components_used': len(result.component_contributions) } except Exception as e: results[strategy] = { 'success': False, 'error': str(e) } # Verify at least some strategies succeeded successful_strategies = [s for s, r in results.items() if r['success']] assert len(successful_strategies) > 0 # Verify quality across successful strategies for strategy in successful_strategies: result = results[strategy] assert result['confidence'] > 0.0 assert result['content_length'] > 0 assert result['components_used'] > 0 @pytest.mark.asyncio async def test_error_propagation_and_recovery(self, sample_integration_tasks): """Test error handling and recovery across component integration.""" # Create a provider that fails intermittently failing_provider = AsyncMock() call_count = 0 async def intermittent_failure(task, model_id, **kwargs): nonlocal call_count call_count += 1 # Fail every 3rd call if call_count % 3 == 0: raise Exception(f"Simulated failure for {model_id}") return ProcessingResult( task_id=task.task_id, model_id=model_id, content=f"Success response from {model_id}", confidence=0.8 ) failing_provider.process_task.side_effect = intermittent_failure failing_provider.get_available_models.return_value = [ ModelInfo(model_id="test_model_1", name="Test Model 1", provider="Test"), ModelInfo(model_id="test_model_2", name="Test Model 2", provider="Test"), ModelInfo(model_id="test_model_3", name="Test Model 3", provider="Test") ] # Test consensus engine 
resilience consensus_engine = ConsensusEngine(failing_provider) task = sample_integration_tasks[0] try: result = await consensus_engine.process(task) # Should succeed despite some failures assert result is not None # Some models should have succeeded assert len(result.participating_models) > 0 except Exception: # If it fails completely, that's also acceptable for this test pass @pytest.mark.asyncio async def test_concurrent_multi_component_processing(self, integrated_model_provider, sample_integration_tasks): """Test concurrent processing across multiple components.""" # Initialize components components = { 'consensus': ConsensusEngine(integrated_model_provider), 'router': AdaptiveRouter(integrated_model_provider), 'ensemble': EnsembleReasoner(integrated_model_provider) } tasks = sample_integration_tasks[:3] # Use first 3 tasks # Process each task with each component concurrently async def process_task_with_component(task, component_name, component): try: if component_name == 'router': return await component.process(task) else: return await component.process(task) except Exception as e: return f"Error in {component_name}: {str(e)}" # Create all combinations concurrent_operations = [] for task in tasks: for comp_name, comp_instance in components.items(): concurrent_operations.append( process_task_with_component(task, comp_name, comp_instance) ) # Execute all operations concurrently results = await asyncio.gather(*concurrent_operations, return_exceptions=True) # Analyze results successful_results = [r for r in results if not isinstance(r, Exception) and not isinstance(r, str)] failed_results = [r for r in results if isinstance(r, Exception) or isinstance(r, str)] # Should have reasonable success rate success_rate = len(successful_results) / len(results) assert success_rate > 0.5 # At least 50% success rate # Should complete in reasonable time (implicit in test completion) assert len(results) == len(concurrent_operations) @pytest.mark.asyncio async def test_data_flow_consistency(self, integrated_model_provider, sample_integration_tasks): """Test data consistency across component interactions.""" task = sample_integration_tasks[1] # Reasoning task # Process with ensemble reasoning ensemble_reasoner = EnsembleReasoner(integrated_model_provider) ensemble_result = await ensemble_reasoner.process(task) # Extract a sub-task result for validation if ensemble_result.sub_task_results: sub_result = ensemble_result.sub_task_results[0] # Validate the sub-result cross_validator = CrossValidator(integrated_model_provider) validation_result = await cross_validator.process(sub_result.result, task) # Verify data consistency assert validation_result.task_id == task.task_id assert validation_result.original_result.task_id == sub_result.result.task_id assert validation_result.original_result.model_id == sub_result.result.model_id # Content should be related original_content = sub_result.result.content validation_content = validation_result.validation_report.original_result.content assert original_content == validation_content @pytest.mark.asyncio async def test_quality_improvement_across_components(self, integrated_model_provider): """Test that quality improves as content flows through components.""" # Create a task that can benefit from multiple processing stages improvement_task = TaskContext( task_id="quality_improvement_test", task_type=TaskType.ANALYSIS, content="Analyze the economic impact of remote work on urban development, " "considering real estate, transportation, local businesses, and social 
dynamics.", requirements={"depth": "comprehensive", "evidence": "required"} ) # Stage 1: Initial ensemble processing ensemble_reasoner = EnsembleReasoner(integrated_model_provider) ensemble_result = await ensemble_reasoner.process(improvement_task) # Stage 2: Build consensus for refinement consensus_engine = ConsensusEngine(integrated_model_provider) consensus_result = await consensus_engine.process(improvement_task) # Stage 3: Validate both results cross_validator = CrossValidator(integrated_model_provider) ensemble_pr = ProcessingResult( task_id=improvement_task.task_id, model_id="ensemble_stage", content=ensemble_result.final_content, confidence=ensemble_result.success_rate ) consensus_pr = ProcessingResult( task_id=improvement_task.task_id, model_id="consensus_stage", content=consensus_result.consensus_content, confidence=consensus_result.confidence_score ) ensemble_validation = await cross_validator.process(ensemble_pr, improvement_task) consensus_validation = await cross_validator.process(consensus_pr, improvement_task) # Analyze quality progression ensemble_quality = ensemble_validation.quality_metrics.overall_score() consensus_quality = consensus_validation.quality_metrics.overall_score() # At least one stage should show good quality max_quality = max(ensemble_quality, consensus_quality) assert max_quality > 0.3 # Reasonable quality threshold # Verify validation provides meaningful feedback assert len(ensemble_validation.improvement_suggestions) >= 0 assert len(consensus_validation.improvement_suggestions) >= 0 @pytest.mark.asyncio async def test_component_interoperability(self, integrated_model_provider, sample_integration_tasks): """Test that components can seamlessly work together.""" task = sample_integration_tasks[2] # Creative task # Chain components together # Router → Ensemble → Consensus → Validation # Step 1: Route router = AdaptiveRouter(integrated_model_provider) routing_decision = await router.process(task) # Step 2: Ensemble reasoning ensemble = EnsembleReasoner(integrated_model_provider) ensemble_result = await ensemble.process(task) # Step 3: Consensus building consensus = ConsensusEngine(integrated_model_provider) consensus_result = await consensus.process(task) # Step 4: Cross-validation validator = CrossValidator(integrated_model_provider) # Validate ensemble result ensemble_pr = ProcessingResult( task_id=task.task_id, model_id="ensemble_output", content=ensemble_result.final_content, confidence=ensemble_result.success_rate ) ensemble_validation = await validator.process(ensemble_pr, task) # Validate consensus result consensus_pr = ProcessingResult( task_id=task.task_id, model_id="consensus_output", content=consensus_result.consensus_content, confidence=consensus_result.confidence_score ) consensus_validation = await validator.process(consensus_pr, task) # Verify interoperability # All components should work with the same task assert routing_decision.task_id == task.task_id assert ensemble_result.task_id == task.task_id assert consensus_result.task_id == task.task_id assert ensemble_validation.task_id == task.task_id assert consensus_validation.task_id == task.task_id # Results should be meaningful assert len(routing_decision.selected_model_id) > 0 assert len(ensemble_result.final_content) > 0 assert len(consensus_result.consensus_content) > 0 assert ensemble_validation.validation_confidence > 0.0 assert consensus_validation.validation_confidence > 0.0 # Validation should provide insights total_issues = (len(ensemble_validation.validation_report.issues) + 
len(consensus_validation.validation_report.issues)) total_suggestions = (len(ensemble_validation.improvement_suggestions) + len(consensus_validation.improvement_suggestions)) # At least some feedback should be provided assert total_issues >= 0 assert total_suggestions >= 0
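

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the test suite): a minimal, standalone way
# to drive the same consensus -> validation pipeline exercised above, using a
# stubbed provider. It reuses only names already imported in this module; the
# real provider interface and result fields may differ, so treat this as an
# assumption-laden example rather than a supported entry point.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    async def _demo_consensus_validation() -> None:
        # Stub provider mirroring the fixtures above.
        provider = AsyncMock()
        provider.get_available_models.return_value = [
            ModelInfo(model_id="demo_model", name="Demo Model", provider="Demo"),
        ]

        async def _fake_process(task, model_id, **kwargs):
            return ProcessingResult(
                task_id=task.task_id,
                model_id=model_id,
                content=f"Demo response for: {task.content[:50]}",
                confidence=0.8,
            )

        provider.process_task.side_effect = _fake_process

        task = TaskContext(
            task_id="demo_task",
            task_type=TaskType.FACTUAL,
            content="What are the main benefits of renewable energy?",
            requirements={"format": "list", "detail": "concise"},
        )

        # Build consensus first, then validate the consensus output.
        consensus = await ConsensusEngine(provider).process(task)
        validation = await CrossValidator(provider).process(
            ProcessingResult(
                task_id=task.task_id,
                model_id="consensus_ensemble",
                content=consensus.consensus_content,
                confidence=consensus.confidence_score,
            ),
            task,
        )
        print("consensus confidence:", consensus.confidence_score)
        print("validation confidence:", validation.validation_confidence)

    asyncio.run(_demo_consensus_validation())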
