"""Integration tests for agent intelligence and AI routing
Tests model selection, complexity assessment, and intelligent agent coordination.
Target: 85%+ coverage for AI features
"""
import asyncio
import sqlite3

import pytest
@pytest.mark.integration
class TestModelRouting:
"""Test intelligent model selection and routing"""
async def test_task_complexity_assessment_simple(self, memory_server):
"""Test simple task routed to fast model"""
        # A low-complexity task, e.g. {"type": "search", "complexity": "low"},
        # should be served by the fast path. Until the model router is wired
        # in, verify that a plain search works end to end.
        result = await memory_server._search_episodes("Python", limit=10)
        # The call must return a structured response; a "success" flag,
        # when present, must not report failure.
        assert isinstance(result, dict)
        assert result.get("success", True)
async def test_task_complexity_assessment_complex(self, memory_server):
"""Test complex task routed to advanced model"""
        task = {
            "type": "generate_ai_standards",
            "description": "Generate comprehensive CLAUDE.md from all configs",
            "complexity": "high"
        }
        # Complex tasks should be routed to a more powerful model. Routing
        # is exercised implicitly by standards generation; here we at least
        # record the task so the decision is auditable in tool_logs (the
        # "complexity_router" name is a stand-in).
        result = await memory_server._log_tool_execution(
            tool_name="complexity_router",
            args=task,
            result={"success": True, "complexity": "high"}
        )
        assert result["success"] is True
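    def test_complexity_heuristic_sketch(self):
        """Sketch: a trivial complexity heuristic a router could apply.

        Purely illustrative; no server or router API is assumed.
        """
        def assess(task: dict) -> str:
            # Hypothetical heuristic: an explicit hint wins, otherwise use
            # description length as a crude proxy for complexity.
            if "complexity" in task:
                return task["complexity"]
            return "high" if len(task.get("description", "")) > 80 else "low"

        assert assess({"type": "search", "complexity": "low"}) == "low"
        assert assess({"description": "x" * 100}) == "high"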
async def test_cost_optimization_enabled(self, memory_server, monkeypatch):
"""Test cost optimization when enabled"""
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
        # With cost optimization enabled, the router should prefer cheaper
        # models. This is an integration point that needs a real or mocked
        # API; the next test sketches the mocked form.
        assert True  # Placeholder until the router exposes a testable hook
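    async def test_cost_optimization_prefers_cheap_model(self, memory_server):
        """Hedged sketch for cost-aware routing.

        Assumes a hypothetical `model_router` attribute exposing a
        synchronous `select_model(task) -> str`; both names are
        illustrative, so the test skips when they are absent.
        """
        router = getattr(memory_server, "model_router", None)
        if router is None or not hasattr(router, "select_model"):
            pytest.skip("model router not implemented yet")
        cheap = router.select_model({"type": "search", "complexity": "low"})
        costly = router.select_model(
            {"type": "generate_ai_standards", "complexity": "high"}
        )
        # A cost-optimizing router should not spend the expensive model on
        # a trivially simple task.
        assert cheap != costly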
async def test_fallback_model_on_error(self, memory_server):
"""Test fallback to alternative model on error"""
        # Simulating a primary model failure requires mocking the API layer;
        # the system should fall back gracefully instead of raising. The
        # next test sketches this with unittest.mock.
        assert True  # Placeholder until the model-calling layer is mockable
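    async def test_fallback_on_primary_failure_sketch(self, memory_server):
        """Hedged sketch for graceful fallback on primary model failure.

        Assumes a hypothetical `model_router` with an async
        `execute_with_fallback(task)` method and an internal `_call_primary`
        coroutine. All of these names are illustrative, so the test skips
        when any of them is absent.
        """
        from unittest.mock import AsyncMock, patch

        router = getattr(memory_server, "model_router", None)
        if router is None or not all(
            hasattr(router, attr)
            for attr in ("execute_with_fallback", "_call_primary")
        ):
            pytest.skip("fallback routing not implemented yet")
        with patch.object(
            router, "_call_primary",
            AsyncMock(side_effect=RuntimeError("primary down")),
        ):
            result = await router.execute_with_fallback({"type": "search"})
        # The primary failure must be absorbed by the fallback, not raised.
        assert result.get("success", False)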
async def test_routing_statistics_tracking(self, memory_server):
"""Test model routing statistics are tracked"""
# Execute various tasks
await memory_server._search_episodes("test", limit=5)
await memory_server._list_recent(limit=5)
        # Stats should be tracked (if implemented); tool_logs is the
        # current source of truth for execution patterns.
        with sqlite3.connect(memory_server.db_path) as conn:
            count = conn.execute("SELECT COUNT(*) FROM tool_logs").fetchone()[0]
        # Smoke check only: COUNT(*) can never be negative, so this just
        # proves the table exists and is queryable.
        assert count >= 0
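    async def test_per_tool_routing_stats(self, memory_server):
        """Sketch: aggregate execution counts per tool from tool_logs.

        Uses only the tool_logs schema these tests already rely on
        (tool_name column); the GROUP BY is what a stats endpoint would run.
        """
        await memory_server._log_tool_execution(
            tool_name="stats_probe", args={}, result={"success": True}
        )
        with sqlite3.connect(memory_server.db_path) as conn:
            rows = conn.execute(
                "SELECT tool_name, COUNT(*) FROM tool_logs GROUP BY tool_name"
            ).fetchall()
        assert any(name == "stats_probe" for name, _ in rows)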
@pytest.mark.integration
class TestAgentPerformanceTracking:
"""Test agent performance tracking and optimization"""
async def test_execution_logging(self, memory_server):
"""Test agent executions are logged"""
result = await memory_server._log_tool_execution(
tool_name="agent_task",
args={"task": "analyze code"},
result={"success": True, "duration": 1.5}
)
assert result["success"] is True
async def test_performance_statistics(self, memory_server):
"""Test performance statistics calculation"""
# Log multiple executions
for i in range(10):
await memory_server._log_tool_execution(
tool_name=f"task_{i % 3}",
args={"index": i},
result={"success": i % 4 != 0} # 75% success rate
)
# Stats should be calculable from logs
with sqlite3.connect(memory_server.db_path) as conn:
total = conn.execute("SELECT COUNT(*) FROM tool_logs").fetchone()[0]
assert total >= 10
async def test_agent_recommendations(self, memory_server):
"""Test agent recommendations based on performance"""
        # Log two agents with different quality scores; a recommender
        # should later prefer the higher-scoring agent per task type.
        r1 = await memory_server._log_tool_execution(
            tool_name="researcher",
            args={"task": "research"},
            result={"success": True, "quality_score": 0.95}
        )
        r2 = await memory_server._log_tool_execution(
            tool_name="coder",
            args={"task": "code"},
            result={"success": True, "quality_score": 0.85}
        )
        assert r1["success"] and r2["success"]
        # Recommendation logic belongs to the agent intelligence module;
        # the ranking sketch in the next test shows the underlying query.
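    async def test_agent_ranking_from_logs(self, memory_server):
        """Sketch: rank agents by logged quality_score.

        Assumes `result` is persisted as JSON text (the success-rate query
        below relies on the same serialization) and that SQLite is built
        with the JSON1 extension, which is standard in modern builds.
        """
        for name, score in [("researcher", 0.95), ("coder", 0.85)]:
            await memory_server._log_tool_execution(
                tool_name=name,
                args={"task": "rank"},
                result={"success": True, "quality_score": score},
            )
        with sqlite3.connect(memory_server.db_path) as conn:
            rows = conn.execute(
                """
                SELECT tool_name, AVG(json_extract(result, '$.quality_score'))
                FROM tool_logs
                WHERE json_extract(result, '$.quality_score') IS NOT NULL
                GROUP BY tool_name
                """
            ).fetchall()
        scores = dict(rows)
        assert scores["researcher"] > scores["coder"]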
async def test_success_rate_calculation(self, memory_server):
"""Test success rate tracking for agents"""
# Log 10 tasks, 8 successful
for i in range(10):
await memory_server._log_tool_execution(
tool_name="test_agent",
args={"task": i},
result={"success": i < 8}
)
        # Calculate the success rate from the persisted logs. The LIKE
        # pattern assumes `result` is serialized as JSON with lowercase
        # booleans (json.dumps default).
        with sqlite3.connect(memory_server.db_path) as conn:
            cursor = conn.execute("""
                SELECT tool_name,
                       COUNT(*) as total,
                       SUM(CASE WHEN result LIKE '%"success": true%' THEN 1 ELSE 0 END) as successes
                FROM tool_logs
                WHERE tool_name = 'test_agent'
                GROUP BY tool_name
            """)
            row = cursor.fetchone()
        # Ten executions were just logged, so a row must exist; without this
        # assert the test would pass vacuously on an empty table.
        assert row is not None
        total, successes = row[1], row[2]
        success_rate = successes / total
        assert success_rate >= 0.7  # 8 of the 10 logged runs succeed (80%)
@pytest.mark.integration
class TestIntelligentCoordination:
"""Test intelligent agent coordination"""
async def test_task_decomposition(self, memory_server):
"""Test complex task decomposition"""
        complex_task = {
            "type": "implement_feature",
            "description": "Add authentication with tests and docs",
            "subtasks": ["research", "design", "implement", "test", "document"]
        }
        # The system should break the task into its subtasks; until a
        # decomposer exists, verify the coordination record is persisted
        # (subtask-level logging is sketched in the next test).
        result = await memory_server._add_episode(
            "Task Decomposition",
            f"Complex task broken into {len(complex_task['subtasks'])} subtasks",
            "coordination"
        )
        assert result["success"] is True
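    async def test_subtask_execution_logging(self, memory_server):
        """Sketch: record each subtask of a decomposed feature task.

        Uses only the existing `_log_tool_execution` API; the subtask list
        mirrors the one in the decomposition test above.
        """
        subtasks = ["research", "design", "implement", "test", "document"]
        results = [
            await memory_server._log_tool_execution(
                tool_name="decomposer",
                args={"subtask": name, "parent": "implement_feature"},
                result={"success": True},
            )
            for name in subtasks
        ]
        assert all(r["success"] for r in results)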
async def test_parallel_execution_coordination(self, memory_server):
"""Test parallel task execution coordination"""
# Execute multiple operations in parallel
tasks = [
memory_server._add_episode(f"Episode {i}", f"Content {i}", "parallel")
for i in range(5)
]
results = await asyncio.gather(*tasks)
# All should succeed
assert all(r["success"] for r in results)
async def test_dependency_resolution(self, memory_server):
"""Test task dependency resolution"""
# Task B depends on Task A output
result_a = await memory_server._add_episode(
"Task A",
"First task",
"dependency"
)
# Use result_a in task B
result_b = await memory_server._add_episode(
"Task B",
f"Second task, depends on {result_a['id']}",
"dependency"
)
assert result_a["success"] is True
assert result_b["success"] is True
async def test_error_recovery_coordination(self, memory_server):
"""Test coordinated error recovery"""
# Simulate task failure
result_fail = await memory_server._add_episode("", "", "test")
# System should handle gracefully
assert result_fail["success"] is False
# Retry with valid data
result_retry = await memory_server._add_episode(
"Recovered Task",
"After error recovery",
"test"
)
assert result_retry["success"] is True
@pytest.mark.integration
class TestAdaptiveIntelligence:
"""Test adaptive and learning intelligence"""
async def test_learning_from_failures(self, memory_server):
"""Test system learns from failures"""
        # Log repeated failures of the same tool; the logging call itself
        # should succeed even though the logged task failed.
        for i in range(3):
            log_result = await memory_server._log_tool_execution(
                tool_name="failing_task",
                args={"attempt": i},
                result={"success": False, "error": "timeout"}
            )
            assert log_result["success"] is True
        # Adapting to these failures (longer timeouts, a different strategy)
        # belongs to the planned intelligence module; the next test sketches
        # the query such a component would consume.
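    async def test_failure_history_is_queryable(self, memory_server):
        """Sketch: the failure history a learning component would consume.

        Relies only on the tool_logs table and the JSON serialization of
        `result` already assumed elsewhere in this file.
        """
        for i in range(3):
            await memory_server._log_tool_execution(
                tool_name="failing_task",
                args={"attempt": i},
                result={"success": False, "error": "timeout"},
            )
        with sqlite3.connect(memory_server.db_path) as conn:
            failures = conn.execute(
                """
                SELECT COUNT(*) FROM tool_logs
                WHERE tool_name = 'failing_task'
                  AND result LIKE '%"success": false%'
                """
            ).fetchone()[0]
        assert failures >= 3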
async def test_pattern_based_optimization(self, memory_server):
"""Test optimization based on learned patterns"""
        # Log a consistently fast (cached) execution pattern...
        for i in range(5):
            fast = await memory_server._log_tool_execution(
                tool_name="optimized_task",
                args={"method": "cached"},
                result={"success": True, "duration": 0.1}
            )
            assert fast["success"] is True
        # ...and a single slow (uncached) execution for contrast.
        slow = await memory_server._log_tool_execution(
            tool_name="slow_task",
            args={"method": "uncached"},
            result={"success": True, "duration": 2.0}
        )
        assert slow["success"] is True
        # A pattern learner should prefer the fast path; the next test
        # sketches the duration signal it would read.
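    async def test_duration_signal_for_pattern_learning(self, memory_server):
        """Sketch: compare average durations per tool from tool_logs.

        Assumes `result` is stored as JSON text and that SQLite's JSON1
        functions (json_extract) are available, which is standard in
        modern SQLite builds.
        """
        await memory_server._log_tool_execution(
            tool_name="optimized_task",
            args={"method": "cached"},
            result={"success": True, "duration": 0.1},
        )
        await memory_server._log_tool_execution(
            tool_name="slow_task",
            args={"method": "uncached"},
            result={"success": True, "duration": 2.0},
        )
        with sqlite3.connect(memory_server.db_path) as conn:
            durations = dict(conn.execute(
                """
                SELECT tool_name, AVG(json_extract(result, '$.duration'))
                FROM tool_logs
                WHERE tool_name IN ('optimized_task', 'slow_task')
                GROUP BY tool_name
                """
            ).fetchall())
        # The cached pattern must be measurably faster than the uncached one.
        assert durations["optimized_task"] < durations["slow_task"]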
async def test_contextual_intelligence(self, memory_server):
"""Test context-aware intelligent decisions"""
        # Different contexts should trigger different behaviors; log one
        # execution per context so the distinction is learnable.
        prod = await memory_server._log_tool_execution(
            tool_name="task",
            args={"context": "production", "validation": "strict"},
            result={"success": True}
        )
        dev = await memory_server._log_tool_execution(
            tool_name="task",
            args={"context": "development", "validation": "relaxed"},
            result={"success": True}
        )
        assert prod["success"] and dev["success"]
        # Learning context-appropriate behavior is future work for the
        # intelligence module.
async def test_continuous_improvement(self, memory_server):
"""Test continuous improvement over time"""
        # Record the baseline log count.
        with sqlite3.connect(memory_server.db_path) as conn:
            cursor = conn.execute("SELECT COUNT(*) FROM tool_logs")
            initial_count = cursor.fetchone()[0]
        # Execute tasks whose logged metric steadily improves (a
        # duration-like "performance" value falling from 1.0 to 0.81).
        for i in range(20):
            await memory_server._log_tool_execution(
                tool_name="improving_task",
                args={"iteration": i},
                result={"success": True, "performance": 1.0 - (i * 0.01)}
            )
        # Verify the history a learning component needs was persisted.
        with sqlite3.connect(memory_server.db_path) as conn:
            cursor = conn.execute("SELECT COUNT(*) FROM tool_logs")
            final_count = cursor.fetchone()[0]
        assert final_count >= initial_count + 20
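    async def test_improvement_trend_readback(self, memory_server):
        """Sketch: read the improvement trend back out of tool_logs.

        Assumes `result` is stored as JSON text (JSON1 available) and that
        a lower "performance" value means a faster, better run, mirroring
        the duration-like metric used above. Ordering by rowid assumes
        insertion order, which holds for a plain append-only log table.
        """
        for i in range(10):
            await memory_server._log_tool_execution(
                tool_name="trend_task",
                args={"iteration": i},
                result={"success": True, "performance": 1.0 - (i * 0.05)},
            )
        with sqlite3.connect(memory_server.db_path) as conn:
            values = [
                row[0]
                for row in conn.execute(
                    """
                    SELECT json_extract(result, '$.performance')
                    FROM tool_logs
                    WHERE tool_name = 'trend_task'
                    ORDER BY rowid
                    """
                ).fetchall()
            ]
        # Each iteration should improve on the last: descending values.
        assert values == sorted(values, reverse=True)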