test_collective_comprehensive.py
#!/usr/bin/env python3
"""
Comprehensive test suite for Collective Intelligence MCP tools.

This script tests the 5 collective intelligence tools through direct
function calls and measures their performance and functionality.
"""

import asyncio
import json
import os
import sys
import time
from datetime import datetime
from typing import Dict, Any, List

from dotenv import load_dotenv

# Add the src directory to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))

from src.openrouter_mcp.handlers.collective_intelligence import (
    CollectiveChatRequest,
    EnsembleReasoningRequest,
    AdaptiveModelRequest,
    CrossValidationRequest,
    CollaborativeSolvingRequest
)

# Import the underlying functionality directly
from src.openrouter_mcp.collective_intelligence import (
    ConsensusEngine,
    EnsembleReasoner,
    AdaptiveRouter,
    CrossValidator,
    CollaborativeSolver,
    ConsensusConfig,
    ConsensusStrategy,
    TaskContext,
    TaskType,
)
from src.openrouter_mcp.client.openrouter import OpenRouterClient
from src.openrouter_mcp.handlers.collective_intelligence import OpenRouterModelProvider

# Load environment variables
load_dotenv()


class CollectiveIntelligenceTestSuite:
    """Comprehensive test suite for collective intelligence tools."""

    def __init__(self):
        self.client = None
        self.model_provider = None
        self.test_results = {}
        self.start_time = None

    async def setup(self):
        """Setup test environment."""
        print("[SETUP] Initializing test environment...")

        # Check API key
        if not os.getenv("OPENROUTER_API_KEY"):
            raise ValueError("OPENROUTER_API_KEY environment variable not set!")

        # Initialize client and model provider
        self.client = OpenRouterClient.from_env()
        self.model_provider = OpenRouterModelProvider(self.client)

        print("[SETUP] Test environment initialized successfully")

    async def teardown(self):
        """Cleanup test environment."""
        if self.client:
            await self.client.close()
        print("[TEARDOWN] Test environment cleaned up")

    def create_task_context(self, content: str, task_type: str = "reasoning") -> TaskContext:
        """Create a TaskContext for testing."""
        try:
            task_type_enum = TaskType(task_type.lower())
        except ValueError:
            task_type_enum = TaskType.REASONING

        return TaskContext(
            task_type=task_type_enum,
            content=content,
            requirements={},
            constraints={}
        )

    async def test_collective_chat_completion(self) -> Dict[str, Any]:
        """Test collective chat completion functionality."""
        print("\n[TEST 1/5] Testing Collective Chat Completion...")
        test_start = time.time()

        try:
            # Setup consensus engine
            config = ConsensusConfig(
                strategy=ConsensusStrategy.MAJORITY_VOTE,
                min_models=2,
                max_models=3,
                timeout_seconds=60.0
            )
            consensus_engine = ConsensusEngine(self.model_provider, config)

            # Create test task
            task = self.create_task_context(
                "Explain the key benefits of renewable energy in 2-3 sentences.",
                "reasoning"
            )

            # Process with consensus
            async with self.client:
                result = await consensus_engine.process(task)

            test_time = time.time() - test_start
            success = bool(result.consensus_content and len(result.consensus_content) > 50)

            return {
                "test_name": "collective_chat_completion",
                "success": success,
                "response_time": test_time,
                "consensus_response": result.consensus_content[:200] + "..."
                    if len(result.consensus_content) > 200 else result.consensus_content,
                "agreement_level": result.agreement_level.value,
                "confidence_score": result.confidence_score,
                "participating_models": result.participating_models,
                "strategy_used": result.strategy_used.value,
                "quality_score": result.quality_metrics.overall_score(),
                "error": None
            }

        except Exception as e:
            test_time = time.time() - test_start
            print(f"[ERROR] Collective chat completion failed: {str(e)}")
            return {
                "test_name": "collective_chat_completion",
                "success": False,
                "response_time": test_time,
                "error": str(e)
            }

    async def test_ensemble_reasoning(self) -> Dict[str, Any]:
        """Test ensemble reasoning functionality."""
        print("\n[TEST 2/5] Testing Ensemble Reasoning...")
        test_start = time.time()

        try:
            ensemble_reasoner = EnsembleReasoner(self.model_provider)

            # Create test task
            task = self.create_task_context(
                "What are the main factors to consider when designing a sustainable city transport system?",
                "analysis"
            )

            # Process with ensemble reasoning
            async with self.client:
                result = await ensemble_reasoner.process(task, decompose=True)

            test_time = time.time() - test_start
            success = bool(result.final_content and len(result.sub_task_results) > 0)

            return {
                "test_name": "ensemble_reasoning",
                "success": success,
                "response_time": test_time,
                "final_result": result.final_content[:200] + "..."
                    if len(result.final_content) > 200 else result.final_content,
                "subtasks_completed": len(result.sub_task_results),
                "strategy_used": result.decomposition_strategy.value,
                "success_rate": result.success_rate,
                "total_cost": result.total_cost,
                "quality_score": result.overall_quality.overall_score(),
                "error": None
            }

        except Exception as e:
            test_time = time.time() - test_start
            print(f"[ERROR] Ensemble reasoning failed: {str(e)}")
            return {
                "test_name": "ensemble_reasoning",
                "success": False,
                "response_time": test_time,
                "error": str(e)
            }

    async def test_adaptive_model_selection(self) -> Dict[str, Any]:
        """Test adaptive model selection functionality."""
        print("\n[TEST 3/5] Testing Adaptive Model Selection...")
        test_start = time.time()

        try:
            adaptive_router = AdaptiveRouter(self.model_provider)

            # Create test task
            task = self.create_task_context(
                "Write a Python function to implement quicksort algorithm",
                "code_generation"
            )

            # Process with adaptive routing
            async with self.client:
                decision = await adaptive_router.process(task)

            test_time = time.time() - test_start
            success = bool(decision.selected_model_id and decision.confidence_score > 0)

            return {
                "test_name": "adaptive_model_selection",
                "success": success,
                "response_time": test_time,
                "selected_model": decision.selected_model_id,
                "selection_reasoning": decision.justification[:150] + "..."
                    if len(decision.justification) > 150 else decision.justification,
                "confidence": decision.confidence_score,
                "alternatives_count": len(decision.alternative_models),
                "strategy_used": decision.strategy_used.value,
                "expected_performance": decision.expected_performance,
                "error": None
            }

        except Exception as e:
            test_time = time.time() - test_start
            print(f"[ERROR] Adaptive model selection failed: {str(e)}")
            return {
                "test_name": "adaptive_model_selection",
                "success": False,
                "response_time": test_time,
                "error": str(e)
            }

    async def test_cross_model_validation(self) -> Dict[str, Any]:
        """Test cross-model validation functionality."""
        print("\n[TEST 4/5] Testing Cross-Model Validation...")
        test_start = time.time()

        try:
            from src.openrouter_mcp.collective_intelligence.base import ProcessingResult

            cross_validator = CrossValidator(self.model_provider)

            # Create dummy result to validate
            content = "Climate change is primarily caused by human activities, especially fossil fuel combustion."
            dummy_result = ProcessingResult(
                task_id="validation_test",
                model_id="test_model",
                content=content,
                confidence=1.0
            )

            # Create task context
            task = self.create_task_context(content, "analysis")

            # Process with cross-validation
            async with self.client:
                result = await cross_validator.process(dummy_result, task)

            test_time = time.time() - test_start
            success = result.validation_confidence > 0

            return {
                "test_name": "cross_model_validation",
                "success": success,
                "response_time": test_time,
                "validation_result": "VALID" if result.is_valid else "INVALID",
                "validation_score": result.validation_confidence,
                "issues_found": len(result.validation_report.issues),
                "recommendations_count": len(result.improvement_suggestions),
                "quality_score": result.quality_metrics.overall_score(),
                "confidence": result.validation_confidence,
                "error": None
            }

        except Exception as e:
            test_time = time.time() - test_start
            print(f"[ERROR] Cross-model validation failed: {str(e)}")
            return {
                "test_name": "cross_model_validation",
                "success": False,
                "response_time": test_time,
                "error": str(e)
            }

    async def test_collaborative_problem_solving(self) -> Dict[str, Any]:
        """Test collaborative problem solving functionality."""
        print("\n[TEST 5/5] Testing Collaborative Problem Solving...")
        test_start = time.time()

        try:
            collaborative_solver = CollaborativeSolver(self.model_provider)

            # Create test task
            task = self.create_task_context(
                "Develop a strategy to reduce food waste in restaurants and grocery stores.",
                "analysis"
            )
            task.requirements = {"stakeholders": ["restaurants", "grocery_stores", "consumers"]}

            # Process with collaborative solving
            async with self.client:
                result = await collaborative_solver.process(task, strategy="iterative")

            test_time = time.time() - test_start
            success = bool(result.final_content and len(result.final_content) > 100)

            return {
                "test_name": "collaborative_problem_solving",
                "success": success,
                "response_time": test_time,
                "final_solution": result.final_content[:200] + "..."
                    if len(result.final_content) > 200 else result.final_content,
                "solution_path_length": len(result.solution_path),
                "alternative_solutions_count": len(result.alternative_solutions),
                "confidence": result.confidence_score,
                "quality_score": result.quality_assessment.overall_score(),
                "strategy_used": result.session.strategy.value,
                "components_used": result.session.components_used,
                "error": None
            }

        except Exception as e:
            test_time = time.time() - test_start
            print(f"[ERROR] Collaborative problem solving failed: {str(e)}")
            return {
                "test_name": "collaborative_problem_solving",
                "success": False,
                "response_time": test_time,
                "error": str(e)
            }

    async def test_openrouter_integration(self) -> Dict[str, Any]:
        """Test basic OpenRouter API integration."""
        print("\n[INTEGRATION] Testing OpenRouter API Integration...")
        test_start = time.time()

        try:
            async with self.client:
                # Test basic API call
                models = await self.client.list_models()

                # Test a simple chat completion
                response = await self.client.chat_completion(
                    model="openai/gpt-3.5-turbo",
                    messages=[{"role": "user", "content": "Say 'API integration test successful'"}],
                    max_tokens=50
                )

            test_time = time.time() - test_start
            success = bool(models and response and response.get("choices"))

            return {
                "test_name": "openrouter_integration",
                "success": success,
                "response_time": test_time,
                "models_available": len(models) if models else 0,
                "api_response_received": bool(response),
                "error": None
            }

        except Exception as e:
            test_time = time.time() - test_start
            print(f"[ERROR] OpenRouter integration test failed: {str(e)}")
            return {
                "test_name": "openrouter_integration",
                "success": False,
                "response_time": test_time,
                "error": str(e)
            }

    async def run_comprehensive_test_suite(self) -> Dict[str, Any]:
        """Run the complete test suite."""
        print("=" * 70)
        print("[AI] Collective Intelligence MCP Tools - Comprehensive Test Suite")
        print("=" * 70)

        self.start_time = time.time()

        # Setup
        await self.setup()

        # Run all tests
        test_results = []

        # Test OpenRouter integration first
        integration_result = await self.test_openrouter_integration()
        test_results.append(integration_result)

        if integration_result["success"]:
            print("[SUCCESS] OpenRouter API integration working properly")

            # Run collective intelligence tests
            test_results.append(await self.test_collective_chat_completion())
            test_results.append(await self.test_ensemble_reasoning())
            test_results.append(await self.test_adaptive_model_selection())
            test_results.append(await self.test_cross_model_validation())
            test_results.append(await self.test_collaborative_problem_solving())
        else:
            print("[ERROR] OpenRouter API integration failed - skipping collective intelligence tests")

        # Cleanup
        await self.teardown()

        # Generate summary
        total_time = time.time() - self.start_time
        summary = self.generate_test_summary(test_results, total_time)

        return summary

    def generate_test_summary(self, test_results: List[Dict[str, Any]], total_time: float) -> Dict[str, Any]:
        """Generate comprehensive test summary."""
        successful_tests = [r for r in test_results if r["success"]]
        failed_tests = [r for r in test_results if not r["success"]]

        avg_response_time = sum(r["response_time"] for r in test_results) / len(test_results)

        summary = {
            "timestamp": datetime.now().isoformat(),
            "total_tests": len(test_results),
            "successful_tests": len(successful_tests),
            "failed_tests": len(failed_tests),
            "success_rate": len(successful_tests) / len(test_results) * 100,
            "total_execution_time": total_time,
            "average_response_time": avg_response_time,
            "individual_results": test_results,
            "tools_status": {
                "collective_chat_completion": next(
                    (r["success"] for r in test_results if r["test_name"] == "collective_chat_completion"), False),
                "ensemble_reasoning": next(
                    (r["success"] for r in test_results if r["test_name"] == "ensemble_reasoning"), False),
                "adaptive_model_selection": next(
                    (r["success"] for r in test_results if r["test_name"] == "adaptive_model_selection"), False),
                "cross_model_validation": next(
                    (r["success"] for r in test_results if r["test_name"] == "cross_model_validation"), False),
                "collaborative_problem_solving": next(
                    (r["success"] for r in test_results if r["test_name"] == "collaborative_problem_solving"), False),
                "openrouter_integration": next(
                    (r["success"] for r in test_results if r["test_name"] == "openrouter_integration"), False)
            }
        }

        return summary


async def main():
    """Main execution function."""
    test_suite = CollectiveIntelligenceTestSuite()

    try:
        summary = await test_suite.run_comprehensive_test_suite()

        # Print summary
        print("\n" + "=" * 70)
        print(" TEST SUMMARY REPORT")
        print("=" * 70)
        print(f"Total Tests Run: {summary['total_tests']}")
        print(f"Successful: {summary['successful_tests']}")
        print(f"Failed: {summary['failed_tests']}")
        print(f"Success Rate: {summary['success_rate']:.1f}%")
        print(f"Total Execution Time: {summary['total_execution_time']:.2f}s")
        print(f"Average Response Time: {summary['average_response_time']:.2f}s")

        print("\nTool Status:")
        for tool, status in summary['tools_status'].items():
            status_symbol = "✅" if status else "❌"
            print(f" {status_symbol} {tool}: {'WORKING' if status else 'FAILED'}")

        print("\nDetailed Results:")
        for result in summary['individual_results']:
            status_symbol = "✅" if result["success"] else "❌"
            print(f" {status_symbol} {result['test_name']}: {result['response_time']:.2f}s")
            if not result["success"] and result.get("error"):
                print(f" Error: {result['error']}")

        # Save results to file
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        results_file = f"collective_intelligence_test_results_{timestamp}.json"
        with open(results_file, 'w') as f:
            json.dump(summary, f, indent=2, default=str)

        print(f"\nDetailed results saved to: {results_file}")

        if summary['success_rate'] >= 80:
            print("\n🎉 OVERALL RESULT: COLLECTIVE INTELLIGENCE TOOLS ARE WORKING PROPERLY!")
        else:
            print("\n⚠️ OVERALL RESULT: SOME TOOLS HAVE ISSUES THAT NEED ATTENTION")

        return summary

    except Exception as e:
        print(f"\n❌ Test suite execution failed: {str(e)}")
        return {"error": str(e), "success": False}


if __name__ == "__main__":
    asyncio.run(main())
