MaverickMCP

by wshobson
MIT License
test_deep_research_parallel_execution.py (39.7 kB)
""" Comprehensive test suite for DeepResearchAgent parallel execution functionality. This test suite covers: - Parallel vs sequential execution modes - Subagent creation and orchestration - Task routing to specialized subagents - Parallel execution fallback mechanisms - Result synthesis from parallel tasks - Performance characteristics of parallel execution """ import asyncio import time from datetime import datetime from unittest.mock import AsyncMock, Mock, patch import pytest from langchain_core.language_models import BaseChatModel from langchain_core.messages import AIMessage from langgraph.checkpoint.memory import MemorySaver from pydantic import ConfigDict from maverick_mcp.agents.deep_research import ( BaseSubagent, CompetitiveResearchAgent, DeepResearchAgent, FundamentalResearchAgent, SentimentResearchAgent, TechnicalResearchAgent, ) from maverick_mcp.utils.parallel_research import ( ParallelResearchConfig, ResearchResult, ResearchTask, ) class MockLLM(BaseChatModel): """Mock LLM for testing.""" # Allow extra fields to be set on this Pydantic model model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow") def __init__(self, **kwargs): # Extract our custom fields before calling super() self._response_content = kwargs.pop("response_content", "Mock response") super().__init__(**kwargs) def _generate(self, messages, stop=None, **kwargs): # This method should not be called in async tests raise NotImplementedError("Use ainvoke for async tests") async def ainvoke(self, messages, config=None, **kwargs): """Mock async invocation.""" await asyncio.sleep(0.01) # Simulate processing time return AIMessage(content=self._response_content) @property def _llm_type(self) -> str: return "mock_llm" class TestDeepResearchAgentParallelExecution: """Test DeepResearchAgent parallel execution capabilities.""" @pytest.fixture def mock_llm(self): """Create mock LLM.""" return MockLLM( response_content='{"KEY_INSIGHTS": ["Test insight"], "SENTIMENT": {"direction": "bullish", "confidence": 0.8}, "CREDIBILITY": 0.9}' ) @pytest.fixture def parallel_config(self): """Create test parallel configuration.""" return ParallelResearchConfig( max_concurrent_agents=3, timeout_per_agent=5, enable_fallbacks=True, rate_limit_delay=0.1, ) @pytest.fixture def deep_research_agent(self, mock_llm, parallel_config): """Create DeepResearchAgent with parallel execution enabled.""" return DeepResearchAgent( llm=mock_llm, persona="moderate", checkpointer=MemorySaver(), enable_parallel_execution=True, parallel_config=parallel_config, ) @pytest.fixture def sequential_agent(self, mock_llm): """Create DeepResearchAgent with sequential execution.""" return DeepResearchAgent( llm=mock_llm, persona="moderate", checkpointer=MemorySaver(), enable_parallel_execution=False, ) def test_agent_initialization_parallel_enabled(self, deep_research_agent): """Test agent initialization with parallel execution enabled.""" assert deep_research_agent.enable_parallel_execution is True assert deep_research_agent.parallel_config is not None assert deep_research_agent.parallel_orchestrator is not None assert deep_research_agent.task_distributor is not None assert deep_research_agent.parallel_config.max_concurrent_agents == 3 def test_agent_initialization_sequential(self, sequential_agent): """Test agent initialization with sequential execution.""" assert sequential_agent.enable_parallel_execution is False # These components should still be initialized for potential future use assert sequential_agent.parallel_orchestrator is not None 

    @pytest.mark.asyncio
    async def test_parallel_execution_mode_selection(self, deep_research_agent):
        """Test parallel execution mode selection."""
        # Mock search providers to be available
        mock_provider = AsyncMock()
        deep_research_agent.search_providers = [mock_provider]

        with (
            patch.object(
                deep_research_agent,
                "_execute_parallel_research",
                new_callable=AsyncMock,
            ) as mock_parallel,
            patch.object(deep_research_agent.graph, "ainvoke") as mock_sequential,
            patch.object(
                deep_research_agent,
                "_ensure_search_providers_loaded",
                return_value=None,
            ),
        ):
            mock_parallel.return_value = {
                "status": "success",
                "execution_mode": "parallel",
                "agent_type": "deep_research",
            }

            # Test with parallel execution enabled (default)
            result = await deep_research_agent.research_comprehensive(
                topic="AAPL analysis", session_id="test_123"
            )

            # Should use parallel execution
            mock_parallel.assert_called_once()
            mock_sequential.assert_not_called()
            assert result["execution_mode"] == "parallel"

    @pytest.mark.asyncio
    async def test_sequential_execution_mode_selection(self, sequential_agent):
        """Test sequential execution mode selection."""
        # Mock search providers to be available
        mock_provider = AsyncMock()
        sequential_agent.search_providers = [mock_provider]

        with (
            patch.object(
                sequential_agent, "_execute_parallel_research"
            ) as mock_parallel,
            patch.object(sequential_agent.graph, "ainvoke") as mock_sequential,
            patch.object(
                sequential_agent, "_ensure_search_providers_loaded", return_value=None
            ),
        ):
            mock_sequential.return_value = {
                "status": "success",
                "persona": "moderate",
                "research_confidence": 0.8,
            }

            # Test with parallel execution disabled
            await sequential_agent.research_comprehensive(
                topic="AAPL analysis", session_id="test_123"
            )

            # Should use sequential execution
            mock_parallel.assert_not_called()
            mock_sequential.assert_called_once()

    @pytest.mark.asyncio
    async def test_parallel_execution_override(self, deep_research_agent):
        """Test overriding parallel execution at runtime."""
        # Mock search providers to be available
        mock_provider = AsyncMock()
        deep_research_agent.search_providers = [mock_provider]

        with (
            patch.object(
                deep_research_agent, "_execute_parallel_research"
            ) as mock_parallel,
            patch.object(deep_research_agent.graph, "ainvoke") as mock_sequential,
            patch.object(
                deep_research_agent,
                "_ensure_search_providers_loaded",
                return_value=None,
            ),
        ):
            mock_sequential.return_value = {"status": "success", "persona": "moderate"}

            # Override parallel execution to false
            await deep_research_agent.research_comprehensive(
                topic="AAPL analysis",
                session_id="test_123",
                use_parallel_execution=False,
            )

            # Should use sequential despite agent default
            mock_parallel.assert_not_called()
            mock_sequential.assert_called_once()
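
    # Illustrative sketch (not from the original file): the three tests above
    # suggest that research_comprehensive picks its mode roughly like this,
    # with the per-call flag overriding the agent-level default. The actual
    # implementation may differ.
    #
    #   use_parallel = (
    #       use_parallel_execution
    #       if use_parallel_execution is not None
    #       else self.enable_parallel_execution
    #   )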
analysis", session_id="test_123" ) # Should attempt parallel then fall back to sequential mock_parallel.assert_called_once() mock_sequential.assert_called_once() assert result["status"] == "success" @pytest.mark.asyncio async def test_execute_parallel_research_task_distribution( self, deep_research_agent ): """Test parallel research task distribution.""" with ( patch.object( deep_research_agent.task_distributor, "distribute_research_tasks" ) as mock_distribute, patch.object( deep_research_agent.parallel_orchestrator, "execute_parallel_research", new_callable=AsyncMock, ) as mock_execute, ): # Mock task distribution mock_tasks = [ ResearchTask( "test_123_fundamental", "fundamental", "AAPL", ["earnings"] ), ResearchTask("test_123_sentiment", "sentiment", "AAPL", ["news"]), ] mock_distribute.return_value = mock_tasks # Mock orchestrator execution mock_result = ResearchResult() mock_result.successful_tasks = 2 mock_result.failed_tasks = 0 mock_result.synthesis = {"confidence_score": 0.85} mock_execute.return_value = mock_result initial_state = { "persona": "moderate", "research_topic": "AAPL analysis", "session_id": "test_123", "focus_areas": ["earnings", "sentiment"], } await deep_research_agent._execute_parallel_research( topic="AAPL analysis", session_id="test_123", depth="standard", focus_areas=["earnings", "sentiment"], start_time=datetime.now(), initial_state=initial_state, ) # Verify task distribution was called correctly mock_distribute.assert_called_once_with( topic="AAPL analysis", session_id="test_123", focus_areas=["earnings", "sentiment"], ) # Verify orchestrator was called with distributed tasks mock_execute.assert_called_once() args, kwargs = mock_execute.call_args assert kwargs["tasks"] == mock_tasks @pytest.mark.asyncio async def test_subagent_task_routing(self, deep_research_agent): """Test routing tasks to appropriate subagents.""" # Test fundamental routing fundamental_task = ResearchTask( "test_fundamental", "fundamental", "AAPL", ["earnings"] ) with patch( "maverick_mcp.agents.deep_research.FundamentalResearchAgent" ) as mock_fundamental: mock_subagent = AsyncMock() mock_subagent.execute_research.return_value = { "research_type": "fundamental" } mock_fundamental.return_value = mock_subagent # This would normally be called by the orchestrator # We're testing the routing logic directly await deep_research_agent._execute_subagent_task(fundamental_task) mock_fundamental.assert_called_once_with(deep_research_agent) mock_subagent.execute_research.assert_called_once_with(fundamental_task) @pytest.mark.asyncio async def test_unknown_task_type_fallback(self, deep_research_agent): """Test fallback for unknown task types.""" unknown_task = ResearchTask("test_unknown", "unknown_type", "AAPL", ["test"]) with patch( "maverick_mcp.agents.deep_research.FundamentalResearchAgent" ) as mock_fundamental: mock_subagent = AsyncMock() mock_subagent.execute_research.return_value = { "research_type": "fundamental" } mock_fundamental.return_value = mock_subagent await deep_research_agent._execute_subagent_task(unknown_task) # Should fall back to fundamental analysis mock_fundamental.assert_called_once_with(deep_research_agent) @pytest.mark.asyncio async def test_parallel_result_synthesis(self, deep_research_agent, mock_llm): """Test synthesis of results from parallel tasks.""" # Create mock task results task_results = { "test_123_fundamental": ResearchTask( "test_123_fundamental", "fundamental", "AAPL", ["earnings"] ), "test_123_sentiment": ResearchTask( "test_123_sentiment", "sentiment", "AAPL", 
["news"] ), } # Set tasks as completed with results task_results["test_123_fundamental"].status = "completed" task_results["test_123_fundamental"].result = { "insights": ["Strong earnings growth"], "sentiment": {"direction": "bullish", "confidence": 0.8}, "credibility_score": 0.9, } task_results["test_123_sentiment"].status = "completed" task_results["test_123_sentiment"].result = { "insights": ["Positive market sentiment"], "sentiment": {"direction": "bullish", "confidence": 0.7}, "credibility_score": 0.8, } # Mock LLM synthesis response mock_llm._response_content = "Synthesized analysis showing strong bullish outlook based on fundamental and sentiment analysis" result = await deep_research_agent._synthesize_parallel_results(task_results) assert result is not None assert "synthesis" in result assert "key_insights" in result assert "overall_sentiment" in result assert len(result["key_insights"]) > 0 assert result["overall_sentiment"]["direction"] == "bullish" @pytest.mark.asyncio async def test_synthesis_with_mixed_results(self, deep_research_agent): """Test synthesis with mixed successful and failed tasks.""" task_results = { "test_123_fundamental": ResearchTask( "test_123_fundamental", "fundamental", "AAPL", ["earnings"] ), "test_123_technical": ResearchTask( "test_123_technical", "technical", "AAPL", ["charts"] ), "test_123_sentiment": ResearchTask( "test_123_sentiment", "sentiment", "AAPL", ["news"] ), } # One successful, one failed, one successful task_results["test_123_fundamental"].status = "completed" task_results["test_123_fundamental"].result = { "insights": ["Strong fundamentals"], "sentiment": {"direction": "bullish", "confidence": 0.8}, } task_results["test_123_technical"].status = "failed" task_results["test_123_technical"].error = "Technical analysis failed" task_results["test_123_sentiment"].status = "completed" task_results["test_123_sentiment"].result = { "insights": ["Mixed sentiment"], "sentiment": {"direction": "neutral", "confidence": 0.6}, } result = await deep_research_agent._synthesize_parallel_results(task_results) # Should handle mixed results gracefully assert result is not None assert len(result["key_insights"]) > 0 assert "task_breakdown" in result assert result["task_breakdown"]["test_123_technical"]["status"] == "failed" @pytest.mark.asyncio async def test_synthesis_with_no_successful_results(self, deep_research_agent): """Test synthesis when all tasks fail.""" task_results = { "test_123_fundamental": ResearchTask( "test_123_fundamental", "fundamental", "AAPL", ["earnings"] ), "test_123_sentiment": ResearchTask( "test_123_sentiment", "sentiment", "AAPL", ["news"] ), } # Both tasks failed task_results["test_123_fundamental"].status = "failed" task_results["test_123_fundamental"].error = "API timeout" task_results["test_123_sentiment"].status = "failed" task_results["test_123_sentiment"].error = "No data available" result = await deep_research_agent._synthesize_parallel_results(task_results) # Should handle gracefully assert result is not None assert result["confidence_score"] == 0.0 assert "No research results available" in result["synthesis"] @pytest.mark.asyncio async def test_synthesis_llm_failure_fallback(self, deep_research_agent): """Test fallback when LLM synthesis fails.""" task_results = { "test_123_fundamental": ResearchTask( "test_123_fundamental", "fundamental", "AAPL", ["earnings"] ), } task_results["test_123_fundamental"].status = "completed" task_results["test_123_fundamental"].result = { "insights": ["Good insights"], "sentiment": 
{"direction": "bullish", "confidence": 0.8}, } # Mock LLM to fail with patch.object( deep_research_agent.llm, "ainvoke", side_effect=RuntimeError("LLM failed") ): result = await deep_research_agent._synthesize_parallel_results( task_results ) # Should use fallback synthesis assert result is not None assert "fallback synthesis" in result["synthesis"].lower() @pytest.mark.asyncio async def test_format_parallel_research_response(self, deep_research_agent): """Test formatting of parallel research response.""" # Create mock research result research_result = ResearchResult() research_result.successful_tasks = 2 research_result.failed_tasks = 0 research_result.total_execution_time = 1.5 research_result.parallel_efficiency = 2.1 research_result.synthesis = { "confidence_score": 0.85, "key_findings": ["Finding 1", "Finding 2"], } # Mock task results with sources task1 = ResearchTask( "test_123_fundamental", "fundamental", "AAPL", ["earnings"] ) task1.status = "completed" task1.result = { "sources": [ { "title": "AAPL Earnings Report", "url": "https://example.com/earnings", "credibility_score": 0.9, } ] } research_result.task_results = {"test_123_fundamental": task1} start_time = datetime.now() formatted_result = await deep_research_agent._format_parallel_research_response( research_result=research_result, topic="AAPL analysis", session_id="test_123", depth="standard", initial_state={"persona": "moderate"}, start_time=start_time, ) # Verify formatted response structure assert formatted_result["status"] == "success" assert formatted_result["agent_type"] == "deep_research" assert formatted_result["execution_mode"] == "parallel" assert formatted_result["research_topic"] == "AAPL analysis" assert formatted_result["confidence_score"] == 0.85 assert "parallel_execution_stats" in formatted_result assert formatted_result["parallel_execution_stats"]["successful_tasks"] == 2 assert len(formatted_result["citations"]) > 0 @pytest.mark.asyncio async def test_aggregated_sentiment_calculation(self, deep_research_agent): """Test aggregation of sentiment from multiple sources.""" sentiment_scores = [ {"direction": "bullish", "confidence": 0.8}, {"direction": "bullish", "confidence": 0.6}, {"direction": "neutral", "confidence": 0.7}, {"direction": "bearish", "confidence": 0.5}, ] result = deep_research_agent._calculate_aggregated_sentiment(sentiment_scores) assert result is not None assert "direction" in result assert "confidence" in result assert "consensus" in result assert "source_count" in result assert result["source_count"] == 4 @pytest.mark.asyncio async def test_parallel_recommendation_derivation(self, deep_research_agent): """Test derivation of investment recommendations from parallel analysis.""" # Test strong bullish signal bullish_sentiment = {"direction": "bullish", "confidence": 0.9} recommendation = deep_research_agent._derive_parallel_recommendation( bullish_sentiment ) assert "strong buy" in recommendation.lower() or "buy" in recommendation.lower() # Test bearish signal bearish_sentiment = {"direction": "bearish", "confidence": 0.8} recommendation = deep_research_agent._derive_parallel_recommendation( bearish_sentiment ) assert ( "caution" in recommendation.lower() or "negative" in recommendation.lower() ) # Test neutral/mixed signals neutral_sentiment = {"direction": "neutral", "confidence": 0.5} recommendation = deep_research_agent._derive_parallel_recommendation( neutral_sentiment ) assert "neutral" in recommendation.lower() or "mixed" in recommendation.lower() class TestSpecializedSubagents: 
"""Test specialized research subagent functionality.""" @pytest.fixture def mock_parent_agent(self): """Create mock parent DeepResearchAgent.""" parent = Mock() parent.llm = MockLLM() parent.search_providers = [] parent.content_analyzer = Mock() parent.persona = Mock() parent.persona.name = "moderate" parent._calculate_source_credibility = Mock(return_value=0.8) return parent def test_base_subagent_initialization(self, mock_parent_agent): """Test BaseSubagent initialization.""" subagent = BaseSubagent(mock_parent_agent) assert subagent.parent == mock_parent_agent assert subagent.llm == mock_parent_agent.llm assert subagent.search_providers == mock_parent_agent.search_providers assert subagent.content_analyzer == mock_parent_agent.content_analyzer assert subagent.persona == mock_parent_agent.persona @pytest.mark.asyncio async def test_fundamental_research_agent(self, mock_parent_agent): """Test FundamentalResearchAgent execution.""" # Mock content analyzer mock_parent_agent.content_analyzer.analyze_content = AsyncMock( return_value={ "insights": ["Strong earnings growth"], "sentiment": {"direction": "bullish", "confidence": 0.8}, "risk_factors": ["Market volatility"], "opportunities": ["Dividend growth"], "credibility_score": 0.9, } ) subagent = FundamentalResearchAgent(mock_parent_agent) # Mock search results with patch.object(subagent, "_perform_specialized_search") as mock_search: mock_search.return_value = [ { "title": "AAPL Earnings Report", "url": "https://example.com/earnings", "content": "Apple reported strong quarterly earnings...", "credibility_score": 0.9, } ] task = ResearchTask( "fund_task", "fundamental", "AAPL analysis", ["earnings"] ) result = await subagent.execute_research(task) assert result["research_type"] == "fundamental" assert len(result["insights"]) > 0 assert "sentiment" in result assert result["sentiment"]["direction"] == "bullish" assert len(result["sources"]) > 0 def test_fundamental_query_generation(self, mock_parent_agent): """Test fundamental analysis query generation.""" subagent = FundamentalResearchAgent(mock_parent_agent) queries = subagent._generate_fundamental_queries("AAPL") assert len(queries) > 0 assert any("earnings" in query.lower() for query in queries) assert any("revenue" in query.lower() for query in queries) assert any("valuation" in query.lower() for query in queries) @pytest.mark.asyncio async def test_technical_research_agent(self, mock_parent_agent): """Test TechnicalResearchAgent execution.""" mock_parent_agent.content_analyzer.analyze_content = AsyncMock( return_value={ "insights": ["Bullish chart pattern"], "sentiment": {"direction": "bullish", "confidence": 0.7}, "risk_factors": ["Support level break"], "opportunities": ["Breakout potential"], "credibility_score": 0.8, } ) subagent = TechnicalResearchAgent(mock_parent_agent) with patch.object(subagent, "_perform_specialized_search") as mock_search: mock_search.return_value = [ { "title": "AAPL Technical Analysis", "url": "https://example.com/technical", "content": "Apple stock showing strong technical indicators...", "credibility_score": 0.8, } ] task = ResearchTask("tech_task", "technical", "AAPL analysis", ["charts"]) result = await subagent.execute_research(task) assert result["research_type"] == "technical" assert "price_action" in result["focus_areas"] assert "technical_indicators" in result["focus_areas"] def test_technical_query_generation(self, mock_parent_agent): """Test technical analysis query generation.""" subagent = TechnicalResearchAgent(mock_parent_agent) queries = 
subagent._generate_technical_queries("AAPL") assert any("technical analysis" in query.lower() for query in queries) assert any("chart pattern" in query.lower() for query in queries) assert any( "rsi" in query.lower() or "macd" in query.lower() for query in queries ) @pytest.mark.asyncio async def test_sentiment_research_agent(self, mock_parent_agent): """Test SentimentResearchAgent execution.""" mock_parent_agent.content_analyzer.analyze_content = AsyncMock( return_value={ "insights": ["Positive analyst sentiment"], "sentiment": {"direction": "bullish", "confidence": 0.9}, "risk_factors": ["Market sentiment shift"], "opportunities": ["Upgrade potential"], "credibility_score": 0.85, } ) subagent = SentimentResearchAgent(mock_parent_agent) with patch.object(subagent, "_perform_specialized_search") as mock_search: mock_search.return_value = [ { "title": "AAPL Analyst Upgrade", "url": "https://example.com/upgrade", "content": "Apple receives analyst upgrade...", "credibility_score": 0.85, } ] task = ResearchTask("sent_task", "sentiment", "AAPL analysis", ["news"]) result = await subagent.execute_research(task) assert result["research_type"] == "sentiment" assert "market_sentiment" in result["focus_areas"] assert result["sentiment"]["confidence"] > 0.8 @pytest.mark.asyncio async def test_competitive_research_agent(self, mock_parent_agent): """Test CompetitiveResearchAgent execution.""" mock_parent_agent.content_analyzer.analyze_content = AsyncMock( return_value={ "insights": ["Strong competitive position"], "sentiment": {"direction": "bullish", "confidence": 0.7}, "risk_factors": ["Increased competition"], "opportunities": ["Market expansion"], "credibility_score": 0.8, } ) subagent = CompetitiveResearchAgent(mock_parent_agent) with patch.object(subagent, "_perform_specialized_search") as mock_search: mock_search.return_value = [ { "title": "AAPL Market Share Analysis", "url": "https://example.com/marketshare", "content": "Apple maintains strong market position...", "credibility_score": 0.8, } ] task = ResearchTask( "comp_task", "competitive", "AAPL analysis", ["market_share"] ) result = await subagent.execute_research(task) assert result["research_type"] == "competitive" assert "competitive_position" in result["focus_areas"] assert "market_share" in result["focus_areas"] @pytest.mark.asyncio async def test_subagent_search_deduplication(self, mock_parent_agent): """Test search result deduplication in subagents.""" subagent = BaseSubagent(mock_parent_agent) # Mock search providers with duplicate results mock_provider1 = AsyncMock() mock_provider1.search.return_value = [ {"url": "https://example.com/article1", "title": "Article 1"}, {"url": "https://example.com/article2", "title": "Article 2"}, ] mock_provider2 = AsyncMock() mock_provider2.search.return_value = [ {"url": "https://example.com/article1", "title": "Article 1"}, # Duplicate {"url": "https://example.com/article3", "title": "Article 3"}, ] subagent.search_providers = [mock_provider1, mock_provider2] results = await subagent._perform_specialized_search( "test topic", ["test query"], max_results=10 ) # Should deduplicate by URL urls = [result["url"] for result in results] assert len(urls) == len(set(urls)) # No duplicates assert len(results) == 3 # Should have 3 unique results @pytest.mark.asyncio async def test_subagent_search_error_handling(self, mock_parent_agent): """Test error handling in subagent search.""" subagent = BaseSubagent(mock_parent_agent) # Mock provider that fails mock_provider = AsyncMock() 
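
    # Illustrative sketch (added, not in the original file): the deduplication
    # asserted above could be implemented with a simple seen-URL set; the real
    # logic inside _perform_specialized_search may differ.
    @staticmethod
    def _example_dedupe_by_url(results: list[dict]) -> list[dict]:
        seen: set[str] = set()
        unique = []
        for result in results:
            if result["url"] not in seen:
                seen.add(result["url"])
                unique.append(result)
        return unique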

    @pytest.mark.asyncio
    async def test_subagent_search_error_handling(self, mock_parent_agent):
        """Test error handling in subagent search."""
        subagent = BaseSubagent(mock_parent_agent)

        # Mock provider that fails
        mock_provider = AsyncMock()
        mock_provider.search.side_effect = RuntimeError("Search failed")
        subagent.search_providers = [mock_provider]

        # Should handle errors gracefully and return empty results
        results = await subagent._perform_specialized_search(
            "test topic", ["test query"], max_results=10
        )

        assert results == []  # Should return empty list on error

    @pytest.mark.asyncio
    async def test_subagent_content_analysis_error_handling(self, mock_parent_agent):
        """Test content analysis error handling in subagents."""
        # Mock content analyzer that fails
        mock_parent_agent.content_analyzer.analyze_content = AsyncMock(
            side_effect=RuntimeError("Analysis failed")
        )

        subagent = BaseSubagent(mock_parent_agent)

        search_results = [
            {
                "title": "Test Article",
                "url": "https://example.com/test",
                "content": "Test content",
            }
        ]

        # Should handle analysis errors gracefully
        results = await subagent._analyze_search_results(
            search_results, "test_analysis"
        )

        # Should return empty results when analysis fails
        assert results == []


@pytest.mark.integration
class TestDeepResearchParallelIntegration:
    """Integration tests for DeepResearchAgent parallel execution."""

    @pytest.fixture
    def integration_agent(self):
        """Create agent for integration testing."""
        # response_content must be a keyword argument: MockLLM.__init__
        # accepts **kwargs only, so a positional string would raise TypeError.
        llm = MockLLM(
            response_content='{"KEY_INSIGHTS": ["Integration insight"], "SENTIMENT": {"direction": "bullish", "confidence": 0.8}}'
        )
        config = ParallelResearchConfig(
            max_concurrent_agents=2,
            timeout_per_agent=5,
            enable_fallbacks=True,
            rate_limit_delay=0.05,
        )
        return DeepResearchAgent(
            llm=llm,
            persona="moderate",
            enable_parallel_execution=True,
            parallel_config=config,
        )

    @pytest.mark.asyncio
    async def test_end_to_end_parallel_research(self, integration_agent):
        """Test complete end-to-end parallel research workflow."""
        # Mock the search providers and subagent execution
        with patch.object(integration_agent, "_execute_subagent_task") as mock_execute:
            mock_execute.return_value = {
                "research_type": "fundamental",
                "insights": ["Strong financial health", "Growing revenue"],
                "sentiment": {"direction": "bullish", "confidence": 0.8},
                "risk_factors": ["Market volatility"],
                "opportunities": ["Expansion potential"],
                "credibility_score": 0.85,
                "sources": [
                    {
                        "title": "Financial Report",
                        "url": "https://example.com/report",
                        "credibility_score": 0.9,
                    }
                ],
            }

            start_time = time.time()
            result = await integration_agent.research_comprehensive(
                topic="Apple Inc comprehensive financial analysis",
                session_id="integration_test_123",
                depth="comprehensive",
                focus_areas=["fundamentals", "sentiment", "competitive"],
            )
            execution_time = time.time() - start_time

            # Verify result structure
            assert result["status"] == "success"
            assert result["agent_type"] == "deep_research"
            assert result["execution_mode"] == "parallel"
            assert (
                result["research_topic"] == "Apple Inc comprehensive financial analysis"
            )
            assert result["confidence_score"] > 0
            assert len(result["citations"]) > 0
            assert "parallel_execution_stats" in result

            # Verify performance characteristics
            assert execution_time < 10  # Should complete reasonably quickly
            assert result["execution_time_ms"] > 0

            # Verify parallel execution stats
            stats = result["parallel_execution_stats"]
            assert stats["total_tasks"] > 0
            assert stats["successful_tasks"] >= 0
            assert stats["parallel_efficiency"] > 0
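
    # Note added for clarity (an inference from the fixtures and assertions in
    # this file, not from the implementation): parallel_efficiency appears to
    # be a speedup-style ratio, where values above 1.0 mean the parallel
    # fan-out beat an estimated sequential baseline.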

    @pytest.mark.asyncio
    async def test_parallel_vs_sequential_performance(self, integration_agent):
        """Test performance comparison between parallel and sequential execution."""
        topic = "Microsoft Corp investment analysis"
        session_id = "perf_test_123"

        # Mock subagent execution with realistic delay
        async def mock_subagent_execution(task):
            await asyncio.sleep(0.1)  # Simulate work
            return {
                "research_type": task.task_type,
                "insights": [f"Insight from {task.task_type}"],
                "sentiment": {"direction": "bullish", "confidence": 0.7},
                "credibility_score": 0.8,
                "sources": [],
            }

        with patch.object(
            integration_agent,
            "_execute_subagent_task",
            side_effect=mock_subagent_execution,
        ):
            # Test parallel execution. Elapsed wall-clock deltas are computed
            # but deliberately not asserted; absolute timings are too flaky
            # across test environments.
            start_parallel = time.time()
            parallel_result = await integration_agent.research_comprehensive(
                topic=topic, session_id=session_id, use_parallel_execution=True
            )
            time.time() - start_parallel

            # Test sequential execution
            start_sequential = time.time()
            sequential_result = await integration_agent.research_comprehensive(
                topic=topic,
                session_id=f"{session_id}_seq",
                use_parallel_execution=False,
            )
            time.time() - start_sequential

            # Verify both succeeded
            assert parallel_result["status"] == "success"
            assert sequential_result["status"] == "success"

            # Parallel should generally be faster (though not guaranteed in
            # all test environments). At minimum, parallel efficiency should
            # be calculated.
            if "parallel_execution_stats" in parallel_result:
                assert (
                    parallel_result["parallel_execution_stats"]["parallel_efficiency"]
                    > 0
                )

    @pytest.mark.asyncio
    async def test_research_quality_consistency(self, integration_agent):
        """Test that parallel and sequential execution produce consistent quality."""
        topic = "Tesla Inc strategic analysis"

        # Mock consistent subagent responses
        mock_response = {
            "research_type": "fundamental",
            "insights": ["Consistent insight 1", "Consistent insight 2"],
            "sentiment": {"direction": "bullish", "confidence": 0.75},
            "credibility_score": 0.8,
            "sources": [
                {
                    "title": "Source",
                    "url": "https://example.com",
                    "credibility_score": 0.8,
                }
            ],
        }

        with patch.object(
            integration_agent, "_execute_subagent_task", return_value=mock_response
        ):
            parallel_result = await integration_agent.research_comprehensive(
                topic=topic,
                session_id="quality_test_parallel",
                use_parallel_execution=True,
            )

            sequential_result = await integration_agent.research_comprehensive(
                topic=topic,
                session_id="quality_test_sequential",
                use_parallel_execution=False,
            )

            # Both should succeed with reasonable confidence
            assert parallel_result["status"] == "success"
            assert sequential_result["status"] == "success"
            assert parallel_result["confidence_score"] > 0.5
            assert sequential_result["confidence_score"] > 0.5
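

# Running this suite locally (a minimal sketch; assumes pytest and
# pytest-asyncio are installed, with asyncio_mode configured so the
# @pytest.mark.asyncio tests execute):
#
#   pytest test_deep_research_parallel_execution.py -v
#   pytest test_deep_research_parallel_execution.py -m "not integration"
#
# The second invocation deselects the @pytest.mark.integration tests.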
