
MaverickMCP

by wshobson
MIT License
test_parallel_research_orchestrator.py (27.9 kB)
""" Comprehensive test suite for ParallelResearchOrchestrator. This test suite covers: - Parallel task execution with concurrency control - Task distribution and load balancing - Error handling and timeout management - Synthesis callback functionality - Performance improvements over sequential execution - Circuit breaker integration - Resource usage monitoring """ import asyncio import time from typing import Any import pytest from maverick_mcp.utils.parallel_research import ( ParallelResearchConfig, ParallelResearchOrchestrator, ResearchResult, ResearchTask, TaskDistributionEngine, ) class TestParallelResearchConfig: """Test ParallelResearchConfig configuration class.""" def test_default_configuration(self): """Test default configuration values.""" config = ParallelResearchConfig() assert config.max_concurrent_agents == 4 assert config.timeout_per_agent == 180 assert config.enable_fallbacks is False assert config.rate_limit_delay == 0.5 def test_custom_configuration(self): """Test custom configuration values.""" config = ParallelResearchConfig( max_concurrent_agents=8, timeout_per_agent=180, enable_fallbacks=False, rate_limit_delay=0.5, ) assert config.max_concurrent_agents == 8 assert config.timeout_per_agent == 180 assert config.enable_fallbacks is False assert config.rate_limit_delay == 0.5 class TestResearchTask: """Test ResearchTask data class.""" def test_research_task_creation(self): """Test basic research task creation.""" task = ResearchTask( task_id="test_123_fundamental", task_type="fundamental", target_topic="AAPL financial analysis", focus_areas=["earnings", "valuation", "growth"], priority=8, timeout=240, ) assert task.task_id == "test_123_fundamental" assert task.task_type == "fundamental" assert task.target_topic == "AAPL financial analysis" assert task.focus_areas == ["earnings", "valuation", "growth"] assert task.priority == 8 assert task.timeout == 240 assert task.status == "pending" assert task.result is None assert task.error is None def test_task_lifecycle_tracking(self): """Test task lifecycle status tracking.""" task = ResearchTask( task_id="lifecycle_test", task_type="sentiment", target_topic="TSLA sentiment analysis", focus_areas=["news", "social"], ) # Initial state assert task.status == "pending" assert task.start_time is None assert task.end_time is None # Simulate task execution task.start_time = time.time() task.status = "running" # Simulate completion time.sleep(0.01) # Small delay to ensure different timestamps task.end_time = time.time() task.status = "completed" task.result = {"insights": ["Test insight"]} assert task.status == "completed" assert task.start_time < task.end_time assert task.result is not None def test_task_error_handling(self): """Test task error state tracking.""" task = ResearchTask( task_id="error_test", task_type="technical", target_topic="NVDA technical analysis", focus_areas=["chart_patterns"], ) # Simulate error task.status = "failed" task.error = "API timeout occurred" task.end_time = time.time() assert task.status == "failed" assert task.error == "API timeout occurred" assert task.result is None class TestParallelResearchOrchestrator: """Test ParallelResearchOrchestrator main functionality.""" @pytest.fixture def config(self): """Create test configuration.""" return ParallelResearchConfig( max_concurrent_agents=3, timeout_per_agent=5, # Short timeout for tests enable_fallbacks=True, rate_limit_delay=0.1, # Fast rate limit for tests ) @pytest.fixture def orchestrator(self, config): """Create orchestrator with test 
configuration.""" return ParallelResearchOrchestrator(config) @pytest.fixture def sample_tasks(self): """Create sample research tasks for testing.""" return [ ResearchTask( task_id="test_123_fundamental", task_type="fundamental", target_topic="AAPL analysis", focus_areas=["earnings", "valuation"], priority=8, ), ResearchTask( task_id="test_123_technical", task_type="technical", target_topic="AAPL analysis", focus_areas=["chart_patterns", "indicators"], priority=6, ), ResearchTask( task_id="test_123_sentiment", task_type="sentiment", target_topic="AAPL analysis", focus_areas=["news", "analyst_ratings"], priority=7, ), ] def test_orchestrator_initialization(self, config): """Test orchestrator initialization.""" orchestrator = ParallelResearchOrchestrator(config) assert orchestrator.config == config assert orchestrator.active_tasks == {} assert orchestrator._semaphore._value == config.max_concurrent_agents assert orchestrator.orchestration_logger is not None def test_orchestrator_default_config(self): """Test orchestrator with default configuration.""" orchestrator = ParallelResearchOrchestrator() assert orchestrator.config.max_concurrent_agents == 4 assert orchestrator.config.timeout_per_agent == 180 @pytest.mark.asyncio async def test_successful_parallel_execution(self, orchestrator, sample_tasks): """Test successful parallel execution of research tasks.""" # Mock research executor that returns success async def mock_executor(task: ResearchTask) -> dict[str, Any]: await asyncio.sleep(0.1) # Simulate work return { "research_type": task.task_type, "insights": [f"Insight for {task.task_type}"], "sentiment": {"direction": "bullish", "confidence": 0.8}, "credibility_score": 0.9, } # Mock synthesis callback async def mock_synthesis( task_results: dict[str, ResearchTask], ) -> dict[str, Any]: return { "synthesis": "Combined analysis from parallel research", "confidence_score": 0.85, "key_findings": ["Finding 1", "Finding 2"], } # Execute parallel research start_time = time.time() result = await orchestrator.execute_parallel_research( tasks=sample_tasks, research_executor=mock_executor, synthesis_callback=mock_synthesis, ) execution_time = time.time() - start_time # Verify results assert isinstance(result, ResearchResult) assert result.successful_tasks == 3 assert result.failed_tasks == 0 assert result.synthesis is not None assert ( result.synthesis["synthesis"] == "Combined analysis from parallel research" ) assert len(result.task_results) == 3 # Verify parallel efficiency (should be faster than sequential) assert ( execution_time < 0.5 ) # Should complete much faster than 3 * 0.1s sequential assert result.parallel_efficiency > 0.0 # Should show some efficiency @pytest.mark.asyncio async def test_concurrency_control(self, orchestrator, config): """Test that concurrency is properly limited.""" execution_order = [] active_count = 0 max_concurrent = 0 async def mock_executor(task: ResearchTask) -> dict[str, Any]: nonlocal active_count, max_concurrent active_count += 1 max_concurrent = max(max_concurrent, active_count) execution_order.append(f"start_{task.task_id}") await asyncio.sleep(0.1) # Simulate work active_count -= 1 execution_order.append(f"end_{task.task_id}") return {"result": f"completed_{task.task_id}"} # Create more tasks than max concurrent agents tasks = [ ResearchTask(f"task_{i}", "fundamental", "topic", ["focus"], priority=i) for i in range( config.max_concurrent_agents + 2 ) # 5 tasks, max 3 concurrent ] result = await orchestrator.execute_parallel_research( tasks=tasks, 
research_executor=mock_executor, ) # Verify concurrency was limited assert max_concurrent <= config.max_concurrent_agents assert ( result.successful_tasks == config.max_concurrent_agents ) # Limited by config assert len(execution_order) > 0 @pytest.mark.asyncio async def test_task_timeout_handling(self, orchestrator): """Test handling of task timeouts.""" async def slow_executor(task: ResearchTask) -> dict[str, Any]: await asyncio.sleep(10) # Longer than timeout return {"result": "should_not_complete"} tasks = [ ResearchTask( "timeout_task", "fundamental", "slow topic", ["focus"], timeout=1, # Very short timeout ) ] result = await orchestrator.execute_parallel_research( tasks=tasks, research_executor=slow_executor, ) # Verify timeout was handled assert result.successful_tasks == 0 assert result.failed_tasks == 1 failed_task = result.task_results["timeout_task"] assert failed_task.status == "failed" assert "timeout" in failed_task.error.lower() @pytest.mark.asyncio async def test_task_error_handling(self, orchestrator, sample_tasks): """Test handling of task execution errors.""" async def error_executor(task: ResearchTask) -> dict[str, Any]: if task.task_type == "technical": raise ValueError(f"Error in {task.task_type} analysis") return {"result": f"success_{task.task_type}"} result = await orchestrator.execute_parallel_research( tasks=sample_tasks, research_executor=error_executor, ) # Verify mixed success/failure results assert result.successful_tasks == 2 # fundamental and sentiment should succeed assert result.failed_tasks == 1 # technical should fail # Check specific task status technical_task = next( task for task in result.task_results.values() if task.task_type == "technical" ) assert technical_task.status == "failed" assert "Error in technical analysis" in technical_task.error @pytest.mark.asyncio async def test_task_preparation_and_prioritization(self, orchestrator): """Test task preparation and priority-based ordering.""" tasks = [ ResearchTask("low_priority", "technical", "topic", ["focus"], priority=2), ResearchTask( "high_priority", "fundamental", "topic", ["focus"], priority=9 ), ResearchTask("med_priority", "sentiment", "topic", ["focus"], priority=5), ] async def track_executor(task: ResearchTask) -> dict[str, Any]: return {"task_id": task.task_id, "priority": task.priority} result = await orchestrator.execute_parallel_research( tasks=tasks, research_executor=track_executor, ) # Verify all tasks were prepared (limited by max_concurrent_agents = 3) assert len(result.task_results) == 3 # Verify tasks have default timeout set for task in result.task_results.values(): assert task.timeout == orchestrator.config.timeout_per_agent @pytest.mark.asyncio async def test_synthesis_callback_error_handling(self, orchestrator, sample_tasks): """Test synthesis callback error handling.""" async def success_executor(task: ResearchTask) -> dict[str, Any]: return {"result": f"success_{task.task_type}"} async def failing_synthesis( task_results: dict[str, ResearchTask], ) -> dict[str, Any]: raise RuntimeError("Synthesis failed!") result = await orchestrator.execute_parallel_research( tasks=sample_tasks, research_executor=success_executor, synthesis_callback=failing_synthesis, ) # Verify tasks succeeded but synthesis failed gracefully assert result.successful_tasks == 3 assert result.synthesis is not None assert "error" in result.synthesis assert "Synthesis failed" in result.synthesis["error"] @pytest.mark.asyncio async def test_no_synthesis_callback(self, orchestrator, sample_tasks): """Test 
execution without synthesis callback.""" async def success_executor(task: ResearchTask) -> dict[str, Any]: return {"result": f"success_{task.task_type}"} result = await orchestrator.execute_parallel_research( tasks=sample_tasks, research_executor=success_executor, # No synthesis callback provided ) assert result.successful_tasks == 3 assert result.synthesis is None # Should be None when no callback @pytest.mark.asyncio async def test_rate_limiting_between_tasks(self, orchestrator): """Test rate limiting delays between task starts.""" start_times = [] async def timing_executor(task: ResearchTask) -> dict[str, Any]: start_times.append(time.time()) await asyncio.sleep(0.05) return {"result": task.task_id} tasks = [ ResearchTask(f"task_{i}", "fundamental", "topic", ["focus"]) for i in range(3) ] await orchestrator.execute_parallel_research( tasks=tasks, research_executor=timing_executor, ) # Verify rate limiting created delays (approximately rate_limit_delay apart) assert len(start_times) == 3 # Note: Due to parallel execution, exact timing is hard to verify # but we can check that execution completed @pytest.mark.asyncio async def test_empty_task_list(self, orchestrator): """Test handling of empty task list.""" async def unused_executor(task: ResearchTask) -> dict[str, Any]: return {"result": "should_not_be_called"} result = await orchestrator.execute_parallel_research( tasks=[], research_executor=unused_executor, ) assert result.successful_tasks == 0 assert result.failed_tasks == 0 assert result.task_results == {} assert result.synthesis is None @pytest.mark.asyncio async def test_performance_metrics_calculation(self, orchestrator, sample_tasks): """Test calculation of performance metrics.""" task_durations = [] async def tracked_executor(task: ResearchTask) -> dict[str, Any]: start = time.time() await asyncio.sleep(0.05) # Simulate work duration = time.time() - start task_durations.append(duration) return {"result": task.task_id} result = await orchestrator.execute_parallel_research( tasks=sample_tasks, research_executor=tracked_executor, ) # Verify performance metrics assert result.total_execution_time > 0 assert result.parallel_efficiency > 0 # Parallel efficiency should be roughly: sum(individual_durations) / total_wall_time expected_sequential_time = sum(task_durations) efficiency_ratio = expected_sequential_time / result.total_execution_time # Allow some tolerance for timing variations assert abs(result.parallel_efficiency - efficiency_ratio) < 0.5 @pytest.mark.asyncio async def test_circuit_breaker_integration(self, orchestrator): """Test integration with circuit breaker pattern.""" failure_count = 0 async def circuit_breaker_executor(task: ResearchTask) -> dict[str, Any]: nonlocal failure_count failure_count += 1 if failure_count <= 2: # First 2 calls fail raise RuntimeError("Circuit breaker test failure") return {"result": "success_after_failures"} tasks = [ ResearchTask(f"cb_task_{i}", "fundamental", "topic", ["focus"]) for i in range(3) ] # Note: The actual circuit breaker is applied in _execute_single_task # This test verifies that errors are properly handled result = await orchestrator.execute_parallel_research( tasks=tasks, research_executor=circuit_breaker_executor, ) # Should have some failures and potentially some successes assert result.failed_tasks >= 2 # At least 2 should fail assert result.total_execution_time > 0 class TestTaskDistributionEngine: """Test TaskDistributionEngine functionality.""" def test_task_distribution_engine_creation(self): """Test creation of task 
distribution engine.""" engine = TaskDistributionEngine() assert hasattr(engine, "TASK_TYPES") assert "fundamental" in engine.TASK_TYPES assert "technical" in engine.TASK_TYPES assert "sentiment" in engine.TASK_TYPES assert "competitive" in engine.TASK_TYPES def test_topic_relevance_analysis(self): """Test analysis of topic relevance to different research types.""" engine = TaskDistributionEngine() # Test financial topic relevance = engine._analyze_topic_relevance( "AAPL earnings revenue profit analysis" ) assert ( relevance["fundamental"] > relevance["technical"] ) # Should favor fundamental assert all(0 <= score <= 1 for score in relevance.values()) # Valid range assert len(relevance) == 4 # All task types def test_distribute_research_tasks(self): """Test distribution of research topic into specialized tasks.""" engine = TaskDistributionEngine() tasks = engine.distribute_research_tasks( topic="Tesla financial performance and market sentiment", session_id="test_123", focus_areas=["earnings", "sentiment"], ) assert len(tasks) > 0 assert all(isinstance(task, ResearchTask) for task in tasks) assert all( task.session_id == "test_123" for task in [] ) # Tasks don't have session_id directly assert all( task.target_topic == "Tesla financial performance and market sentiment" for task in tasks ) # Verify task types are relevant task_types = {task.task_type for task in tasks} assert ( "fundamental" in task_types or "sentiment" in task_types ) # Should include relevant types def test_fallback_task_creation(self): """Test fallback task creation when no relevant tasks found.""" engine = TaskDistributionEngine() # Use a topic that truly has low relevance scores and will trigger fallback # First, let's mock the _analyze_topic_relevance to return low scores original_method = engine._analyze_topic_relevance def mock_low_relevance(topic, focus_areas=None): return { "fundamental": 0.1, "technical": 0.1, "sentiment": 0.1, "competitive": 0.1, } engine._analyze_topic_relevance = mock_low_relevance tasks = engine.distribute_research_tasks( topic="fallback test topic", session_id="fallback_test" ) # Restore original method engine._analyze_topic_relevance = original_method # Should create at least one fallback task assert len(tasks) >= 1 # Should have fundamental as fallback fallback_task = tasks[0] assert fallback_task.task_type == "fundamental" assert fallback_task.priority == 5 # Default priority def test_task_priority_assignment(self): """Test priority assignment based on relevance scores.""" engine = TaskDistributionEngine() tasks = engine.distribute_research_tasks( topic="Apple dividend yield earnings cash flow stability", # Should favor fundamental session_id="priority_test", ) # Find fundamental task (should have higher priority for this topic) fundamental_tasks = [task for task in tasks if task.task_type == "fundamental"] if fundamental_tasks: fundamental_task = fundamental_tasks[0] assert fundamental_task.priority >= 5 # Should have decent priority def test_focus_areas_integration(self): """Test integration of provided focus areas.""" engine = TaskDistributionEngine() tasks = engine.distribute_research_tasks( topic="Microsoft analysis", session_id="focus_test", focus_areas=["technical_analysis", "chart_patterns"], ) # Should include technical analysis tasks when focus areas suggest it {task.task_type for task in tasks} # Should favor technical analysis given the focus areas assert len(tasks) > 0 # Should create some tasks class TestResearchResult: """Test ResearchResult data structure.""" def 
test_research_result_initialization(self): """Test ResearchResult initialization.""" result = ResearchResult() assert result.task_results == {} assert result.synthesis is None assert result.total_execution_time == 0.0 assert result.successful_tasks == 0 assert result.failed_tasks == 0 assert result.parallel_efficiency == 0.0 def test_research_result_data_storage(self): """Test storing data in ResearchResult.""" result = ResearchResult() # Add sample task results task1 = ResearchTask("task_1", "fundamental", "topic", ["focus"]) task1.status = "completed" task2 = ResearchTask("task_2", "technical", "topic", ["focus"]) task2.status = "failed" result.task_results = {"task_1": task1, "task_2": task2} result.successful_tasks = 1 result.failed_tasks = 1 result.total_execution_time = 2.5 result.parallel_efficiency = 1.8 result.synthesis = {"findings": "Test findings"} assert len(result.task_results) == 2 assert result.successful_tasks == 1 assert result.failed_tasks == 1 assert result.total_execution_time == 2.5 assert result.parallel_efficiency == 1.8 assert result.synthesis["findings"] == "Test findings" @pytest.mark.integration class TestParallelResearchIntegration: """Integration tests for complete parallel research workflow.""" @pytest.fixture def full_orchestrator(self): """Create orchestrator with realistic configuration.""" config = ParallelResearchConfig( max_concurrent_agents=2, # Reduced for testing timeout_per_agent=10, enable_fallbacks=True, rate_limit_delay=0.1, ) return ParallelResearchOrchestrator(config) @pytest.mark.asyncio async def test_end_to_end_parallel_research(self, full_orchestrator): """Test complete end-to-end parallel research workflow.""" # Create realistic research tasks engine = TaskDistributionEngine() tasks = engine.distribute_research_tasks( topic="Apple Inc financial analysis and market outlook", session_id="integration_test", ) # Mock a realistic research executor async def realistic_executor(task: ResearchTask) -> dict[str, Any]: await asyncio.sleep(0.1) # Simulate API calls return { "research_type": task.task_type, "insights": [ f"{task.task_type} insight 1 for {task.target_topic}", f"{task.task_type} insight 2 based on {task.focus_areas[0] if task.focus_areas else 'general'}", ], "sentiment": { "direction": "bullish" if task.task_type != "technical" else "neutral", "confidence": 0.75, }, "risk_factors": [f"{task.task_type} risk factor"], "opportunities": [f"{task.task_type} opportunity"], "credibility_score": 0.8, "sources": [ { "title": f"Source for {task.task_type} research", "url": f"https://example.com/{task.task_type}", "credibility_score": 0.85, } ], } # Mock synthesis callback async def integration_synthesis( task_results: dict[str, ResearchTask], ) -> dict[str, Any]: successful_results = [ task.result for task in task_results.values() if task.status == "completed" and task.result ] all_insights = [] for result in successful_results: all_insights.extend(result.get("insights", [])) return { "synthesis": f"Integrated analysis from {len(successful_results)} research angles", "confidence_score": 0.82, "key_findings": all_insights[:5], # Top 5 insights "overall_sentiment": "bullish", "research_depth": "comprehensive", } # Execute the integration test start_time = time.time() result = await full_orchestrator.execute_parallel_research( tasks=tasks, research_executor=realistic_executor, synthesis_callback=integration_synthesis, ) execution_time = time.time() - start_time # Comprehensive verification assert isinstance(result, ResearchResult) assert 
result.successful_tasks > 0 assert result.total_execution_time > 0 assert execution_time < 5 # Should complete reasonably quickly # Verify synthesis was generated assert result.synthesis is not None assert "synthesis" in result.synthesis assert result.synthesis["confidence_score"] > 0 # Verify task results structure for task_id, task in result.task_results.items(): assert isinstance(task, ResearchTask) assert task.task_id == task_id if task.status == "completed": assert task.result is not None assert "insights" in task.result assert "sentiment" in task.result # Verify performance characteristics if result.successful_tasks > 1: assert result.parallel_efficiency > 1.0 # Should show parallel benefit
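The suite can be run locally through pytest's Python entry point. The snippet below is a minimal, hypothetical runner, not part of the file above: it assumes pytest and pytest-asyncio are installed, and that the module is saved at tests/test_parallel_research_orchestrator.py (the path is an assumption). By default it skips the tests marked @pytest.mark.integration; passing command-line arguments (e.g. "-m integration") overrides that.

# Hypothetical convenience runner -- not part of the MaverickMCP repository.
# Assumes pytest and pytest-asyncio are installed and that the test file
# path below matches where the module was saved.
import sys

import pytest

if __name__ == "__main__":
    # Default: verbose run that skips the @pytest.mark.integration tests;
    # any CLI arguments replace these defaults.
    args = sys.argv[1:] or ["-m", "not integration", "-v"]
    raise SystemExit(
        pytest.main(["tests/test_parallel_research_orchestrator.py", *args])
    )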

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.