
MCP Standards

by airmcp-com
test_agent_intelligence.py (10.6 kB)
"""Integration tests for agent intelligence and AI routing Tests model selection, complexity assessment, and intelligent agent coordination. Target: 85%+ coverage for AI features """ import pytest @pytest.mark.integration class TestModelRouting: """Test intelligent model selection and routing""" async def test_task_complexity_assessment_simple(self, memory_server): """Test simple task routed to fast model""" task = { "type": "search", "description": "Find episode about Python", "complexity": "low" } # Route task (if implemented) # This would call model router # For now, test that simple searches work result = await memory_server._search_episodes("Python", limit=10) assert result.get("success", False) or isinstance(result, dict) async def test_task_complexity_assessment_complex(self, memory_server): """Test complex task routed to advanced model""" task = { "type": "generate_ai_standards", "description": "Generate comprehensive CLAUDE.md from all configs", "complexity": "high" } # Complex tasks should use more powerful models # Test standards generation (complex task) # This is implicitly tested by the standards generation working async def test_cost_optimization_enabled(self, memory_server, monkeypatch): """Test cost optimization when enabled""" monkeypatch.setenv("GEMINI_API_KEY", "test-key") # With cost optimization, should prefer cheaper models # This is a integration point that would be tested with real API assert True # Placeholder for actual cost optimization test async def test_fallback_model_on_error(self, memory_server): """Test fallback to alternative model on error""" # Simulate primary model failure # System should fallback gracefully # This requires mocking API calls assert True # Placeholder async def test_routing_statistics_tracking(self, memory_server): """Test model routing statistics are tracked""" # Execute various tasks await memory_server._search_episodes("test", limit=5) await memory_server._list_recent(limit=5) # Stats should be tracked (if implemented) # Check tool_logs for execution patterns import sqlite3 with sqlite3.connect(memory_server.db_path) as conn: count = conn.execute("SELECT COUNT(*) FROM tool_logs").fetchone()[0] assert count >= 0 @pytest.mark.integration class TestAgentPerformanceTracking: """Test agent performance tracking and optimization""" async def test_execution_logging(self, memory_server): """Test agent executions are logged""" result = await memory_server._log_tool_execution( tool_name="agent_task", args={"task": "analyze code"}, result={"success": True, "duration": 1.5} ) assert result["success"] is True async def test_performance_statistics(self, memory_server): """Test performance statistics calculation""" # Log multiple executions for i in range(10): await memory_server._log_tool_execution( tool_name=f"task_{i % 3}", args={"index": i}, result={"success": i % 4 != 0} # 75% success rate ) # Stats should be calculable from logs import sqlite3 with sqlite3.connect(memory_server.db_path) as conn: total = conn.execute("SELECT COUNT(*) FROM tool_logs").fetchone()[0] assert total >= 10 async def test_agent_recommendations(self, memory_server): """Test agent recommendations based on performance""" # Log various agent performances await memory_server._log_tool_execution( tool_name="researcher", args={"task": "research"}, result={"success": True, "quality_score": 0.95} ) await memory_server._log_tool_execution( tool_name="coder", args={"task": "code"}, result={"success": True, "quality_score": 0.85} ) # System should recommend best agent for task 
type # This would be part of agent intelligence module async def test_success_rate_calculation(self, memory_server): """Test success rate tracking for agents""" # Log 10 tasks, 8 successful for i in range(10): await memory_server._log_tool_execution( tool_name="test_agent", args={"task": i}, result={"success": i < 8} ) # Calculate success rate import sqlite3 with sqlite3.connect(memory_server.db_path) as conn: cursor = conn.execute(""" SELECT tool_name, COUNT(*) as total, SUM(CASE WHEN result LIKE '%"success": true%' THEN 1 ELSE 0 END) as successes FROM tool_logs WHERE tool_name = 'test_agent' GROUP BY tool_name """) row = cursor.fetchone() if row: total, successes = row[1], row[2] success_rate = successes / total if total > 0 else 0 assert success_rate >= 0.7 # At least 70% success @pytest.mark.integration class TestIntelligentCoordination: """Test intelligent agent coordination""" async def test_task_decomposition(self, memory_server): """Test complex task decomposition""" complex_task = { "type": "implement_feature", "description": "Add authentication with tests and docs", "subtasks": ["research", "design", "implement", "test", "document"] } # System should break down into subtasks # For now, test that individual operations work await memory_server._add_episode( "Task Decomposition", "Complex task broken into subtasks", "coordination" ) async def test_parallel_execution_coordination(self, memory_server): """Test parallel task execution coordination""" import asyncio # Execute multiple operations in parallel tasks = [ memory_server._add_episode(f"Episode {i}", f"Content {i}", "parallel") for i in range(5) ] results = await asyncio.gather(*tasks) # All should succeed assert all(r["success"] for r in results) async def test_dependency_resolution(self, memory_server): """Test task dependency resolution""" # Task B depends on Task A output result_a = await memory_server._add_episode( "Task A", "First task", "dependency" ) # Use result_a in task B result_b = await memory_server._add_episode( "Task B", f"Second task, depends on {result_a['id']}", "dependency" ) assert result_a["success"] is True assert result_b["success"] is True async def test_error_recovery_coordination(self, memory_server): """Test coordinated error recovery""" # Simulate task failure result_fail = await memory_server._add_episode("", "", "test") # System should handle gracefully assert result_fail["success"] is False # Retry with valid data result_retry = await memory_server._add_episode( "Recovered Task", "After error recovery", "test" ) assert result_retry["success"] is True @pytest.mark.integration class TestAdaptiveIntelligence: """Test adaptive and learning intelligence""" async def test_learning_from_failures(self, memory_server): """Test system learns from failures""" # Log failures for i in range(3): await memory_server._log_tool_execution( tool_name="failing_task", args={"attempt": i}, result={"success": False, "error": "timeout"} ) # System should adapt (increase timeout, change strategy, etc.) 
# This would be in the intelligence module async def test_pattern_based_optimization(self, memory_server): """Test optimization based on learned patterns""" # Log successful pattern for i in range(5): await memory_server._log_tool_execution( tool_name="optimized_task", args={"method": "cached"}, result={"success": True, "duration": 0.1} ) # Log slow pattern await memory_server._log_tool_execution( tool_name="slow_task", args={"method": "uncached"}, result={"success": True, "duration": 2.0} ) # System should prefer fast pattern # Check that pattern learning captured this async def test_contextual_intelligence(self, memory_server): """Test context-aware intelligent decisions""" # Different contexts should trigger different behaviors await memory_server._log_tool_execution( tool_name="task", args={"context": "production", "validation": "strict"}, result={"success": True} ) await memory_server._log_tool_execution( tool_name="task", args={"context": "development", "validation": "relaxed"}, result={"success": True} ) # System should learn context-appropriate behaviors async def test_continuous_improvement(self, memory_server): """Test continuous improvement over time""" import sqlite3 # Initial performance initial_start = 0 with sqlite3.connect(memory_server.db_path) as conn: cursor = conn.execute("SELECT COUNT(*) FROM tool_logs") initial_start = cursor.fetchone()[0] # Execute tasks and learn for i in range(20): await memory_server._log_tool_execution( tool_name="improving_task", args={"iteration": i}, result={"success": True, "performance": 1.0 - (i * 0.01)} ) # Verify learning happened (tool logs increased) with sqlite3.connect(memory_server.db_path) as conn: cursor = conn.execute("SELECT COUNT(*) FROM tool_logs") final_count = cursor.fetchone()[0] assert final_count > initial_start
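These tests all rely on a memory_server pytest fixture that is defined elsewhere in the repository rather than in this file. For orientation, here is a minimal sketch of what such a fixture could look like, assuming pytest-asyncio (with asyncio_mode = "auto") and a hypothetical MemoryServer class; the import path and constructor signature below are illustrative assumptions, not the project's actual API.

# conftest.py -- minimal sketch only; the real fixture lives elsewhere in the repo.
# The MemoryServer import path and constructor arguments are hypothetical.
import pytest_asyncio

from mcp_standards.server import MemoryServer  # hypothetical import path


@pytest_asyncio.fixture
async def memory_server(tmp_path):
    """Yield an isolated server instance backed by a throwaway SQLite database."""
    server = MemoryServer(db_path=str(tmp_path / "memory.db"))  # hypothetical signature
    yield server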

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/airmcp-com/mcp-standards'
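The same endpoint can also be queried from code. Here is a minimal sketch in Python using the requests library; the response schema is not documented on this page, so the example simply prints the returned JSON.

import requests

# Query the directory entry for this server (same URL as the curl command above).
url = "https://glama.ai/api/mcp/v1/servers/airmcp-com/mcp-standards"
response = requests.get(url, timeout=10)
response.raise_for_status()
print(response.json())  # schema not specified here, so just inspect the payload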

If you have feedback or need assistance with the MCP directory API, please join our Discord server.