MaverickMCP

by wshobson
MIT License
test_parallel_research_integration.py (37.1 kB)
""" Comprehensive integration tests for parallel research functionality. This test suite covers: - End-to-end parallel research workflows - Integration between all parallel research components - Performance characteristics under realistic conditions - Error scenarios and recovery mechanisms - Logging integration across all components - Resource usage and scalability testing """ import asyncio import time from datetime import datetime from typing import Any from unittest.mock import Mock, patch import pytest from langchain_core.language_models import BaseChatModel from langchain_core.messages import AIMessage from langgraph.checkpoint.memory import MemorySaver from maverick_mcp.agents.deep_research import DeepResearchAgent from maverick_mcp.utils.parallel_research import ( ParallelResearchConfig, ) class MockSearchProvider: """Mock search provider for integration testing.""" def __init__(self, provider_name: str, fail_rate: float = 0.0): self.provider_name = provider_name self.fail_rate = fail_rate self.call_count = 0 async def search(self, query: str, num_results: int = 10) -> list[dict[str, Any]]: """Mock search with configurable failure rate.""" self.call_count += 1 # Simulate failures based on fail_rate import random if random.random() < self.fail_rate: raise RuntimeError(f"{self.provider_name} search failed") await asyncio.sleep(0.02) # Simulate network latency # Generate mock search results results = [] for i in range(min(num_results, 3)): # Return up to 3 results results.append( { "url": f"https://{self.provider_name.lower()}.example.com/article_{i}_{self.call_count}", "title": f"{query} - Article {i + 1} from {self.provider_name}", "content": f"This is detailed content about {query} from {self.provider_name}. " f"It contains valuable insights and analysis relevant to the research topic. 
" f"Provider: {self.provider_name}, Call: {self.call_count}", "published_date": datetime.now().isoformat(), "author": f"Expert Analyst {i + 1}", "score": 0.8 - (i * 0.1), "provider": self.provider_name.lower(), } ) return results class MockContentAnalyzer: """Mock content analyzer for integration testing.""" def __init__(self, analysis_delay: float = 0.01): self.analysis_delay = analysis_delay self.analysis_count = 0 async def analyze_content( self, content: str, persona: str, analysis_focus: str = "general" ) -> dict[str, Any]: """Mock content analysis.""" self.analysis_count += 1 await asyncio.sleep(self.analysis_delay) # Generate realistic analysis based on content keywords insights = [] risk_factors = [] opportunities = [] content_lower = content.lower() if "earnings" in content_lower or "revenue" in content_lower: insights.append("Strong earnings performance indicated") opportunities.append("Potential for continued revenue growth") if "technical" in content_lower or "chart" in content_lower: insights.append("Technical indicators suggest trend continuation") risk_factors.append("Support level break could trigger selling") if "sentiment" in content_lower or "analyst" in content_lower: insights.append("Market sentiment appears positive") opportunities.append("Analyst upgrades possible") if "competitive" in content_lower or "market share" in content_lower: insights.append("Competitive position remains strong") risk_factors.append("Increased competitive pressure in market") # Default insights if no specific keywords found if not insights: insights = [ f"General analysis insight {self.analysis_count} for {persona} investor" ] sentiment_mapping = { "conservative": {"direction": "neutral", "confidence": 0.6}, "moderate": {"direction": "bullish", "confidence": 0.7}, "aggressive": {"direction": "bullish", "confidence": 0.8}, } return { "insights": insights, "sentiment": sentiment_mapping.get( persona, {"direction": "neutral", "confidence": 0.5} ), "risk_factors": risk_factors or ["Standard market risks apply"], "opportunities": opportunities or ["Monitor for opportunities"], "credibility_score": 0.8, "relevance_score": 0.75, "summary": f"Analysis for {persona} investor from {analysis_focus} perspective", "analysis_timestamp": datetime.now(), } class MockLLM(BaseChatModel): """Mock LLM for integration testing.""" def __init__(self, response_delay: float = 0.05, fail_rate: float = 0.0): super().__init__() self.response_delay = response_delay self.fail_rate = fail_rate self.invocation_count = 0 async def ainvoke(self, messages, config=None, **kwargs): """Mock async LLM invocation.""" self.invocation_count += 1 # Simulate failures import random if random.random() < self.fail_rate: raise RuntimeError("LLM service unavailable") await asyncio.sleep(self.response_delay) # Generate contextual response based on message content message_content = str(messages[-1].content).lower() if "synthesis" in message_content: response = """ Based on the comprehensive research from multiple specialized agents, this analysis provides a well-rounded view of the investment opportunity. The fundamental analysis shows strong financial metrics, while sentiment analysis indicates positive market reception. Technical analysis suggests favorable entry points, and competitive analysis reveals sustainable advantages. Overall, this presents a compelling investment case for the specified investor persona. 
""" else: response = '{"KEY_INSIGHTS": ["AI-generated insight"], "SENTIMENT": {"direction": "bullish", "confidence": 0.75}, "CREDIBILITY": 0.8}' return AIMessage(content=response) def _generate(self, messages, stop=None, **kwargs): raise NotImplementedError("Use ainvoke for async tests") @property def _llm_type(self) -> str: return "mock_llm" @pytest.mark.integration class TestParallelResearchEndToEnd: """Test complete end-to-end parallel research workflows.""" @pytest.fixture def integration_config(self): """Configuration for integration testing.""" return ParallelResearchConfig( max_concurrent_agents=3, timeout_per_agent=10, enable_fallbacks=True, rate_limit_delay=0.1, ) @pytest.fixture def mock_search_providers(self): """Create mock search providers.""" return [ MockSearchProvider("Exa", fail_rate=0.1), MockSearchProvider("Tavily", fail_rate=0.1), ] @pytest.fixture def integration_agent(self, integration_config, mock_search_providers): """Create DeepResearchAgent for integration testing.""" llm = MockLLM(response_delay=0.05, fail_rate=0.05) agent = DeepResearchAgent( llm=llm, persona="moderate", checkpointer=MemorySaver(), enable_parallel_execution=True, parallel_config=integration_config, ) # Replace search providers with mocks agent.search_providers = mock_search_providers # Replace content analyzer with mock agent.content_analyzer = MockContentAnalyzer(analysis_delay=0.02) return agent @pytest.mark.asyncio async def test_complete_parallel_research_workflow(self, integration_agent): """Test complete parallel research workflow from start to finish.""" start_time = time.time() result = await integration_agent.research_comprehensive( topic="Apple Inc comprehensive investment analysis for Q4 2024", session_id="integration_test_001", depth="comprehensive", focus_areas=["fundamentals", "technical_analysis", "market_sentiment"], timeframe="30d", ) execution_time = time.time() - start_time # Verify successful execution assert result["status"] == "success" assert result["agent_type"] == "deep_research" assert result["execution_mode"] == "parallel" assert ( result["research_topic"] == "Apple Inc comprehensive investment analysis for Q4 2024" ) # Verify research quality assert result["confidence_score"] > 0.5 assert result["sources_analyzed"] > 0 assert len(result["citations"]) > 0 # Verify parallel execution stats assert "parallel_execution_stats" in result stats = result["parallel_execution_stats"] assert stats["total_tasks"] > 0 assert stats["successful_tasks"] >= 0 assert stats["parallel_efficiency"] > 0 # Verify findings structure assert "findings" in result findings = result["findings"] assert "synthesis" in findings assert "confidence_score" in findings # Verify performance characteristics assert execution_time < 15 # Should complete within reasonable time assert result["execution_time_ms"] > 0 @pytest.mark.asyncio async def test_parallel_vs_sequential_performance_comparison( self, integration_agent ): """Compare parallel vs sequential execution performance.""" topic = "Tesla Inc strategic analysis and market position" # Test parallel execution start_parallel = time.time() parallel_result = await integration_agent.research_comprehensive( topic=topic, session_id="perf_test_parallel", use_parallel_execution=True, depth="standard", ) parallel_time = time.time() - start_parallel # Test sequential execution start_sequential = time.time() sequential_result = await integration_agent.research_comprehensive( topic=topic, session_id="perf_test_sequential", use_parallel_execution=False, 
depth="standard", ) sequential_time = time.time() - start_sequential # Both should succeed assert parallel_result["status"] == "success" assert sequential_result["status"] == "success" # Verify execution modes assert parallel_result["execution_mode"] == "parallel" # Sequential won't have execution_mode in result # Parallel should show efficiency metrics if "parallel_execution_stats" in parallel_result: stats = parallel_result["parallel_execution_stats"] assert stats["parallel_efficiency"] > 0 # If multiple tasks were executed in parallel, should show some efficiency gain if stats["total_tasks"] > 1: print( f"Parallel time: {parallel_time:.3f}s, Sequential time: {sequential_time:.3f}s" ) print(f"Parallel efficiency: {stats['parallel_efficiency']:.2f}x") @pytest.mark.asyncio async def test_error_resilience_and_fallback(self, integration_config): """Test error resilience and fallback mechanisms.""" # Create agent with higher failure rates to test resilience failing_llm = MockLLM(fail_rate=0.3) # 30% failure rate failing_providers = [ MockSearchProvider("FailingProvider", fail_rate=0.5) # 50% failure rate ] agent = DeepResearchAgent( llm=failing_llm, persona="moderate", enable_parallel_execution=True, parallel_config=integration_config, ) agent.search_providers = failing_providers agent.content_analyzer = MockContentAnalyzer() # Should handle failures gracefully result = await agent.research_comprehensive( topic="Resilience test analysis", session_id="resilience_test_001", depth="basic", ) # Should complete successfully despite some failures assert result["status"] == "success" # May have lower confidence due to failures assert result["confidence_score"] >= 0.0 # Should have some parallel execution stats even if tasks failed if result.get("execution_mode") == "parallel": stats = result["parallel_execution_stats"] # Total tasks should be >= failed tasks (some tasks were attempted) assert stats["total_tasks"] >= stats.get("failed_tasks", 0) @pytest.mark.asyncio async def test_different_research_depths(self, integration_agent): """Test parallel research with different depth configurations.""" depths_to_test = ["basic", "standard", "comprehensive"] results = {} for depth in depths_to_test: result = await integration_agent.research_comprehensive( topic=f"Microsoft Corp analysis - {depth} depth", session_id=f"depth_test_{depth}", depth=depth, use_parallel_execution=True, ) results[depth] = result # All should succeed assert result["status"] == "success" assert result["research_depth"] == depth # Comprehensive should generally have more sources and higher confidence if all(r["status"] == "success" for r in results.values()): basic_sources = results["basic"]["sources_analyzed"] comprehensive_sources = results["comprehensive"]["sources_analyzed"] # More comprehensive research should analyze more sources (when successful) if basic_sources > 0 and comprehensive_sources > 0: assert comprehensive_sources >= basic_sources @pytest.mark.asyncio async def test_persona_specific_research(self, integration_config): """Test parallel research with different investor personas.""" personas_to_test = ["conservative", "moderate", "aggressive"] topic = "Amazon Inc investment opportunity analysis" for persona in personas_to_test: llm = MockLLM(response_delay=0.03) agent = DeepResearchAgent( llm=llm, persona=persona, enable_parallel_execution=True, parallel_config=integration_config, ) # Mock components agent.search_providers = [MockSearchProvider("TestProvider")] agent.content_analyzer = MockContentAnalyzer() result = 
await agent.research_comprehensive( topic=topic, session_id=f"persona_test_{persona}", use_parallel_execution=True, ) assert result["status"] == "success" assert result["persona"] == persona # Should have findings tailored to persona assert "findings" in result @pytest.mark.asyncio async def test_concurrent_research_sessions(self, integration_agent): """Test multiple concurrent research sessions.""" topics = [ "Google Alphabet strategic analysis", "Meta Platforms competitive position", "Netflix content strategy evaluation", ] # Run multiple research sessions concurrently tasks = [ integration_agent.research_comprehensive( topic=topic, session_id=f"concurrent_test_{i}", use_parallel_execution=True, depth="standard", ) for i, topic in enumerate(topics) ] start_time = time.time() results = await asyncio.gather(*tasks, return_exceptions=True) execution_time = time.time() - start_time # All should succeed (or be exceptions we can handle) successful_results = [ r for r in results if isinstance(r, dict) and r.get("status") == "success" ] assert ( len(successful_results) >= len(topics) // 2 ) # At least half should succeed # Should complete in reasonable time despite concurrency assert execution_time < 30 # Verify each result has proper session isolation for _i, result in enumerate(successful_results): if "findings" in result: # Each should have distinct research content assert result["research_topic"] in topics @pytest.mark.integration class TestParallelResearchScalability: """Test scalability characteristics of parallel research.""" @pytest.fixture def scalability_config(self): """Configuration for scalability testing.""" return ParallelResearchConfig( max_concurrent_agents=4, timeout_per_agent=8, enable_fallbacks=True, rate_limit_delay=0.05, ) @pytest.mark.asyncio async def test_agent_limit_enforcement(self, scalability_config): """Test that concurrent agent limits are properly enforced.""" llm = MockLLM(response_delay=0.1) # Slower to see concurrency effects agent = DeepResearchAgent( llm=llm, persona="moderate", enable_parallel_execution=True, parallel_config=scalability_config, ) # Mock components with tracking call_tracker = {"max_concurrent": 0, "current_concurrent": 0} class TrackingProvider(MockSearchProvider): async def search(self, query: str, num_results: int = 10): call_tracker["current_concurrent"] += 1 call_tracker["max_concurrent"] = max( call_tracker["max_concurrent"], call_tracker["current_concurrent"] ) try: return await super().search(query, num_results) finally: call_tracker["current_concurrent"] -= 1 agent.search_providers = [TrackingProvider("Tracker")] agent.content_analyzer = MockContentAnalyzer() result = await agent.research_comprehensive( topic="Scalability test with many potential subtasks", session_id="scalability_test_001", focus_areas=[ "fundamentals", "technical", "sentiment", "competitive", "extra1", "extra2", ], use_parallel_execution=True, ) assert result["status"] == "success" # Should not exceed configured max concurrent agents assert ( call_tracker["max_concurrent"] <= scalability_config.max_concurrent_agents ) @pytest.mark.asyncio async def test_memory_usage_under_load(self, scalability_config): """Test memory usage characteristics under load.""" import gc import os import psutil # Get initial memory usage process = psutil.Process(os.getpid()) initial_memory = process.memory_info().rss / 1024 / 1024 # MB llm = MockLLM(response_delay=0.02) agent = DeepResearchAgent( llm=llm, persona="moderate", enable_parallel_execution=True, 

    @pytest.mark.asyncio
    async def test_memory_usage_under_load(self, scalability_config):
        """Test memory usage characteristics under load."""
        import gc
        import os

        import psutil

        # Get initial memory usage
        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        llm = MockLLM(response_delay=0.02)
        agent = DeepResearchAgent(
            llm=llm,
            persona="moderate",
            enable_parallel_execution=True,
            parallel_config=scalability_config,
        )
        agent.search_providers = [MockSearchProvider("MemoryTest")]
        agent.content_analyzer = MockContentAnalyzer(analysis_delay=0.01)

        # Perform multiple research operations
        for i in range(10):  # 10 operations to test memory accumulation
            result = await agent.research_comprehensive(
                topic=f"Memory test analysis {i}",
                session_id=f"memory_test_{i}",
                use_parallel_execution=True,
                depth="basic",
            )
            assert result["status"] == "success"

        # Force garbage collection
        gc.collect()

        # Check final memory usage
        final_memory = process.memory_info().rss / 1024 / 1024  # MB
        memory_growth = final_memory - initial_memory

        # Memory growth should be reasonable (not indicative of leaks)
        assert memory_growth < 100  # Less than 100MB growth for 10 operations

    @pytest.mark.asyncio
    async def test_large_scale_task_distribution(self, scalability_config):
        """Test task distribution with many potential research areas."""
        llm = MockLLM()
        agent = DeepResearchAgent(
            llm=llm,
            persona="moderate",
            enable_parallel_execution=True,
            parallel_config=scalability_config,
        )
        agent.search_providers = [MockSearchProvider("LargeScale")]
        agent.content_analyzer = MockContentAnalyzer()

        # Test with many focus areas (more than max concurrent agents)
        many_focus_areas = [
            "earnings",
            "revenue",
            "profit_margins",
            "debt_analysis",
            "cash_flow",
            "technical_indicators",
            "chart_patterns",
            "support_levels",
            "momentum",
            "analyst_ratings",
            "news_sentiment",
            "social_sentiment",
            "institutional_sentiment",
            "market_share",
            "competitive_position",
            "industry_trends",
            "regulatory_environment",
        ]

        result = await agent.research_comprehensive(
            topic="Large scale comprehensive analysis with many research dimensions",
            session_id="large_scale_test_001",
            focus_areas=many_focus_areas,
            use_parallel_execution=True,
            depth="comprehensive",
        )

        assert result["status"] == "success"

        # Should handle large number of focus areas efficiently
        if "parallel_execution_stats" in result:
            stats = result["parallel_execution_stats"]
            # Should not create more tasks than max concurrent agents allows
            assert stats["total_tasks"] <= scalability_config.max_concurrent_agents

            # Should achieve some parallel efficiency
            if stats["successful_tasks"] > 1:
                assert stats["parallel_efficiency"] > 1.0
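

# --- Illustrative sketch (ours, not part of the original suite) ---
# test_agent_limit_enforcement above only asserts an observable bound on
# concurrency. One common way to enforce such a bound, which the production
# parallel-research machinery may or may not use, is a semaphore wrapper:
async def run_bounded(coros, max_concurrent: int) -> list[Any]:
    sem = asyncio.Semaphore(max_concurrent)

    async def _guarded(coro):
        # At most `max_concurrent` coroutines run inside this block at once
        async with sem:
            return await coro

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(_guarded(c) for c in coros))

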
@pytest.mark.integration
class TestParallelResearchLoggingIntegration:
    """Test integration of logging throughout parallel research workflow."""

    @pytest.fixture
    def logged_agent(self):
        """Create agent with comprehensive logging."""
        llm = MockLLM(response_delay=0.02)
        config = ParallelResearchConfig(
            max_concurrent_agents=2,
            timeout_per_agent=5,
            enable_fallbacks=True,
            rate_limit_delay=0.05,
        )

        agent = DeepResearchAgent(
            llm=llm,
            persona="moderate",
            enable_parallel_execution=True,
            parallel_config=config,
        )
        agent.search_providers = [MockSearchProvider("LoggedProvider")]
        agent.content_analyzer = MockContentAnalyzer()

        return agent

    @pytest.mark.asyncio
    async def test_comprehensive_logging_workflow(self, logged_agent):
        """Test that comprehensive logging occurs throughout workflow."""
        with patch(
            "maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
        ) as mock_get_logger:
            mock_logger = Mock()
            mock_get_logger.return_value = mock_logger

            result = await logged_agent.research_comprehensive(
                topic="Comprehensive logging test analysis",
                session_id="logging_test_001",
                use_parallel_execution=True,
            )

            assert result["status"] == "success"

            # Should have multiple logging calls
            assert mock_logger.info.call_count >= 10  # Multiple stages should log

            # Verify different types of log messages occurred
            all_log_calls = [call[0][0] for call in mock_logger.info.call_args_list]
            " ".join(all_log_calls)

            # Should contain various logging elements
            assert any("RESEARCH_START" in call for call in all_log_calls)
            assert any("PARALLEL" in call for call in all_log_calls)

    @pytest.mark.asyncio
    async def test_error_logging_integration(self, logged_agent):
        """Test error logging integration in parallel workflow."""
        # Create a scenario that will cause some errors
        failing_llm = MockLLM(fail_rate=0.5)  # High failure rate
        logged_agent.llm = failing_llm

        with patch(
            "maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
        ) as mock_get_logger:
            mock_logger = Mock()
            mock_get_logger.return_value = mock_logger

            # This may succeed or fail, but should log appropriately
            try:
                result = await logged_agent.research_comprehensive(
                    topic="Error logging test",
                    session_id="error_logging_test_001",
                    use_parallel_execution=True,
                )
                # If it succeeds, should still have logged errors from failed components
                assert result["status"] == "success" or result["status"] == "error"
            except Exception:
                # If it fails completely, that's also acceptable for this test
                pass

            # Should have some error or warning logs due to high failure rate
            has_error_logs = (
                mock_logger.error.call_count > 0 or mock_logger.warning.call_count > 0
            )
            assert has_error_logs

    @pytest.mark.asyncio
    async def test_performance_metrics_logging(self, logged_agent):
        """Test that performance metrics are properly logged."""
        with patch(
            "maverick_mcp.utils.orchestration_logging.log_performance_metrics"
        ) as mock_perf_log:
            result = await logged_agent.research_comprehensive(
                topic="Performance metrics test",
                session_id="perf_metrics_test_001",
                use_parallel_execution=True,
            )

            assert result["status"] == "success"

            # Should have logged performance metrics
            assert mock_perf_log.call_count >= 1

            # Verify metrics content
            perf_call = mock_perf_log.call_args_list[0]
            perf_call[0][0]
            metrics = perf_call[0][1]

            assert isinstance(metrics, dict)
            # Should contain relevant performance metrics
            expected_metrics = [
                "total_tasks",
                "successful_tasks",
                "failed_tasks",
                "parallel_efficiency",
            ]
            assert any(metric in metrics for metric in expected_metrics)
stats["successful_tasks"] + stats["failed_tasks"] == stats["total_tasks"] ) @pytest.mark.asyncio async def test_complete_failure_fallback(self): """Test fallback to sequential when parallel execution completely fails.""" config = ParallelResearchConfig( max_concurrent_agents=2, timeout_per_agent=3, enable_fallbacks=True, ) # Create agent that will fail in parallel mode failing_llm = MockLLM(fail_rate=0.9) # Very high failure rate agent = DeepResearchAgent( llm=failing_llm, persona="moderate", enable_parallel_execution=True, parallel_config=config, ) agent.search_providers = [MockSearchProvider("FailingProvider", fail_rate=0.9)] agent.content_analyzer = MockContentAnalyzer() # Mock the sequential execution to succeed with patch.object(agent.graph, "ainvoke") as mock_sequential: mock_sequential.return_value = { "status": "success", "persona": "moderate", "research_confidence": 0.6, "research_findings": {"synthesis": "Fallback analysis"}, } result = await agent.research_comprehensive( topic="Complete failure fallback test", session_id="complete_failure_test_001", use_parallel_execution=True, ) # Should fall back to sequential and succeed assert result["status"] == "success" # Sequential execution should have been called due to parallel failure mock_sequential.assert_called_once() @pytest.mark.asyncio async def test_timeout_handling_in_parallel_execution(self): """Test handling of timeouts in parallel execution.""" config = ParallelResearchConfig( max_concurrent_agents=2, timeout_per_agent=1, # Very short timeout enable_fallbacks=True, ) # Create components with delays longer than timeout slow_llm = MockLLM(response_delay=2.0) # Slower than timeout agent = DeepResearchAgent( llm=slow_llm, persona="moderate", enable_parallel_execution=True, parallel_config=config, ) agent.search_providers = [MockSearchProvider("SlowProvider")] agent.content_analyzer = MockContentAnalyzer(analysis_delay=0.5) # Should handle timeouts gracefully result = await agent.research_comprehensive( topic="Timeout handling test", session_id="timeout_test_001", use_parallel_execution=True, ) # Should complete with some status (success or error) assert result["status"] in ["success", "error"] # If parallel execution stats are available, should show timeout effects if ( "parallel_execution_stats" in result and result["parallel_execution_stats"]["total_tasks"] > 0 ): stats = result["parallel_execution_stats"] # Timeouts should result in failed tasks assert stats["failed_tasks"] >= 0 @pytest.mark.integration class TestParallelResearchDataFlow: """Test data flow and consistency in parallel research.""" @pytest.mark.asyncio async def test_data_consistency_across_parallel_tasks(self): """Test that data remains consistent across parallel task execution.""" config = ParallelResearchConfig( max_concurrent_agents=3, timeout_per_agent=5, ) llm = MockLLM() agent = DeepResearchAgent( llm=llm, persona="moderate", enable_parallel_execution=True, parallel_config=config, ) # Create providers that return consistent data consistent_provider = MockSearchProvider("ConsistentProvider") agent.search_providers = [consistent_provider] agent.content_analyzer = MockContentAnalyzer() result = await agent.research_comprehensive( topic="Data consistency test for Apple Inc", session_id="consistency_test_001", use_parallel_execution=True, ) assert result["status"] == "success" # Verify data structure consistency assert "research_topic" in result assert "confidence_score" in result assert "citations" in result assert isinstance(result["citations"], list) # 
@pytest.mark.integration
class TestParallelResearchDataFlow:
    """Test data flow and consistency in parallel research."""

    @pytest.mark.asyncio
    async def test_data_consistency_across_parallel_tasks(self):
        """Test that data remains consistent across parallel task execution."""
        config = ParallelResearchConfig(
            max_concurrent_agents=3,
            timeout_per_agent=5,
        )

        llm = MockLLM()
        agent = DeepResearchAgent(
            llm=llm,
            persona="moderate",
            enable_parallel_execution=True,
            parallel_config=config,
        )

        # Create providers that return consistent data
        consistent_provider = MockSearchProvider("ConsistentProvider")
        agent.search_providers = [consistent_provider]
        agent.content_analyzer = MockContentAnalyzer()

        result = await agent.research_comprehensive(
            topic="Data consistency test for Apple Inc",
            session_id="consistency_test_001",
            use_parallel_execution=True,
        )

        assert result["status"] == "success"

        # Verify data structure consistency
        assert "research_topic" in result
        assert "confidence_score" in result
        assert "citations" in result
        assert isinstance(result["citations"], list)

        # If parallel execution occurred, verify stats structure
        if "parallel_execution_stats" in result:
            stats = result["parallel_execution_stats"]
            required_stats = [
                "total_tasks",
                "successful_tasks",
                "failed_tasks",
                "parallel_efficiency",
            ]
            for stat in required_stats:
                assert stat in stats
                assert isinstance(stats[stat], int | float)

    @pytest.mark.asyncio
    async def test_citation_aggregation_across_tasks(self):
        """Test that citations are properly aggregated from parallel tasks."""
        config = ParallelResearchConfig(max_concurrent_agents=2)

        llm = MockLLM()
        agent = DeepResearchAgent(
            llm=llm,
            persona="moderate",
            enable_parallel_execution=True,
            parallel_config=config,
        )

        # Multiple providers to generate multiple sources
        agent.search_providers = [
            MockSearchProvider("Provider1"),
            MockSearchProvider("Provider2"),
        ]
        agent.content_analyzer = MockContentAnalyzer()

        result = await agent.research_comprehensive(
            topic="Citation aggregation test",
            session_id="citation_test_001",
            use_parallel_execution=True,
        )

        assert result["status"] == "success"

        # Should have citations from multiple sources
        citations = result.get("citations", [])
        if len(citations) > 0:
            # Citations should have required fields
            for citation in citations:
                assert "id" in citation
                assert "title" in citation
                assert "url" in citation
                assert "credibility_score" in citation

            # Should have unique citation IDs
            citation_ids = [c["id"] for c in citations]
            assert len(citation_ids) == len(set(citation_ids))

    @pytest.mark.asyncio
    async def test_research_quality_metrics(self):
        """Test research quality metrics in parallel execution."""
        config = ParallelResearchConfig(max_concurrent_agents=2)

        llm = MockLLM()
        agent = DeepResearchAgent(
            llm=llm,
            persona="moderate",
            enable_parallel_execution=True,
            parallel_config=config,
        )
        agent.search_providers = [MockSearchProvider("QualityProvider")]
        agent.content_analyzer = MockContentAnalyzer()

        result = await agent.research_comprehensive(
            topic="Research quality metrics test",
            session_id="quality_test_001",
            use_parallel_execution=True,
        )

        assert result["status"] == "success"

        # Verify quality metrics
        assert "confidence_score" in result
        assert 0.0 <= result["confidence_score"] <= 1.0

        assert "sources_analyzed" in result
        assert isinstance(result["sources_analyzed"], int)
        assert result["sources_analyzed"] >= 0

        if "source_diversity" in result:
            assert 0.0 <= result["source_diversity"] <= 1.0
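

# --- Local entry point (ours, illustrative; assumes demo_mock_pipeline above) ---
# Running the module directly exercises the mock pipeline without pytest. The
# suite itself is intended to run under pytest with the integration marker
# selected, e.g. `pytest test_parallel_research_integration.py -m integration`.
if __name__ == "__main__":
    print(asyncio.run(demo_mock_pipeline()))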
