MaverickMCP

by wshobson
MIT License
test_exa_research_integration.py (52.8 kB)
""" Comprehensive test suite for ExaSearch integration with research agents. This test suite validates the complete research agent architecture with ExaSearch provider, including timeout handling, parallel execution, specialized subagents, and performance benchmarking across all research depths and focus areas. """ import asyncio import time from unittest.mock import AsyncMock, MagicMock, patch import pytest from exa_py import Exa from maverick_mcp.agents.deep_research import ( RESEARCH_DEPTH_LEVELS, CompetitiveResearchAgent, ContentAnalyzer, DeepResearchAgent, ExaSearchProvider, FundamentalResearchAgent, SentimentResearchAgent, TechnicalResearchAgent, ) from maverick_mcp.api.routers.research import ( ResearchRequest, comprehensive_research, get_research_agent, ) from maverick_mcp.exceptions import WebSearchError from maverick_mcp.utils.parallel_research import ( ParallelResearchConfig, ParallelResearchOrchestrator, ResearchResult, ResearchTask, TaskDistributionEngine, ) # Test Data Factories and Fixtures @pytest.fixture def mock_llm(): """Mock LLM with realistic response patterns for research scenarios.""" llm = MagicMock() llm.ainvoke = AsyncMock() # Mock different response types for different research phases def mock_response_content(messages): """Generate realistic mock responses based on message content.""" content = str(messages[-1].content).lower() if "synthesis" in content: return MagicMock( content='{"synthesis": "Comprehensive analysis shows positive outlook", "confidence": 0.8}' ) elif "analyze" in content or "financial" in content: return MagicMock( content='{"KEY_INSIGHTS": ["Strong earnings growth", "Market share expansion"], "SENTIMENT": {"direction": "bullish", "confidence": 0.75}, "RISK_FACTORS": ["Interest rate sensitivity"], "OPPORTUNITIES": ["Market expansion"], "CREDIBILITY": 0.8, "RELEVANCE": 0.9, "SUMMARY": "Positive financial outlook"}' ) else: return MagicMock(content="Analysis completed successfully") llm.ainvoke.side_effect = lambda messages, **kwargs: mock_response_content(messages) return llm @pytest.fixture def mock_exa_client(): """Mock Exa client with realistic search responses.""" mock_client = MagicMock(spec=Exa) def create_mock_result(title, text, url_suffix=""): """Create mock Exa result object.""" result = MagicMock() result.url = f"https://example.com/{url_suffix}" result.title = title result.text = text result.published_date = "2024-01-15T10:00:00Z" result.score = 0.85 result.author = "Financial Analyst" return result def mock_search_and_contents(query, num_results=5, **kwargs): """Generate mock search results based on query content.""" response = MagicMock() results = [] query_lower = query.lower() if "aapl" in query_lower or "apple" in query_lower: results.extend( [ create_mock_result( "Apple Q4 Earnings Beat Expectations", "Apple reported strong quarterly earnings with iPhone sales growth of 15% and services revenue reaching new highs. The company's financial position remains robust with strong cash flow.", "apple-earnings", ), create_mock_result( "Apple Stock Technical Analysis", "Apple stock shows bullish technical patterns with support at $180 and resistance at $200. RSI indicates oversold conditions presenting buying opportunity.", "apple-technical", ), ] ) elif "sentiment" in query_lower: results.extend( [ create_mock_result( "Market Sentiment Turns Positive", "Investor sentiment shows improvement with increased confidence in tech sector. 
Analyst upgrades and positive earnings surprises drive optimism.", "market-sentiment", ), ] ) elif "competitive" in query_lower or "industry" in query_lower: results.extend( [ create_mock_result( "Tech Industry Competitive Landscape", "The technology sector shows fierce competition with market leaders maintaining strong positions. Innovation and market share battles intensify.", "competitive-analysis", ), ] ) else: # Default financial research results results.extend( [ create_mock_result( "Financial Market Analysis", "Current market conditions show mixed signals with growth prospects balanced against economic uncertainties. Investors remain cautiously optimistic.", "market-analysis", ), create_mock_result( "Investment Outlook 2024", "Investment opportunities emerge in technology and healthcare sectors despite ongoing market volatility. Diversification remains key strategy.", "investment-outlook", ), ] ) # Limit results to requested number response.results = results[:num_results] return response mock_client.search_and_contents.side_effect = mock_search_and_contents return mock_client @pytest.fixture def sample_research_tasks(): """Sample research tasks for parallel execution testing.""" return [ ResearchTask( task_id="session_123_fundamental", task_type="fundamental", target_topic="AAPL financial analysis", focus_areas=["earnings", "valuation", "growth"], priority=8, timeout=20, ), ResearchTask( task_id="session_123_technical", task_type="technical", target_topic="AAPL technical analysis", focus_areas=["chart_patterns", "support_resistance"], priority=7, timeout=15, ), ResearchTask( task_id="session_123_sentiment", task_type="sentiment", target_topic="AAPL market sentiment", focus_areas=["news_sentiment", "analyst_ratings"], priority=6, timeout=15, ), ] @pytest.fixture def mock_settings(): """Mock settings with ExaSearch configuration.""" settings = MagicMock() settings.research.exa_api_key = "test_exa_api_key" settings.data_limits.max_parallel_agents = 4 settings.performance.search_timeout_failure_threshold = 12 settings.performance.search_circuit_breaker_failure_threshold = 8 settings.performance.search_circuit_breaker_recovery_timeout = 30 return settings # ExaSearchProvider Tests class TestExaSearchProvider: """Test ExaSearch provider integration and functionality.""" @pytest.mark.unit def test_exa_provider_initialization(self): """Test ExaSearchProvider initialization.""" api_key = "test_api_key_123" provider = ExaSearchProvider(api_key) assert provider.api_key == api_key assert provider._api_key_verified is True assert provider.is_healthy() is True assert provider._failure_count == 0 @pytest.mark.unit def test_exa_provider_initialization_without_key(self): """Test ExaSearchProvider initialization without API key.""" provider = ExaSearchProvider("") assert provider.api_key == "" assert provider._api_key_verified is False assert provider.is_healthy() is True # Still healthy, but searches will fail @pytest.mark.unit def test_timeout_calculation(self): """Test adaptive timeout calculation for different query complexities.""" provider = ExaSearchProvider("test_key") # Simple query timeout = provider._calculate_timeout("AAPL", None) assert timeout >= 4.0 # Minimum for Exa reliability # Complex query complex_query = "comprehensive analysis of Apple Inc financial performance and market position with competitive analysis" timeout_complex = provider._calculate_timeout(complex_query, None) assert timeout_complex >= timeout # Budget constrained query timeout_budget = 
provider._calculate_timeout("AAPL", 8.0) assert 4.0 <= timeout_budget <= 8.0 @pytest.mark.unit def test_failure_recording_and_health_status(self): """Test failure recording and health status management.""" provider = ExaSearchProvider("test_key") # Initially healthy assert provider.is_healthy() is True # Record several timeout failures for _ in range(5): provider._record_failure("timeout") assert provider._failure_count == 5 assert provider.is_healthy() is True # Still healthy, threshold not reached # Exceed timeout threshold (default 12) for _ in range(8): provider._record_failure("timeout") assert provider._failure_count == 13 assert provider.is_healthy() is False # Now unhealthy # Test recovery provider._record_success() assert provider.is_healthy() is True assert provider._failure_count == 0 @pytest.mark.unit @patch("exa_py.Exa") async def test_exa_search_success(self, mock_exa_class, mock_exa_client): """Test successful ExaSearch operation.""" mock_exa_class.return_value = mock_exa_client provider = ExaSearchProvider("test_key") results = await provider.search("AAPL financial analysis", num_results=3) assert len(results) >= 1 assert all("url" in result for result in results) assert all("title" in result for result in results) assert all("content" in result for result in results) assert all(result["provider"] == "exa" for result in results) @pytest.mark.unit @patch("exa_py.Exa") async def test_exa_search_timeout(self, mock_exa_class): """Test ExaSearch timeout handling.""" # Mock Exa client that takes too long mock_client = MagicMock() def slow_search(*args, **kwargs): import time time.sleep(10) # Simulate slow synchronous response mock_client.search_and_contents.side_effect = slow_search mock_exa_class.return_value = mock_client provider = ExaSearchProvider("test_key") with pytest.raises(WebSearchError, match="timed out"): await provider.search("test query", timeout_budget=2.0) # Check that failure was recorded assert not provider.is_healthy() or provider._failure_count > 0 @pytest.mark.unit @patch("exa_py.Exa") async def test_exa_search_unhealthy_provider(self, mock_exa_class): """Test behavior when provider is marked as unhealthy.""" provider = ExaSearchProvider("test_key") provider._is_healthy = False with pytest.raises(WebSearchError, match="disabled due to repeated failures"): await provider.search("test query") # DeepResearchAgent Tests class TestDeepResearchAgent: """Test DeepResearchAgent with ExaSearch integration.""" @pytest.mark.unit @patch("maverick_mcp.agents.deep_research.get_cached_search_provider") async def test_agent_initialization_with_exa(self, mock_provider, mock_llm): """Test DeepResearchAgent initialization with ExaSearch provider.""" mock_exa_provider = MagicMock(spec=ExaSearchProvider) mock_provider.return_value = mock_exa_provider agent = DeepResearchAgent( llm=mock_llm, persona="moderate", exa_api_key="test_key", research_depth="standard", ) await agent.initialize() assert agent.search_providers == [mock_exa_provider] assert agent._search_providers_loaded is True assert agent.default_depth == "standard" @pytest.mark.unit @patch("maverick_mcp.agents.deep_research.get_cached_search_provider") async def test_agent_initialization_without_providers( self, mock_provider, mock_llm ): """Test agent behavior when no search providers are available.""" mock_provider.return_value = None agent = DeepResearchAgent( llm=mock_llm, persona="moderate", exa_api_key=None, ) await agent.initialize() assert agent.search_providers == [] assert agent._search_providers_loaded is 
True @pytest.mark.unit @patch("maverick_mcp.agents.deep_research.get_cached_search_provider") async def test_research_comprehensive_no_providers(self, mock_provider, mock_llm): """Test research behavior when no search providers are configured.""" mock_provider.return_value = None agent = DeepResearchAgent(llm=mock_llm, exa_api_key=None) result = await agent.research_comprehensive( topic="AAPL analysis", session_id="test_session", depth="basic" ) assert "error" in result assert "no search providers configured" in result["error"] assert result["topic"] == "AAPL analysis" @pytest.mark.integration @patch("maverick_mcp.agents.deep_research.get_cached_search_provider") @patch("exa_py.Exa") async def test_research_comprehensive_success( self, mock_exa_class, mock_provider, mock_llm, mock_exa_client ): """Test successful comprehensive research with ExaSearch.""" # Setup mocks mock_exa_provider = ExaSearchProvider("test_key") mock_provider.return_value = mock_exa_provider mock_exa_class.return_value = mock_exa_client agent = DeepResearchAgent( llm=mock_llm, persona="moderate", exa_api_key="test_key", research_depth="basic", ) # Execute research result = await agent.research_comprehensive( topic="AAPL financial analysis", session_id="test_session_123", depth="basic", timeout_budget=15.0, ) # Verify result structure assert result["status"] == "success" assert result["agent_type"] == "deep_research" assert result["research_topic"] == "AAPL financial analysis" assert result["research_depth"] == "basic" assert "findings" in result assert "confidence_score" in result assert "execution_time_ms" in result @pytest.mark.unit def test_research_depth_levels(self): """Test research depth level configurations.""" assert "basic" in RESEARCH_DEPTH_LEVELS assert "standard" in RESEARCH_DEPTH_LEVELS assert "comprehensive" in RESEARCH_DEPTH_LEVELS assert "exhaustive" in RESEARCH_DEPTH_LEVELS # Verify basic level has minimal settings for speed basic = RESEARCH_DEPTH_LEVELS["basic"] assert basic["max_sources"] <= 5 assert basic["max_searches"] <= 2 assert basic["validation_required"] is False # Verify exhaustive has maximum settings exhaustive = RESEARCH_DEPTH_LEVELS["exhaustive"] assert exhaustive["max_sources"] >= 10 assert exhaustive["validation_required"] is True # Specialized Subagent Tests class TestSpecializedSubagents: """Test specialized research subagents.""" @pytest.fixture def mock_parent_agent(self, mock_llm): """Mock parent DeepResearchAgent for subagent testing.""" agent = MagicMock() agent.llm = mock_llm agent.search_providers = [MagicMock(spec=ExaSearchProvider)] agent.content_analyzer = MagicMock(spec=ContentAnalyzer) agent.persona = MagicMock() agent.persona.name = "moderate" agent._calculate_source_credibility = MagicMock(return_value=0.8) return agent @pytest.mark.unit async def test_fundamental_research_agent( self, mock_parent_agent, sample_research_tasks ): """Test FundamentalResearchAgent execution.""" task = sample_research_tasks[0] # fundamental task agent = FundamentalResearchAgent(mock_parent_agent) # Mock search results mock_search_results = [ { "title": "AAPL Q4 Earnings Report", "url": "https://example.com/earnings", "content": "Apple reported strong quarterly earnings with revenue growth of 12%...", "published_date": "2024-01-15", } ] agent._perform_specialized_search = AsyncMock(return_value=mock_search_results) agent._analyze_search_results = AsyncMock( return_value=[ { **mock_search_results[0], "analysis": { "insights": [ "Strong earnings growth", "Revenue diversification", ], 
"risk_factors": ["Market competition"], "opportunities": ["Market expansion"], "sentiment": {"direction": "bullish", "confidence": 0.8}, }, "credibility_score": 0.8, } ] ) result = await agent.execute_research(task) assert result["research_type"] == "fundamental" assert "insights" in result assert "risk_factors" in result assert "opportunities" in result assert "sentiment" in result assert "sources" in result assert len(result["focus_areas"]) > 0 assert "earnings" in result["focus_areas"] @pytest.mark.unit async def test_technical_research_agent( self, mock_parent_agent, sample_research_tasks ): """Test TechnicalResearchAgent execution.""" task = sample_research_tasks[1] # technical task agent = TechnicalResearchAgent(mock_parent_agent) # Mock search results with technical analysis mock_search_results = [ { "title": "AAPL Technical Analysis", "url": "https://example.com/technical", "content": "AAPL shows bullish chart patterns with support at $180 and resistance at $200...", "published_date": "2024-01-15", } ] agent._perform_specialized_search = AsyncMock(return_value=mock_search_results) agent._analyze_search_results = AsyncMock( return_value=[ { **mock_search_results[0], "analysis": { "insights": [ "Bullish breakout pattern", "Strong support levels", ], "risk_factors": ["Overbought conditions"], "opportunities": ["Momentum continuation"], "sentiment": {"direction": "bullish", "confidence": 0.7}, }, "credibility_score": 0.7, } ] ) result = await agent.execute_research(task) assert result["research_type"] == "technical" assert "price_action" in result["focus_areas"] assert "chart_patterns" in result["focus_areas"] @pytest.mark.unit async def test_sentiment_research_agent( self, mock_parent_agent, sample_research_tasks ): """Test SentimentResearchAgent execution.""" task = sample_research_tasks[2] # sentiment task agent = SentimentResearchAgent(mock_parent_agent) # Mock search results with sentiment data mock_search_results = [ { "title": "Apple Stock Sentiment Analysis", "url": "https://example.com/sentiment", "content": "Analyst sentiment remains positive on Apple with multiple upgrades...", "published_date": "2024-01-15", } ] agent._perform_specialized_search = AsyncMock(return_value=mock_search_results) agent._analyze_search_results = AsyncMock( return_value=[ { **mock_search_results[0], "analysis": { "insights": ["Positive analyst sentiment", "Upgrade momentum"], "risk_factors": ["Market volatility concerns"], "opportunities": ["Institutional accumulation"], "sentiment": {"direction": "bullish", "confidence": 0.85}, }, "credibility_score": 0.9, } ] ) result = await agent.execute_research(task) assert result["research_type"] == "sentiment" assert "market_sentiment" in result["focus_areas"] assert result["sentiment"]["direction"] == "bullish" @pytest.mark.unit async def test_competitive_research_agent(self, mock_parent_agent): """Test CompetitiveResearchAgent execution.""" task = ResearchTask( task_id="test_competitive", task_type="competitive", target_topic="AAPL competitive analysis", focus_areas=["competitive_position", "market_share"], ) agent = CompetitiveResearchAgent(mock_parent_agent) # Mock search results with competitive data mock_search_results = [ { "title": "Apple vs Samsung Market Share", "url": "https://example.com/competitive", "content": "Apple maintains strong competitive position in premium smartphone market...", "published_date": "2024-01-15", } ] agent._perform_specialized_search = AsyncMock(return_value=mock_search_results) agent._analyze_search_results = AsyncMock( 
return_value=[ { **mock_search_results[0], "analysis": { "insights": [ "Strong market position", "Premium segment dominance", ], "risk_factors": ["Android competition"], "opportunities": ["Emerging markets"], "sentiment": {"direction": "bullish", "confidence": 0.75}, }, "credibility_score": 0.8, } ] ) result = await agent.execute_research(task) assert result["research_type"] == "competitive" assert "competitive_position" in result["focus_areas"] assert "industry_trends" in result["focus_areas"] # Parallel Research Tests class TestParallelResearchOrchestrator: """Test parallel research execution and orchestration.""" @pytest.mark.unit def test_orchestrator_initialization(self): """Test ParallelResearchOrchestrator initialization.""" config = ParallelResearchConfig(max_concurrent_agents=6, timeout_per_agent=20) orchestrator = ParallelResearchOrchestrator(config) assert orchestrator.config.max_concurrent_agents == 6 assert orchestrator.config.timeout_per_agent == 20 assert orchestrator._semaphore._value == 6 # Semaphore initialized correctly @pytest.mark.unit async def test_task_preparation(self, sample_research_tasks): """Test task preparation and prioritization.""" orchestrator = ParallelResearchOrchestrator() prepared_tasks = await orchestrator._prepare_tasks(sample_research_tasks) # Should be sorted by priority (descending) assert prepared_tasks[0].priority >= prepared_tasks[1].priority # All tasks should have timeouts set for task in prepared_tasks: assert task.timeout is not None assert task.status == "pending" assert task.task_id in orchestrator.active_tasks @pytest.mark.integration async def test_parallel_execution_success(self, sample_research_tasks): """Test successful parallel execution of research tasks.""" orchestrator = ParallelResearchOrchestrator( ParallelResearchConfig(max_concurrent_agents=3, timeout_per_agent=10) ) # Mock research executor async def mock_executor(task): """Mock research executor that simulates successful execution.""" await asyncio.sleep(0.1) # Simulate work return { "research_type": task.task_type, "insights": [ f"{task.task_type} insight 1", f"{task.task_type} insight 2", ], "sentiment": {"direction": "bullish", "confidence": 0.8}, "sources": [ {"title": f"{task.task_type} source", "url": "https://example.com"} ], } # Mock synthesis callback async def mock_synthesis(task_results): return { "synthesis": f"Synthesized results from {len(task_results)} tasks", "confidence_score": 0.8, } result = await orchestrator.execute_parallel_research( tasks=sample_research_tasks, research_executor=mock_executor, synthesis_callback=mock_synthesis, ) assert isinstance(result, ResearchResult) assert result.successful_tasks == len(sample_research_tasks) assert result.failed_tasks == 0 assert result.parallel_efficiency > 1.0 # Should be faster than sequential assert result.synthesis is not None assert "synthesis" in result.synthesis @pytest.mark.unit async def test_parallel_execution_with_failures(self, sample_research_tasks): """Test parallel execution with some task failures.""" orchestrator = ParallelResearchOrchestrator() # Mock research executor that fails for certain task types async def mock_executor_with_failures(task): if task.task_type == "technical": raise TimeoutError("Task timed out") elif task.task_type == "sentiment": raise Exception("Network error") else: return {"research_type": task.task_type, "insights": ["Success"]} result = await orchestrator.execute_parallel_research( tasks=sample_research_tasks, research_executor=mock_executor_with_failures, ) assert 
result.successful_tasks == 1 # Only fundamental should succeed assert result.failed_tasks == 2 # Check that failed tasks have error information failed_tasks = [ task for task in result.task_results.values() if task.status == "failed" ] assert len(failed_tasks) == 2 for task in failed_tasks: assert task.error is not None @pytest.mark.unit async def test_circuit_breaker_integration(self, sample_research_tasks): """Test circuit breaker integration in parallel execution.""" orchestrator = ParallelResearchOrchestrator() # Mock executor that consistently fails failure_count = 0 async def failing_executor(task): nonlocal failure_count failure_count += 1 raise Exception(f"Failure {failure_count}") result = await orchestrator.execute_parallel_research( tasks=sample_research_tasks, research_executor=failing_executor, ) # All tasks should fail assert result.failed_tasks == len(sample_research_tasks) assert result.successful_tasks == 0 class TestTaskDistributionEngine: """Test intelligent task distribution for research topics.""" @pytest.mark.unit def test_topic_relevance_analysis(self): """Test topic relevance analysis for different task types.""" engine = TaskDistributionEngine() # Test financial topic relevance = engine._analyze_topic_relevance( "apple earnings financial performance", focus_areas=["fundamentals", "financials"], ) assert "fundamental" in relevance assert "technical" in relevance assert "sentiment" in relevance assert "competitive" in relevance # Fundamental should have highest relevance for earnings query assert relevance["fundamental"] > relevance["technical"] assert relevance["fundamental"] > relevance["competitive"] @pytest.mark.unit def test_task_distribution_basic(self): """Test basic task distribution for a research topic.""" engine = TaskDistributionEngine() tasks = engine.distribute_research_tasks( topic="AAPL financial analysis and market outlook", session_id="test_session", focus_areas=["fundamentals", "technical_analysis"], ) assert len(tasks) > 0 # Should have variety of task types task_types = {task.task_type for task in tasks} assert "fundamental" in task_types # High relevance for financial analysis # Tasks should be properly configured for task in tasks: assert task.session_id == "test_session" assert task.target_topic == "AAPL financial analysis and market outlook" assert task.priority > 0 assert len(task.focus_areas) > 0 @pytest.mark.unit def test_task_distribution_fallback(self): """Test task distribution fallback when no relevant tasks found.""" engine = TaskDistributionEngine() # Mock the relevance analysis to return very low scores with patch.object( engine, "_analyze_topic_relevance", return_value={ "fundamental": 0.1, "technical": 0.1, "sentiment": 0.1, "competitive": 0.1, }, ): tasks = engine.distribute_research_tasks( topic="obscure topic with no clear relevance", session_id="test_session", ) # Should still create at least one task (fallback) assert len(tasks) >= 1 # Fallback should be fundamental analysis assert any(task.task_type == "fundamental" for task in tasks) @pytest.mark.unit def test_task_priority_assignment(self): """Test priority assignment based on relevance scores.""" engine = TaskDistributionEngine() tasks = engine.distribute_research_tasks( topic="AAPL fundamental analysis earnings valuation", session_id="test_session", ) # Find fundamental task (should have high priority) fundamental_tasks = [t for t in tasks if t.task_type == "fundamental"] if fundamental_tasks: fundamental_task = fundamental_tasks[0] assert fundamental_task.priority >= 7 
# Should be high priority # Timeout and Circuit Breaker Tests class TestTimeoutAndCircuitBreaker: """Test timeout handling and circuit breaker patterns.""" @pytest.mark.unit async def test_timeout_budget_allocation(self, mock_llm): """Test timeout budget allocation across research phases.""" agent = DeepResearchAgent(llm=mock_llm, exa_api_key="test_key") # Test basic timeout allocation timeout_budget = 20.0 result = await agent.research_comprehensive( topic="test topic", session_id="test_session", depth="basic", timeout_budget=timeout_budget, ) # Should either complete or timeout gracefully assert "status" in result or "error" in result # If timeout occurred, should have appropriate error structure if result.get("status") == "error" or "error" in result: # Should be a timeout-related error for very short budget assert ( "timeout" in str(result).lower() or "search providers" in str(result).lower() ) @pytest.mark.unit def test_provider_health_monitoring(self): """Test search provider health monitoring and recovery.""" provider = ExaSearchProvider("test_key") # Initially healthy assert provider.is_healthy() # Simulate multiple timeout failures for _i in range(15): # Exceed default threshold of 12 provider._record_failure("timeout") # Should be marked unhealthy assert not provider.is_healthy() # Recovery after success provider._record_success() assert provider.is_healthy() assert provider._failure_count == 0 @pytest.mark.integration @patch("maverick_mcp.agents.deep_research.get_cached_search_provider") async def test_research_with_provider_failures(self, mock_provider, mock_llm): """Test research behavior when provider failures occur.""" # Create a provider that will fail failing_provider = MagicMock(spec=ExaSearchProvider) failing_provider.is_healthy.return_value = True failing_provider.search = AsyncMock(side_effect=WebSearchError("Search failed")) mock_provider.return_value = failing_provider agent = DeepResearchAgent(llm=mock_llm, exa_api_key="test_key") result = await agent.research_comprehensive( topic="test topic", session_id="test_session", depth="basic", ) # Should handle provider failure gracefully assert "status" in result # May succeed with fallback or fail gracefully # Performance and Benchmarking Tests class TestPerformanceBenchmarks: """Test performance across different research depths and configurations.""" @pytest.mark.slow @pytest.mark.integration @patch("maverick_mcp.agents.deep_research.get_cached_search_provider") @patch("exa_py.Exa") async def test_research_depth_performance( self, mock_exa_class, mock_provider, mock_llm, mock_exa_client ): """Benchmark performance across different research depths.""" mock_provider.return_value = ExaSearchProvider("test_key") mock_exa_class.return_value = mock_exa_client performance_results = {} for depth in ["basic", "standard", "comprehensive"]: agent = DeepResearchAgent( llm=mock_llm, exa_api_key="test_key", research_depth=depth, ) start_time = time.time() result = await agent.research_comprehensive( topic="AAPL financial analysis", session_id=f"perf_test_{depth}", depth=depth, timeout_budget=30.0, ) execution_time = time.time() - start_time performance_results[depth] = { "execution_time": execution_time, "success": result.get("status") == "success", "sources_analyzed": result.get("sources_analyzed", 0), } # Verify performance characteristics assert ( performance_results["basic"]["execution_time"] <= performance_results["comprehensive"]["execution_time"] ) # Basic should be fastest if performance_results["basic"]["success"]: assert ( 
performance_results["basic"]["execution_time"] < 15.0 ) # Should be fast @pytest.mark.slow async def test_parallel_vs_sequential_performance(self, sample_research_tasks): """Compare parallel vs sequential execution performance.""" config = ParallelResearchConfig(max_concurrent_agents=4, timeout_per_agent=10) orchestrator = ParallelResearchOrchestrator(config) async def mock_executor(task): await asyncio.sleep(1) # Simulate 1 second work per task return {"research_type": task.task_type, "insights": ["Mock insight"]} # Parallel execution start_time = time.time() parallel_result = await orchestrator.execute_parallel_research( tasks=sample_research_tasks, research_executor=mock_executor, ) parallel_time = time.time() - start_time # Sequential simulation start_time = time.time() for task in sample_research_tasks: await mock_executor(task) sequential_time = time.time() - start_time # Parallel should be significantly faster assert parallel_result.parallel_efficiency > 1.5 # At least 50% improvement assert parallel_time < sequential_time * 0.7 # Should be at least 30% faster @pytest.mark.unit async def test_memory_usage_monitoring(self, sample_research_tasks): """Test memory usage stays reasonable during parallel execution.""" import os import psutil process = psutil.Process(os.getpid()) initial_memory = process.memory_info().rss / 1024 / 1024 # MB config = ParallelResearchConfig(max_concurrent_agents=4) orchestrator = ParallelResearchOrchestrator(config) async def mock_executor(task): # Create some data but not excessive data = {"results": ["data"] * 1000} # Small amount of data await asyncio.sleep(0.1) return data await orchestrator.execute_parallel_research( tasks=sample_research_tasks * 5, # More tasks to test scaling research_executor=mock_executor, ) final_memory = process.memory_info().rss / 1024 / 1024 # MB memory_growth = final_memory - initial_memory # Memory growth should be reasonable (less than 100MB for test) assert memory_growth < 100, f"Memory grew by {memory_growth:.1f}MB" # MCP Integration Tests class TestMCPIntegration: """Test MCP tool endpoints and research router integration.""" @pytest.mark.integration @patch("maverick_mcp.api.routers.research.get_settings") async def test_comprehensive_research_mcp_tool(self, mock_settings): """Test the comprehensive research MCP tool endpoint.""" mock_settings.return_value.research.exa_api_key = "test_key" result = await comprehensive_research( query="AAPL financial analysis", persona="moderate", research_scope="basic", max_sources=5, timeframe="1m", ) # Should return structured response assert isinstance(result, dict) assert "success" in result # If successful, should have proper structure if result.get("success"): assert "research_results" in result assert "research_metadata" in result assert "request_id" in result assert "timestamp" in result @pytest.mark.unit @patch("maverick_mcp.api.routers.research.get_settings") async def test_research_without_exa_key(self, mock_settings): """Test research behavior without ExaSearch API key.""" mock_settings.return_value.research.exa_api_key = None result = await comprehensive_research( query="test query", persona="moderate", research_scope="basic", ) assert result["success"] is False assert "Exa search provider not configured" in result["error"] assert "setup_instructions" in result["details"] @pytest.mark.unit def test_research_request_validation(self): """Test ResearchRequest model validation.""" # Valid request request = ResearchRequest( query="AAPL analysis", persona="moderate", 
research_scope="standard", max_sources=15, timeframe="1m", ) assert request.query == "AAPL analysis" assert request.persona == "moderate" assert request.research_scope == "standard" assert request.max_sources == 15 assert request.timeframe == "1m" # Test defaults minimal_request = ResearchRequest(query="test") assert minimal_request.persona == "moderate" assert minimal_request.research_scope == "standard" assert minimal_request.max_sources == 10 assert minimal_request.timeframe == "1m" @pytest.mark.unit def test_get_research_agent_optimization(self): """Test research agent creation with optimization parameters.""" # Test optimized agent creation agent = get_research_agent( query="complex financial analysis of multiple companies", research_scope="comprehensive", timeout_budget=25.0, max_sources=20, ) assert isinstance(agent, DeepResearchAgent) assert agent.max_sources <= 20 # Should respect or optimize max sources assert agent.default_depth in [ "basic", "standard", "comprehensive", "exhaustive", ] # Test standard agent creation standard_agent = get_research_agent() assert isinstance(standard_agent, DeepResearchAgent) # Content Analysis Tests class TestContentAnalyzer: """Test AI-powered content analysis functionality.""" @pytest.mark.unit async def test_content_analysis_success(self, mock_llm): """Test successful content analysis.""" analyzer = ContentAnalyzer(mock_llm) content = "Apple reported strong quarterly earnings with revenue growth of 12% and expanding market share in the services segment." result = await analyzer.analyze_content( content=content, persona="moderate", analysis_focus="financial" ) assert "insights" in result assert "sentiment" in result assert "risk_factors" in result assert "opportunities" in result assert "credibility_score" in result assert "relevance_score" in result assert "summary" in result assert "analysis_timestamp" in result @pytest.mark.unit async def test_content_analysis_fallback(self, mock_llm): """Test content analysis fallback when AI analysis fails.""" analyzer = ContentAnalyzer(mock_llm) # Make LLM fail mock_llm.ainvoke.side_effect = Exception("LLM error") result = await analyzer.analyze_content( content="Test content", persona="moderate" ) # Should fall back to keyword-based analysis assert result["fallback_used"] is True assert "sentiment" in result assert result["sentiment"]["direction"] in ["bullish", "bearish", "neutral"] @pytest.mark.unit async def test_batch_content_analysis(self, mock_llm): """Test batch content analysis functionality.""" analyzer = ContentAnalyzer(mock_llm) content_items = [ ("Apple shows strong growth", "source1"), ("Market conditions remain volatile", "source2"), ("Tech sector outlook positive", "source3"), ] results = await analyzer.analyze_content_batch( content_items=content_items, persona="moderate", analysis_focus="general" ) assert len(results) == len(content_items) for i, result in enumerate(results): assert result["source_identifier"] == f"source{i + 1}" assert result["batch_processed"] is True assert "sentiment" in result # Error Handling and Edge Cases class TestErrorHandlingAndEdgeCases: """Test comprehensive error handling and edge cases.""" @pytest.mark.unit async def test_empty_search_results(self, mock_llm): """Test behavior when search returns no results.""" provider = ExaSearchProvider("test_key") with patch("exa_py.Exa") as mock_exa: # Mock empty results mock_client = MagicMock() mock_response = MagicMock() mock_response.results = [] mock_client.search_and_contents.return_value = mock_response 
mock_exa.return_value = mock_client results = await provider.search("nonexistent topic", num_results=5) assert results == [] @pytest.mark.unit async def test_malformed_search_response(self, mock_llm): """Test handling of malformed search responses.""" provider = ExaSearchProvider("test_key") with patch("exa_py.Exa") as mock_exa: # Mock malformed response mock_client = MagicMock() mock_client.search_and_contents.side_effect = Exception( "Invalid response format" ) mock_exa.return_value = mock_client with pytest.raises(WebSearchError): await provider.search("test query") @pytest.mark.unit async def test_network_timeout_recovery(self): """Test network timeout recovery mechanisms.""" provider = ExaSearchProvider("test_key") # Simulate multiple timeouts followed by success with patch("exa_py.Exa") as mock_exa: call_count = 0 async def mock_search_with_recovery(*args, **kwargs): nonlocal call_count call_count += 1 if call_count <= 2: raise TimeoutError("Network timeout") else: # Success on third try mock_response = MagicMock() mock_result = MagicMock() mock_result.url = "https://example.com" mock_result.title = "Test Result" mock_result.text = "Test content" mock_result.published_date = "2024-01-15" mock_result.score = 0.8 mock_response.results = [mock_result] return mock_response mock_client = MagicMock() mock_client.search_and_contents.side_effect = mock_search_with_recovery mock_exa.return_value = mock_client # First two calls should fail and record failures for _ in range(2): with pytest.raises(WebSearchError): await provider.search("test query", timeout_budget=1.0) # Provider should still be healthy (failures recorded but not exceeded threshold) assert provider._failure_count == 2 # Third call should succeed and reset failure count results = await provider.search("test query") assert len(results) > 0 assert provider._failure_count == 0 # Reset on success @pytest.mark.unit async def test_concurrent_request_limits(self, sample_research_tasks): """Test that concurrent request limits are respected.""" config = ParallelResearchConfig(max_concurrent_agents=2) # Very low limit orchestrator = ParallelResearchOrchestrator(config) execution_times = [] async def tracking_executor(task): start = time.time() await asyncio.sleep(0.5) # Simulate work end = time.time() execution_times.append((start, end)) return {"result": "success"} await orchestrator.execute_parallel_research( tasks=sample_research_tasks, # 3 tasks research_executor=tracking_executor, ) # With max_concurrent_agents=2, the third task should start after one of the first two finishes # This means there should be overlap but not all three running simultaneously assert len(execution_times) == 3 # Sort by start time execution_times.sort() # The third task should start after the first task finishes # (allowing for some timing tolerance) third_start = execution_times[2][0] first_end = execution_times[0][1] # Third should start after first ends (with small tolerance for async timing) assert third_start >= (first_end - 0.1) # Integration Test Suite class TestFullIntegrationScenarios: """End-to-end integration tests for complete research workflows.""" @pytest.mark.integration @pytest.mark.slow @patch("maverick_mcp.agents.deep_research.get_cached_search_provider") @patch("exa_py.Exa") async def test_complete_research_workflow( self, mock_exa_class, mock_provider, mock_llm, mock_exa_client ): """Test complete research workflow from query to final report.""" # Setup comprehensive mocks mock_provider.return_value = ExaSearchProvider("test_key") 
mock_exa_class.return_value = mock_exa_client agent = DeepResearchAgent( llm=mock_llm, persona="moderate", exa_api_key="test_key", research_depth="standard", enable_parallel_execution=True, ) # Execute complete research workflow result = await agent.research_comprehensive( topic="Apple Inc (AAPL) investment analysis with market sentiment and competitive position", session_id="integration_test_session", depth="standard", focus_areas=["fundamentals", "market_sentiment", "competitive_analysis"], timeframe="1m", use_parallel_execution=True, ) # Verify comprehensive result structure if result.get("status") == "success": assert "findings" in result assert "confidence_score" in result assert isinstance(result["confidence_score"], int | float) assert 0.0 <= result["confidence_score"] <= 1.0 assert "citations" in result assert "execution_time_ms" in result # Check for parallel execution indicators if "parallel_execution_stats" in result: assert "successful_tasks" in result["parallel_execution_stats"] assert "parallel_efficiency" in result["parallel_execution_stats"] # Should handle both success and controlled failure scenarios assert "status" in result or "error" in result @pytest.mark.integration async def test_multi_persona_consistency(self, mock_llm, mock_exa_client): """Test research consistency across different investor personas.""" personas = ["conservative", "moderate", "aggressive", "day_trader"] results = {} for persona in personas: with ( patch( "maverick_mcp.agents.deep_research.get_cached_search_provider" ) as mock_provider, patch("exa_py.Exa") as mock_exa_class, ): mock_provider.return_value = ExaSearchProvider("test_key") mock_exa_class.return_value = mock_exa_client agent = DeepResearchAgent( llm=mock_llm, persona=persona, exa_api_key="test_key", research_depth="basic", ) result = await agent.research_comprehensive( topic="AAPL investment outlook", session_id=f"persona_test_{persona}", depth="basic", ) results[persona] = result # All personas should provide valid responses for persona, result in results.items(): assert isinstance(result, dict), f"Invalid result for {persona}" # Should have some form of result (success or controlled failure) assert "status" in result or "error" in result or "success" in result if __name__ == "__main__": # Run specific test categories based on markers pytest.main( [ __file__, "-v", "--tb=short", "-m", "unit", # Run unit tests by default ] )
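The suite is organized around the pytest markers used above (unit, integration, slow); the __main__ block only runs the unit group. A minimal sketch of selecting a different marker group programmatically, mirroring that pytest.main() call and assuming the markers are registered in the project's pytest configuration (the file path below is an assumption):

import pytest


def run_marker_group(marker: str = "integration") -> int:
    """Run this test module filtered to a single pytest marker; returns the exit code."""
    return pytest.main(
        [
            "tests/test_exa_research_integration.py",  # assumed location of this file
            "-v",
            "--tb=short",
            "-m",
            marker,  # e.g. "unit", "integration", or "slow"
        ]
    )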
