MaverickMCP

by wshobson
MIT License
deep_research.py (134 kB)
""" DeepResearchAgent implementation using 2025 LangGraph patterns. Provides comprehensive financial research capabilities with web search, content analysis, sentiment detection, and source validation. """ from __future__ import annotations import asyncio import json import logging from collections.abc import Iterable from datetime import UTC, datetime from typing import Any from uuid import uuid4 from langchain_core.language_models import BaseChatModel from langchain_core.messages import HumanMessage, SystemMessage from langchain_core.tools import BaseTool, tool from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import END, START, StateGraph # type: ignore[import-untyped] from langgraph.types import Command # type: ignore[import-untyped] from maverick_mcp.agents.base import PersonaAwareAgent from maverick_mcp.agents.circuit_breaker import circuit_manager from maverick_mcp.config.settings import get_settings from maverick_mcp.exceptions import ( WebSearchError, ) from maverick_mcp.memory.stores import ConversationStore from maverick_mcp.utils.orchestration_logging import ( get_orchestration_logger, log_agent_execution, log_method_call, log_performance_metrics, log_synthesis_operation, ) try: # pragma: no cover - optional dependency from tavily import TavilyClient # type: ignore[import-not-found] except ImportError: # pragma: no cover TavilyClient = None # type: ignore[assignment] # Import moved to avoid circular dependency - will import where needed from maverick_mcp.workflows.state import DeepResearchState logger = logging.getLogger(__name__) settings = get_settings() # Global search provider cache and connection manager _search_provider_cache: dict[str, Any] = {} async def get_cached_search_provider(exa_api_key: str | None = None) -> Any | None: """Get cached Exa search provider to avoid repeated initialization delays.""" cache_key = f"exa:{exa_api_key is not None}" if cache_key in _search_provider_cache: return _search_provider_cache[cache_key] logger.info("Initializing Exa search provider") provider = None # Initialize Exa provider with caching if exa_api_key: try: provider = ExaSearchProvider(exa_api_key) logger.info("Initialized Exa search provider") # Cache the provider _search_provider_cache[cache_key] = provider except ImportError as e: logger.warning(f"Failed to initialize Exa provider: {e}") return provider # Research depth levels optimized for quick searches RESEARCH_DEPTH_LEVELS = { "basic": { "max_sources": 3, "max_searches": 1, # Reduced for speed "analysis_depth": "summary", "validation_required": False, }, "standard": { "max_sources": 5, # Reduced from 8 "max_searches": 2, # Reduced from 4 "analysis_depth": "detailed", "validation_required": False, # Disabled for speed }, "comprehensive": { "max_sources": 10, # Reduced from 15 "max_searches": 3, # Reduced from 6 "analysis_depth": "comprehensive", "validation_required": False, # Disabled for speed }, "exhaustive": { "max_sources": 15, # Reduced from 25 "max_searches": 5, # Reduced from 10 "analysis_depth": "exhaustive", "validation_required": True, }, } # Persona-specific research focus areas PERSONA_RESEARCH_FOCUS = { "conservative": { "keywords": [ "dividend", "stability", "risk", "debt", "cash flow", "established", ], "sources": [ "sec filings", "annual reports", "rating agencies", "dividend history", ], "risk_focus": "downside protection", "time_horizon": "long-term", }, "moderate": { "keywords": ["growth", "value", "balance", "diversification", "fundamentals"], "sources": ["financial statements", 
"analyst reports", "industry analysis"], "risk_focus": "risk-adjusted returns", "time_horizon": "medium-term", }, "aggressive": { "keywords": ["growth", "momentum", "opportunity", "innovation", "expansion"], "sources": [ "news", "earnings calls", "industry trends", "competitive analysis", ], "risk_focus": "upside potential", "time_horizon": "short to medium-term", }, "day_trader": { "keywords": [ "catalysts", "earnings", "news", "volume", "volatility", "momentum", ], "sources": ["breaking news", "social sentiment", "earnings announcements"], "risk_focus": "short-term risks", "time_horizon": "intraday to weekly", }, } class WebSearchProvider: """Base class for web search providers with early abort mechanism.""" def __init__(self, api_key: str): self.api_key = api_key self.rate_limiter = None # Implement rate limiting self._failure_count = 0 self._max_failures = 3 # Abort after 3 consecutive failures self._is_healthy = True self.settings = get_settings() self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") def _calculate_timeout( self, query: str, timeout_budget: float | None = None ) -> float: """Calculate generous timeout for thorough research operations.""" query_words = len(query.split()) # Generous timeout calculation for thorough search operations if query_words <= 3: base_timeout = 30.0 # Simple queries - 30s for thorough results elif query_words <= 8: base_timeout = 45.0 # Standard queries - 45s for comprehensive search else: base_timeout = 60.0 # Complex queries - 60s for exhaustive search # Apply budget constraints if available if timeout_budget and timeout_budget > 0: # Use generous portion of available budget per search operation budget_timeout = max( timeout_budget * 0.6, 30.0 ) # At least 30s, use 60% of budget calculated_timeout = min(base_timeout, budget_timeout) # Ensure minimum timeout (at least 30s for thorough search) calculated_timeout = max(calculated_timeout, 30.0) else: calculated_timeout = base_timeout # Final timeout with generous minimum for thorough search final_timeout = max(calculated_timeout, 30.0) return final_timeout def _record_failure(self, error_type: str = "unknown") -> None: """Record a search failure and check if provider should be disabled.""" self._failure_count += 1 # Use separate thresholds for timeout vs other failures timeout_threshold = getattr( self.settings.performance, "search_timeout_failure_threshold", 12 ) # Much more tolerant of timeout failures - they may be due to network/complexity if error_type == "timeout" and self._failure_count >= timeout_threshold: self._is_healthy = False logger.warning( f"Search provider {self.__class__.__name__} disabled after " f"{self._failure_count} consecutive timeout failures (threshold: {timeout_threshold})" ) elif error_type != "timeout" and self._failure_count >= self._max_failures * 2: # Be more lenient for non-timeout failures (2x threshold) self._is_healthy = False logger.warning( f"Search provider {self.__class__.__name__} disabled after " f"{self._failure_count} total non-timeout failures" ) logger.debug( f"Provider {self.__class__.__name__} failure recorded: " f"type={error_type}, count={self._failure_count}, healthy={self._is_healthy}" ) def _record_success(self) -> None: """Record a successful search and reset failure count.""" if self._failure_count > 0: logger.info( f"Search provider {self.__class__.__name__} recovered after " f"{self._failure_count} failures" ) self._failure_count = 0 self._is_healthy = True def is_healthy(self) -> bool: """Check if provider is healthy and 
    def _record_failure(self, error_type: str = "unknown") -> None:
        """Record a search failure and check if provider should be disabled."""
        self._failure_count += 1

        # Use separate thresholds for timeout vs other failures
        timeout_threshold = getattr(
            self.settings.performance, "search_timeout_failure_threshold", 12
        )

        # Much more tolerant of timeout failures - they may be due to network/complexity
        if error_type == "timeout" and self._failure_count >= timeout_threshold:
            self._is_healthy = False
            logger.warning(
                f"Search provider {self.__class__.__name__} disabled after "
                f"{self._failure_count} consecutive timeout failures (threshold: {timeout_threshold})"
            )
        elif error_type != "timeout" and self._failure_count >= self._max_failures * 2:
            # Be more lenient for non-timeout failures (2x threshold)
            self._is_healthy = False
            logger.warning(
                f"Search provider {self.__class__.__name__} disabled after "
                f"{self._failure_count} total non-timeout failures"
            )

        logger.debug(
            f"Provider {self.__class__.__name__} failure recorded: "
            f"type={error_type}, count={self._failure_count}, healthy={self._is_healthy}"
        )

    def _record_success(self) -> None:
        """Record a successful search and reset failure count."""
        if self._failure_count > 0:
            logger.info(
                f"Search provider {self.__class__.__name__} recovered after "
                f"{self._failure_count} failures"
            )
        self._failure_count = 0
        self._is_healthy = True

    def is_healthy(self) -> bool:
        """Check if provider is healthy and should be used."""
        return self._is_healthy

    async def search(
        self, query: str, num_results: int = 10, timeout_budget: float | None = None
    ) -> list[dict[str, Any]]:
        """Perform web search and return results."""
        raise NotImplementedError

    async def get_content(self, url: str) -> dict[str, Any]:
        """Extract content from URL."""
        raise NotImplementedError

    async def search_multiple_providers(
        self,
        queries: list[str],
        providers: list[str] | None = None,
        max_results_per_query: int = 5,
    ) -> dict[str, list[dict[str, Any]]]:
        """Search using multiple providers and return aggregated results."""
        providers = providers or ["exa"]  # Default to available providers
        results = {}

        for provider_name in providers:
            provider_results = []
            for query in queries:
                try:
                    query_results = await self.search(query, max_results_per_query)
                    provider_results.extend(query_results or [])
                except Exception as e:
                    self.logger.warning(
                        f"Search failed for provider {provider_name}, query '{query}': {e}"
                    )
                    continue
            results[provider_name] = provider_results

        return results

    def _timeframe_to_date(self, timeframe: str) -> str | None:
        """Convert timeframe string to date string."""
        from datetime import datetime, timedelta

        now = datetime.now()
        if timeframe == "1d":
            date = now - timedelta(days=1)
        elif timeframe == "1w":
            date = now - timedelta(weeks=1)
        elif timeframe == "1m":
            date = now - timedelta(days=30)
        else:
            # Invalid or unsupported timeframe, return None
            return None
        return date.strftime("%Y-%m-%d")
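# Failure-policy sketch (illustrative comment, not in the original file): with
# the defaults above, a provider survives up to 11 timeout failures (threshold
# 12) but only 5 other errors (2 * _max_failures = 6 disables it); a single
# successful search resets the count and marks the provider healthy again.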
class ExaSearchProvider(WebSearchProvider):
    """Exa search provider for comprehensive web search using MCP tools with financial optimization."""

    def __init__(self, api_key: str):
        super().__init__(api_key)
        # Store the API key for verification
        self._api_key_verified = bool(api_key)

        # Financial-specific domain preferences for better results
        self.financial_domains = [
            "sec.gov",
            "edgar.sec.gov",
            "investor.gov",
            "bloomberg.com",
            "reuters.com",
            "wsj.com",
            "ft.com",
            "marketwatch.com",
            "yahoo.com/finance",
            "finance.yahoo.com",
            "morningstar.com",
            "fool.com",
            "seekingalpha.com",
            "investopedia.com",
            "barrons.com",
            "cnbc.com",
            "nasdaq.com",
            "nyse.com",
            "finra.org",
            "federalreserve.gov",
            "treasury.gov",
            "bls.gov",
        ]

        # Domains to exclude for financial searches
        self.excluded_domains = [
            "facebook.com",
            "twitter.com",
            "x.com",
            "instagram.com",
            "tiktok.com",
            "reddit.com",
            "pinterest.com",
            "linkedin.com",
            "youtube.com",
            "wikipedia.org",
        ]

        logger.info("Initialized ExaSearchProvider with financial optimization")

    async def search(
        self, query: str, num_results: int = 10, timeout_budget: float | None = None
    ) -> list[dict[str, Any]]:
        """Search using Exa via async client for comprehensive web results with adaptive timeout."""
        return await self._search_with_strategy(
            query, num_results, timeout_budget, "auto"
        )

    async def search_financial(
        self,
        query: str,
        num_results: int = 10,
        timeout_budget: float | None = None,
        strategy: str = "hybrid",
    ) -> list[dict[str, Any]]:
        """
        Enhanced financial search with optimized queries and domain targeting.

        Args:
            query: Search query
            num_results: Number of results to return
            timeout_budget: Timeout budget in seconds
            strategy: Search strategy - 'hybrid', 'authoritative', 'comprehensive', or 'auto'
        """
        return await self._search_with_strategy(
            query, num_results, timeout_budget, strategy
        )

    async def _search_with_strategy(
        self, query: str, num_results: int, timeout_budget: float | None, strategy: str
    ) -> list[dict[str, Any]]:
        """Internal method to handle different search strategies."""
        # Check provider health before attempting search
        if not self.is_healthy():
            logger.warning("Exa provider is unhealthy - skipping search")
            raise WebSearchError("Exa provider disabled due to repeated failures")

        # Calculate adaptive timeout
        search_timeout = self._calculate_timeout(query, timeout_budget)

        try:
            # Use search-specific circuit breaker settings (more tolerant)
            circuit_breaker = await circuit_manager.get_or_create(
                "exa_search",
                failure_threshold=getattr(
                    self.settings.performance,
                    "search_circuit_breaker_failure_threshold",
                    8,
                ),
                recovery_timeout=getattr(
                    self.settings.performance,
                    "search_circuit_breaker_recovery_timeout",
                    30,
                ),
            )

            async def _search():
                # Use the async exa-py library for web search
                try:
                    from exa_py import AsyncExa

                    # Initialize AsyncExa client with API key
                    async_exa_client = AsyncExa(api_key=self.api_key)

                    # Configure search parameters based on strategy
                    search_params = self._get_search_params(
                        query, num_results, strategy
                    )

                    # Call Exa search with optimized parameters
                    exa_response = await async_exa_client.search_and_contents(
                        **search_params
                    )

                    # Convert Exa response to standard format with enhanced metadata
                    results = []
                    if exa_response and hasattr(exa_response, "results"):
                        for result in exa_response.results:
                            # Enhanced result processing with financial relevance scoring
                            financial_relevance = self._calculate_financial_relevance(
                                result
                            )

                            results.append(
                                {
                                    "url": result.url or "",
                                    "title": result.title or "No Title",
                                    "content": (result.text or "")[:2000],
                                    "raw_content": (result.text or "")[
                                        :5000
                                    ],  # Increased for financial content
                                    "published_date": result.published_date or "",
                                    "score": result.score
                                    if hasattr(result, "score")
                                    and result.score is not None
                                    else 0.7,
                                    "financial_relevance": financial_relevance,
                                    "provider": "exa",
                                    "author": result.author
                                    if hasattr(result, "author")
                                    and result.author is not None
                                    else "",
                                    "domain": self._extract_domain(result.url or ""),
                                    "is_authoritative": self._is_authoritative_source(
                                        result.url or ""
                                    ),
                                }
                            )

                    # Sort results by financial relevance and score
                    results.sort(
                        key=lambda x: (x["financial_relevance"], x["score"]),
                        reverse=True,
                    )

                    return results

                except ImportError:
                    logger.error("exa-py library not available - cannot perform search")
                    raise WebSearchError(
                        "exa-py library required for ExaSearchProvider"
                    )
                except Exception as e:
                    logger.error(f"Error calling Exa API: {e}")
                    raise e

            # Use adaptive timeout based on query complexity and budget
            result = await asyncio.wait_for(
                circuit_breaker.call(_search), timeout=search_timeout
            )
            self._record_success()  # Record successful search
            logger.debug(
                f"Exa search completed in {search_timeout:.1f}s timeout window"
            )
            return result

        except TimeoutError:
            self._record_failure("timeout")  # Record timeout as specific failure type
            query_snippet = query[:100] + ("..." if len(query) > 100 else "")
            logger.error(
                f"Exa search timeout after {search_timeout:.1f} seconds (failure #{self._failure_count}) "
                f"for query: '{query_snippet}'"
            )
            raise WebSearchError(
                f"Exa search timed out after {search_timeout:.1f} seconds"
            )
        except Exception as e:
            self._record_failure("error")  # Record non-timeout failure
            logger.error(f"Exa search error (failure #{self._failure_count}): {e}")
            raise WebSearchError(f"Exa search failed: {str(e)}")
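    # Shape note (illustrative comment, not in the original source): every hit
    # is normalized to one dict layout, roughly
    #   {"url": ..., "title": ..., "content": <=2000 chars,
    #    "raw_content": <=5000 chars, "score": float, "financial_relevance": float,
    #    "provider": "exa", "domain": ..., "is_authoritative": bool, ...}
    # so downstream nodes can treat Exa and Tavily results uniformly.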
    def _get_search_params(
        self, query: str, num_results: int, strategy: str
    ) -> dict[str, Any]:
        """
        Generate optimized search parameters based on strategy and query type.

        Args:
            query: Search query
            num_results: Number of results
            strategy: Search strategy

        Returns:
            Dictionary of search parameters for Exa API
        """
        # Base parameters
        params = {
            "query": query,
            "num_results": num_results,
            "text": {"max_characters": 5000},  # Increased for financial content
        }

        # Strategy-specific optimizations
        if strategy == "authoritative":
            # Focus on authoritative financial sources
            # Note: Exa API doesn't allow both include_domains and exclude_domains with content
            params.update(
                {
                    "include_domains": self.financial_domains[
                        :10
                    ],  # Top authoritative sources
                    "type": "auto",  # Let Exa decide neural vs keyword
                    "start_published_date": "2020-01-01",  # Recent financial data
                }
            )
        elif strategy == "comprehensive":
            # Broad search across all financial sources
            params.update(
                {
                    "exclude_domains": self.excluded_domains,
                    "type": "neural",  # Better for comprehensive understanding
                    "start_published_date": "2018-01-01",  # Broader historical context
                }
            )
        elif strategy == "hybrid":
            # Balanced approach with domain preferences
            params.update(
                {
                    "exclude_domains": self.excluded_domains,
                    "type": "auto",  # Hybrid neural/keyword approach
                    "start_published_date": "2019-01-01",
                    # Use domain weighting rather than strict inclusion
                }
            )
        else:  # "auto" or default
            # Standard search with basic optimizations
            params.update(
                {
                    "exclude_domains": self.excluded_domains[:5],  # Basic exclusions
                    "type": "auto",
                }
            )

        # Add financial-specific query enhancements
        enhanced_query = self._enhance_financial_query(query)
        if enhanced_query != query:
            params["query"] = enhanced_query

        return params

    def _enhance_financial_query(self, query: str) -> str:
        """
        Enhance queries with financial context and terminology.

        Args:
            query: Original search query

        Returns:
            Enhanced query with financial context
        """
        # Financial keywords that improve search quality
        financial_terms = {
            "earnings",
            "revenue",
            "profit",
            "loss",
            "financial",
            "quarterly",
            "annual",
            "SEC",
            "10-K",
            "10-Q",
            "balance sheet",
            "income statement",
            "cash flow",
            "dividend",
            "stock",
            "share",
            "market cap",
            "valuation",
        }

        query_lower = query.lower()

        # Check if query already contains financial terms
        has_financial_context = any(term in query_lower for term in financial_terms)

        # Add context for company/stock queries
        if not has_financial_context:
            # Detect if it's a company or stock symbol query
            if any(
                indicator in query_lower
                for indicator in ["company", "corp", "inc", "$", "stock"]
            ):
                return f"{query} financial analysis earnings revenue"
            elif len(query.split()) <= 3 and query.isupper():
                # Likely stock symbol
                return f"{query} stock financial performance earnings"
            elif "analysis" in query_lower or "research" in query_lower:
                return f"{query} financial data SEC filings"

        return query
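    # Illustrative behavior (comment-only sketch, not in the original source;
    # `provider` is any ExaSearchProvider instance):
    #   >>> provider._enhance_financial_query("NVDA")
    #   'NVDA stock financial performance earnings'
    #   >>> provider._enhance_financial_query("NVDA quarterly earnings")
    #   'NVDA quarterly earnings'   # already has financial context, unchanged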
    def _calculate_financial_relevance(self, result) -> float:
        """
        Calculate financial relevance score for a search result.

        Args:
            result: Exa search result object

        Returns:
            Financial relevance score (0.0 to 1.0)
        """
        score = 0.0

        # Domain-based scoring
        domain = self._extract_domain(result.url)
        if domain in self.financial_domains:
            if domain in ["sec.gov", "edgar.sec.gov", "federalreserve.gov"]:
                score += 0.4  # Highest authority
            elif domain in ["bloomberg.com", "reuters.com", "wsj.com", "ft.com"]:
                score += 0.3  # High-quality financial news
            else:
                score += 0.2  # Other financial sources

        # Content-based scoring
        if hasattr(result, "text") and result.text:
            text_lower = result.text.lower()

            # Financial terminology scoring
            financial_keywords = [
                "earnings",
                "revenue",
                "profit",
                "financial",
                "quarterly",
                "annual",
                "sec filing",
                "10-k",
                "10-q",
                "balance sheet",
                "income statement",
                "cash flow",
                "dividend",
                "market cap",
                "valuation",
                "analyst",
                "forecast",
                "guidance",
                "ebitda",
                "eps",
                "pe ratio",
            ]

            keyword_matches = sum(
                1 for keyword in financial_keywords if keyword in text_lower
            )
            score += min(keyword_matches * 0.05, 0.3)  # Max 0.3 from keywords

        # Title-based scoring
        if hasattr(result, "title") and result.title:
            title_lower = result.title.lower()
            if any(
                term in title_lower
                for term in ["financial", "earnings", "quarterly", "annual", "sec"]
            ):
                score += 0.1

        # Recency bonus for financial data
        if hasattr(result, "published_date") and result.published_date:
            try:
                from datetime import datetime

                # Handle different date formats
                date_str = str(result.published_date)
                if date_str and date_str != "":
                    # Handle ISO format with Z
                    if date_str.endswith("Z"):
                        date_str = date_str.replace("Z", "+00:00")
                    pub_date = datetime.fromisoformat(date_str)
                    days_old = (datetime.now(UTC) - pub_date).days
                    if days_old <= 30:
                        score += 0.1  # Recent data bonus
                    elif days_old <= 90:
                        score += 0.05  # Somewhat recent bonus
            except (ValueError, AttributeError, TypeError):
                pass  # Skip if date parsing fails

        return min(score, 1.0)  # Cap at 1.0

    def _extract_domain(self, url: str) -> str:
        """Extract domain from URL."""
        try:
            from urllib.parse import urlparse

            return urlparse(url).netloc.lower().replace("www.", "")
        except Exception:
            return ""

    def _is_authoritative_source(self, url: str) -> bool:
        """Check if URL is from an authoritative financial source."""
        domain = self._extract_domain(url)
        authoritative_domains = [
            "sec.gov",
            "edgar.sec.gov",
            "federalreserve.gov",
            "treasury.gov",
            "bloomberg.com",
            "reuters.com",
            "wsj.com",
            "ft.com",
        ]
        return domain in authoritative_domains
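# Worked relevance example (illustrative comment, not in the original source):
# a reuters.com article (+0.3) whose text hits four keywords (+0.2), whose
# title mentions "earnings" (+0.1), and which was published 10 days ago (+0.1)
# scores 0.7 financial relevance.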
class TavilySearchProvider(WebSearchProvider):
    """Tavily search provider with sensible filtering for financial research."""

    def __init__(self, api_key: str):
        super().__init__(api_key)
        self.excluded_domains = {
            "facebook.com",
            "twitter.com",
            "x.com",
            "instagram.com",
            "reddit.com",
        }

    async def search(
        self, query: str, num_results: int = 10, timeout_budget: float | None = None
    ) -> list[dict[str, Any]]:
        if not self.is_healthy():
            raise WebSearchError("Tavily provider disabled due to repeated failures")

        timeout = self._calculate_timeout(query, timeout_budget)
        circuit_breaker = await circuit_manager.get_or_create(
            "tavily_search",
            failure_threshold=8,
            recovery_timeout=30,
        )

        async def _search() -> list[dict[str, Any]]:
            if TavilyClient is None:
                raise ImportError("tavily package is required for TavilySearchProvider")
            client = TavilyClient(api_key=self.api_key)
            response = await asyncio.get_event_loop().run_in_executor(
                None,
                lambda: client.search(query=query, max_results=num_results),
            )
            return self._process_results(response.get("results", []))

        return await circuit_breaker.call(_search, timeout=timeout)

    def _process_results(
        self, results: Iterable[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        processed: list[dict[str, Any]] = []
        for item in results:
            url = item.get("url", "")
            if any(domain in url for domain in self.excluded_domains):
                continue
            processed.append(
                {
                    "url": url,
                    "title": item.get("title"),
                    "content": item.get("content") or item.get("raw_content", ""),
                    "raw_content": item.get("raw_content"),
                    "published_date": item.get("published_date"),
                    "score": item.get("score", 0.0),
                    "provider": "tavily",
                }
            )
        return processed


class ContentAnalyzer:
    """AI-powered content analysis for research results with batch processing capability."""

    def __init__(self, llm: BaseChatModel):
        self.llm = llm
        self._batch_size = 4  # Process up to 4 sources concurrently

    @staticmethod
    def _coerce_message_content(raw_content: Any) -> str:
        """Convert LLM response content to a string for JSON parsing."""
        if isinstance(raw_content, str):
            return raw_content
        if isinstance(raw_content, list):
            parts: list[str] = []
            for item in raw_content:
                if isinstance(item, dict):
                    text_value = item.get("text")
                    if isinstance(text_value, str):
                        parts.append(text_value)
                    else:
                        parts.append(str(text_value))
                else:
                    parts.append(str(item))
            return "".join(parts)
        return str(raw_content)
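    # Illustrative behavior (comment-only sketch, not in the original source):
    #   >>> ContentAnalyzer._coerce_message_content([{"text": '{"a": 1'}, "}"])
    #   '{"a": 1}'
    # Multi-part LLM responses are flattened into one string before json.loads
    # ever sees them.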
    async def analyze_content(
        self, content: str, persona: str, analysis_focus: str = "general"
    ) -> dict[str, Any]:
        """Analyze content with AI for insights, sentiment, and relevance."""
        persona_focus = PERSONA_RESEARCH_FOCUS.get(
            persona, PERSONA_RESEARCH_FOCUS["moderate"]
        )

        analysis_prompt = f"""
        Analyze this financial content from the perspective of a {persona} investor.

        Content to analyze:
        {content[:3000]}  # Limit content length

        Focus Areas: {", ".join(persona_focus["keywords"])}
        Risk Focus: {persona_focus["risk_focus"]}
        Time Horizon: {persona_focus["time_horizon"]}

        Provide analysis in the following structure:
        1. KEY_INSIGHTS: 3-5 bullet points of most important insights
        2. SENTIMENT: Overall sentiment (bullish/bearish/neutral) with confidence (0-1)
        3. RISK_FACTORS: Key risks identified relevant to {persona} investors
        4. OPPORTUNITIES: Investment opportunities or catalysts identified
        5. CREDIBILITY: Assessment of source credibility (0-1 score)
        6. RELEVANCE: How relevant is this to {persona} investment strategy (0-1 score)
        7. SUMMARY: 2-3 sentence summary for {persona} investors

        Format as JSON with clear structure.
        """

        try:
            response = await self.llm.ainvoke(
                [
                    SystemMessage(
                        content="You are a financial content analyst. Return only valid JSON."
                    ),
                    HumanMessage(content=analysis_prompt),
                ]
            )

            raw_content = self._coerce_message_content(response.content).strip()
            analysis = json.loads(raw_content)

            return {
                "insights": analysis.get("KEY_INSIGHTS", []),
                "sentiment": {
                    "direction": analysis.get("SENTIMENT", {}).get(
                        "direction", "neutral"
                    ),
                    "confidence": analysis.get("SENTIMENT", {}).get("confidence", 0.5),
                },
                "risk_factors": analysis.get("RISK_FACTORS", []),
                "opportunities": analysis.get("OPPORTUNITIES", []),
                "credibility_score": analysis.get("CREDIBILITY", 0.5),
                "relevance_score": analysis.get("RELEVANCE", 0.5),
                "summary": analysis.get("SUMMARY", ""),
                "analysis_timestamp": datetime.now(),
            }

        except Exception as e:
            logger.warning(f"AI content analysis failed: {e}, using fallback")
            return self._fallback_analysis(content, persona)

    def _fallback_analysis(self, content: str, persona: str) -> dict[str, Any]:
        """Fallback analysis using keyword matching."""
        persona_focus = PERSONA_RESEARCH_FOCUS.get(
            persona, PERSONA_RESEARCH_FOCUS["moderate"]
        )

        content_lower = content.lower()

        # Simple sentiment analysis
        positive_words = [
            "growth",
            "increase",
            "profit",
            "success",
            "opportunity",
            "strong",
        ]
        negative_words = ["decline", "loss", "risk", "problem", "concern", "weak"]

        positive_count = sum(1 for word in positive_words if word in content_lower)
        negative_count = sum(1 for word in negative_words if word in content_lower)

        if positive_count > negative_count:
            sentiment = "bullish"
            confidence = 0.6
        elif negative_count > positive_count:
            sentiment = "bearish"
            confidence = 0.6
        else:
            sentiment = "neutral"
            confidence = 0.5

        # Relevance scoring based on keywords
        keyword_matches = sum(
            1 for keyword in persona_focus["keywords"] if keyword in content_lower
        )
        relevance_score = min(keyword_matches / len(persona_focus["keywords"]), 1.0)

        return {
            "insights": [f"Fallback analysis for {persona} investor perspective"],
            "sentiment": {"direction": sentiment, "confidence": confidence},
            "risk_factors": ["Unable to perform detailed risk analysis"],
            "opportunities": ["Unable to identify specific opportunities"],
            "credibility_score": 0.5,
            "relevance_score": relevance_score,
            "summary": f"Content analysis for {persona} investor using fallback method",
            "analysis_timestamp": datetime.now(),
            "fallback_used": True,
        }
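    # Worked fallback example (illustrative comment, not in the original
    # source): "strong revenue growth despite supply risk" matches two positive
    # words ("strong", "growth") and one negative ("risk"), so the keyword
    # fallback reports bullish sentiment at 0.6 confidence.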
    async def analyze_content_batch(
        self,
        content_items: list[tuple[str, str]],
        persona: str,
        analysis_focus: str = "general",
    ) -> list[dict[str, Any]]:
        """
        Analyze multiple content items in parallel batches for improved performance.

        Args:
            content_items: List of (content, source_identifier) tuples
            persona: Investor persona for analysis perspective
            analysis_focus: Focus area for analysis

        Returns:
            List of analysis results in same order as input
        """
        if not content_items:
            return []

        # Process items in batches to avoid overwhelming the LLM
        results = []
        for i in range(0, len(content_items), self._batch_size):
            batch = content_items[i : i + self._batch_size]

            # Create concurrent tasks for this batch
            tasks = [
                self.analyze_content(content, persona, analysis_focus)
                for content, _ in batch
            ]

            # Wait for all tasks in this batch to complete
            try:
                batch_results = await asyncio.gather(*tasks, return_exceptions=True)

                # Process results and handle exceptions
                for j, result in enumerate(batch_results):
                    if isinstance(result, Exception):
                        logger.warning(
                            f"Batch analysis failed for item {i + j}: {result}"
                        )
                        # Use fallback for failed items
                        content, source_id = batch[j]
                        fallback_result = self._fallback_analysis(content, persona)
                        fallback_result["source_identifier"] = source_id
                        fallback_result["batch_processed"] = True
                        results.append(fallback_result)
                    elif isinstance(result, dict):
                        enriched_result = dict(result)
                        enriched_result["source_identifier"] = batch[j][1]
                        enriched_result["batch_processed"] = True
                        results.append(enriched_result)
                    else:
                        content, source_id = batch[j]
                        fallback_result = self._fallback_analysis(content, persona)
                        fallback_result["source_identifier"] = source_id
                        fallback_result["batch_processed"] = True
                        results.append(fallback_result)

            except Exception as e:
                logger.error(f"Batch analysis completely failed: {e}")
                # Fallback for entire batch
                for content, source_id in batch:
                    fallback_result = self._fallback_analysis(content, persona)
                    fallback_result["source_identifier"] = source_id
                    fallback_result["batch_processed"] = True
                    fallback_result["batch_error"] = str(e)
                    results.append(fallback_result)

        logger.info(
            f"Batch content analysis completed: {len(content_items)} items processed "
            f"in {(len(content_items) + self._batch_size - 1) // self._batch_size} batches"
        )

        return results
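    # Batch math (illustrative comment, not in the original source): with
    # _batch_size = 4, ten items run as ceil(10 / 4) = 3 batches; each batch's
    # analyses execute concurrently via asyncio.gather, and a failed item falls
    # back to keyword analysis without aborting the rest of its batch.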
    async def analyze_content_items(
        self,
        content_items: list[dict[str, Any]],
        focus_areas: list[str],
    ) -> dict[str, Any]:
        """
        Analyze content items for test compatibility.

        Args:
            content_items: List of search result dictionaries with content/text field
            focus_areas: List of focus areas for analysis

        Returns:
            Dictionary with aggregated analysis results
        """
        if not content_items:
            return {
                "insights": [],
                "sentiment_scores": [],
                "credibility_scores": [],
            }

        # For test compatibility, directly use LLM with test-compatible format
        analyzed_results = []
        for item in content_items:
            content = item.get("text") or item.get("content") or ""
            if content:
                try:
                    # Direct LLM call for test compatibility
                    prompt = f"Analyze: {content[:500]}"
                    response = await self.llm.ainvoke(
                        [
                            SystemMessage(
                                content="You are a financial content analyst. Return only valid JSON."
                            ),
                            HumanMessage(content=prompt),
                        ]
                    )
                    coerced_content = self._coerce_message_content(
                        response.content
                    ).strip()
                    analysis = json.loads(coerced_content)
                    analyzed_results.append(analysis)
                except Exception as e:
                    logger.warning(f"Content analysis failed: {e}")
                    # Add fallback analysis
                    analyzed_results.append(
                        {
                            "insights": [
                                {"insight": "Analysis failed", "confidence": 0.1}
                            ],
                            "sentiment": {"direction": "neutral", "confidence": 0.5},
                            "credibility": 0.5,
                        }
                    )

        # Aggregate results
        all_insights = []
        sentiment_scores = []
        credibility_scores = []

        for result in analyzed_results:
            # Handle test format with nested insight objects
            insights = result.get("insights", [])
            if isinstance(insights, list):
                for insight in insights:
                    if isinstance(insight, dict) and "insight" in insight:
                        all_insights.append(insight["insight"])
                    elif isinstance(insight, str):
                        all_insights.append(insight)
                    else:
                        all_insights.append(str(insight))

            sentiment = result.get("sentiment", {})
            if sentiment:
                sentiment_scores.append(sentiment)

            credibility = result.get(
                "credibility_score", result.get("credibility", 0.5)
            )
            credibility_scores.append(credibility)

        return {
            "insights": all_insights,
            "sentiment_scores": sentiment_scores,
            "credibility_scores": credibility_scores,
        }

    async def _analyze_single_content(
        self, content_item: dict[str, Any] | str, focus_areas: list[str] | None = None
    ) -> dict[str, Any]:
        """Analyze single content item - used by tests."""
        if isinstance(content_item, dict):
            content = content_item.get("text") or content_item.get("content") or ""
        else:
            content = content_item

        try:
            result = await self.analyze_content(content, "moderate")
            # Ensure test-compatible format
            if "credibility_score" in result and "credibility" not in result:
                result["credibility"] = result["credibility_score"]
            return result
        except Exception as e:
            logger.warning(f"Single content analysis failed: {e}")
            # Return fallback result
            return {
                "sentiment": {"direction": "neutral", "confidence": 0.5},
                "credibility": 0.5,
                "credibility_score": 0.5,
                "insights": [],
                "risk_factors": [],
                "opportunities": [],
            }

    async def _extract_themes(
        self, content_items: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Extract themes from content items - used by tests."""
        if not content_items:
            return []

        # Use LLM to extract structured themes
        try:
            content_text = "\n".join(
                [item.get("text", item.get("content", "")) for item in content_items]
            )

            prompt = f"""
            Extract key themes from the following content and return as JSON:

            {content_text[:2000]}

            Return format: {{"themes": [{{"theme": "theme_name", "relevance": 0.9, "mentions": 10}}]}}
            """

            response = await self.llm.ainvoke(
                [
                    SystemMessage(
                        content="You are a theme extraction AI. Return only valid JSON."
                    ),
                    HumanMessage(content=prompt),
                ]
            )

            result = json.loads(
                ContentAnalyzer._coerce_message_content(response.content)
            )
            return result.get("themes", [])

        except Exception as e:
            logger.warning(f"Theme extraction failed: {e}")
            # Fallback to simple keyword-based themes
            themes = []
            for item in content_items:
                content = item.get("text") or item.get("content") or ""
                if content:
                    content_lower = content.lower()
                    if "growth" in content_lower:
                        themes.append(
                            {"theme": "Growth", "relevance": 0.8, "mentions": 1}
                        )
                    if "earnings" in content_lower:
                        themes.append(
                            {"theme": "Earnings", "relevance": 0.7, "mentions": 1}
                        )
                    if "technology" in content_lower:
                        themes.append(
                            {"theme": "Technology", "relevance": 0.6, "mentions": 1}
                        )
            return themes
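# Usage sketch (illustrative comment, not in the original source; `my_llm` is a
# hypothetical BaseChatModel instance):
#   analyzer = ContentAnalyzer(my_llm)
#   result = await analyzer.analyze_content(article_text, persona="conservative")
#   result["sentiment"]["direction"]  # "bullish" | "bearish" | "neutral"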
class DeepResearchAgent(PersonaAwareAgent):
    """
    Deep research agent using 2025 LangGraph patterns.

    Provides comprehensive financial research with web search, content analysis,
    sentiment detection, and source validation.
    """

    def __init__(
        self,
        llm: BaseChatModel,
        persona: str = "moderate",
        checkpointer: MemorySaver | None = None,
        ttl_hours: int = 24,  # Research results cached longer
        exa_api_key: str | None = None,
        default_depth: str = "standard",
        max_sources: int | None = None,
        research_depth: str | None = None,
        enable_parallel_execution: bool = True,
        parallel_config=None,  # Type: ParallelResearchConfig | None
    ):
        """Initialize deep research agent."""
        # Import here to avoid circular dependency
        from maverick_mcp.utils.parallel_research import (
            ParallelResearchConfig,
            ParallelResearchOrchestrator,
            TaskDistributionEngine,
        )

        # Store API key for immediate loading of search provider (pre-initialization)
        self._exa_api_key = exa_api_key
        self._search_providers_loaded = False
        self.search_providers = []

        # Pre-initialize search providers immediately (async init will be called separately)
        self._initialization_pending = True

        # Configuration
        self.default_depth = research_depth or default_depth
        self.max_sources = max_sources or RESEARCH_DEPTH_LEVELS.get(
            self.default_depth, {}
        ).get("max_sources", 10)
        self.content_analyzer = ContentAnalyzer(llm)

        # Parallel execution configuration
        self.enable_parallel_execution = enable_parallel_execution
        self.parallel_config = parallel_config or ParallelResearchConfig(
            max_concurrent_agents=settings.data_limits.max_parallel_agents,
            timeout_per_agent=180,  # 3 minutes per agent for thorough research
            enable_fallbacks=False,  # Disable fallbacks for speed
            rate_limit_delay=0.5,  # Reduced delay for faster execution
        )
        self.parallel_orchestrator = ParallelResearchOrchestrator(self.parallel_config)
        self.task_distributor = TaskDistributionEngine()

        # Get research-specific tools
        research_tools = self._get_research_tools()

        # Initialize base class
        super().__init__(
            llm=llm,
            tools=research_tools,
            persona=persona,
            checkpointer=checkpointer or MemorySaver(),
            ttl_hours=ttl_hours,
        )

        # Initialize components
        self.conversation_store = ConversationStore(ttl_hours=ttl_hours)

    @property
    def web_search_provider(self):
        """Compatibility property for tests - returns first search provider."""
        return self.search_providers[0] if self.search_providers else None

    def _is_insight_relevant_for_persona(
        self, insight: dict[str, Any], characteristics: dict[str, Any]
    ) -> bool:
        """Check if an insight is relevant for a given persona - used by tests."""
        # Simple implementation for test compatibility
        # In a real implementation, this would analyze the insight against persona characteristics
        return True  # Default permissive approach as mentioned in test comments

    async def initialize(self) -> None:
        """Pre-initialize Exa search provider to eliminate lazy loading overhead during research."""
        if not self._initialization_pending:
            return

        try:
            provider = await get_cached_search_provider(self._exa_api_key)
            self.search_providers = [provider] if provider else []
            self._search_providers_loaded = True
            self._initialization_pending = False

            if not self.search_providers:
                logger.warning(
                    "Exa search provider not available - research capabilities will be limited"
                )
            else:
                logger.info("Pre-initialized Exa search provider")
        except Exception as e:
            logger.error(f"Failed to pre-initialize Exa search provider: {e}")
            self.search_providers = []
            self._search_providers_loaded = True
            self._initialization_pending = False

        logger.info(
            f"DeepResearchAgent pre-initialized with {len(self.search_providers)} search providers, "
            f"parallel execution: {self.enable_parallel_execution}"
        )
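    # Construction sketch (illustrative comment, not in the original source;
    # `my_llm` and `key` are hypothetical placeholders):
    #   agent = DeepResearchAgent(my_llm, persona="moderate", exa_api_key=key)
    #   await agent.initialize()  # warms the cached Exa provider up front
    #   report = await agent.research_comprehensive("NVDA", session_id="s1")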
    async def _ensure_search_providers_loaded(self) -> None:
        """Ensure search providers are loaded - fallback to initialization if not pre-initialized."""
        if self._search_providers_loaded:
            return

        # Check if initialization was marked as needed
        if hasattr(self, "_needs_initialization") and self._needs_initialization:
            logger.info("Performing deferred initialization of search providers")
            await self.initialize()
            self._needs_initialization = False
        else:
            # Fallback to pre-initialization if not done during agent creation
            logger.warning(
                "Search providers not pre-initialized - falling back to lazy loading"
            )
            await self.initialize()

    def get_state_schema(self) -> type:
        """Return DeepResearchState schema."""
        return DeepResearchState

    def _get_research_tools(self) -> list[BaseTool]:
        """Get tools specific to research capabilities."""
        tools = []

        @tool
        async def web_search_financial(
            query: str,
            num_results: int = 10,
            provider: str = "auto",
            strategy: str = "hybrid",
        ) -> dict[str, Any]:
            """
            Search the web for financial information using optimized providers and strategies.

            Args:
                query: Search query for financial information
                num_results: Number of results to return (default: 10)
                provider: Search provider to use ('auto', 'exa', 'tavily')
                strategy: Search strategy ('hybrid', 'authoritative', 'comprehensive', 'auto')
            """
            return await self._perform_financial_search(
                query, num_results, provider, strategy
            )

        @tool
        async def analyze_company_fundamentals(
            symbol: str, depth: str = "standard"
        ) -> dict[str, Any]:
            """Research company fundamentals including financials, competitive position, and outlook."""
            return await self._research_company_fundamentals(symbol, depth)

        @tool
        async def analyze_market_sentiment(
            topic: str, timeframe: str = "7d"
        ) -> dict[str, Any]:
            """Analyze market sentiment around a topic using news and social signals."""
            return await self._analyze_market_sentiment_tool(topic, timeframe)

        @tool
        async def validate_research_claims(
            claims: list[str], sources: list[str]
        ) -> dict[str, Any]:
            """Validate research claims against multiple sources for fact-checking."""
            return await self._validate_claims(claims, sources)

        tools.extend(
            [
                web_search_financial,
                analyze_company_fundamentals,
                analyze_market_sentiment,
                validate_research_claims,
            ]
        )

        return tools

    async def _perform_web_search(
        self, query: str, num_results: int, provider: str = "auto"
    ) -> dict[str, Any]:
        """Fallback web search across configured providers."""
        await self._ensure_search_providers_loaded()

        if not self.search_providers:
            return {
                "error": "No search providers available",
                "results": [],
                "total_results": 0,
            }

        aggregated_results: list[dict[str, Any]] = []
        target = provider.lower()

        for provider_obj in self.search_providers:
            provider_name = provider_obj.__class__.__name__.lower()
            if target != "auto" and target not in provider_name:
                continue
            try:
                results = await provider_obj.search(query, num_results)
                aggregated_results.extend(results)
                if target != "auto":
                    break
            except Exception as error:  # pragma: no cover - fallback logging
                logger.warning(
                    "Fallback web search failed for provider %s: %s",
                    provider_obj.__class__.__name__,
                    error,
                )

        if not aggregated_results:
            return {
                "error": "Search failed",
                "results": [],
                "total_results": 0,
            }

        truncated_results = aggregated_results[:num_results]
        return {
            "results": truncated_results,
            "total_results": len(truncated_results),
            "search_duration": 0.0,
            "search_strategy": "fallback",
        }
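    # Matching note (illustrative comment, not in the original source): the
    # `provider` argument matches by substring against the lowercased class
    # name, so provider="exa" selects ExaSearchProvider ("exa" is contained in
    # "exasearchprovider"); "auto" tries every configured provider in order.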
    async def _research_company_fundamentals(
        self, symbol: str, depth: str = "standard"
    ) -> dict[str, Any]:
        """Convenience wrapper for company fundamental research used by tools."""
        session_id = f"fundamentals-{symbol}-{uuid4().hex}"
        focus_areas = [
            "fundamentals",
            "financials",
            "valuation",
            "risk_management",
            "growth_drivers",
        ]
        return await self.research_comprehensive(
            topic=f"{symbol} company fundamentals analysis",
            session_id=session_id,
            depth=depth,
            focus_areas=focus_areas,
            timeframe="180d",
            use_parallel_execution=False,
        )

    async def _analyze_market_sentiment_tool(
        self, topic: str, timeframe: str = "7d"
    ) -> dict[str, Any]:
        """Wrapper used by the sentiment analysis tool."""
        session_id = f"sentiment-{uuid4().hex}"
        return await self.analyze_market_sentiment(
            topic=topic,
            session_id=session_id,
            timeframe=timeframe,
            use_parallel_execution=False,
        )

    async def _validate_claims(
        self, claims: list[str], sources: list[str]
    ) -> dict[str, Any]:
        """Lightweight claim validation used for tool compatibility."""
        validation_results: list[dict[str, Any]] = []
        for claim in claims:
            source_checks = []
            for source in sources:
                source_checks.append(
                    {
                        "source": source,
                        "status": "not_verified",
                        "confidence": 0.0,
                        "notes": "Automatic validation not available in fallback mode",
                    }
                )
            validation_results.append(
                {
                    "claim": claim,
                    "validated": False,
                    "confidence": 0.0,
                    "evidence": [],
                    "source_checks": source_checks,
                }
            )

        return {
            "results": validation_results,
            "summary": "Claim validation is currently using fallback heuristics.",
        }

    async def _perform_financial_search(
        self, query: str, num_results: int, provider: str, strategy: str
    ) -> dict[str, Any]:
        """
        Perform optimized financial search with enhanced strategies.

        Args:
            query: Search query
            num_results: Number of results
            provider: Search provider preference
            strategy: Search strategy

        Returns:
            Dictionary with search results and metadata
        """
        if not self.search_providers:
            return {
                "error": "No search providers available",
                "results": [],
                "total_results": 0,
            }

        start_time = datetime.now()
        all_results = []

        # Use Exa provider with financial optimization if available
        exa_provider = None
        for p in self.search_providers:
            if isinstance(p, ExaSearchProvider):
                exa_provider = p
                break

        if exa_provider and (provider == "auto" or provider == "exa"):
            try:
                # Use the enhanced financial search method
                results = await exa_provider.search_financial(
                    query, num_results, strategy=strategy
                )

                # Add search metadata
                for result in results:
                    result.update(
                        {
                            "search_strategy": strategy,
                            "search_timestamp": start_time.isoformat(),
                            "enhanced_query": query,
                        }
                    )

                all_results.extend(results)

                logger.info(
                    f"Financial search completed: {len(results)} results "
                    f"using strategy '{strategy}' in {(datetime.now() - start_time).total_seconds():.2f}s"
                )

            except Exception as e:
                logger.error(f"Enhanced financial search failed: {e}")
                # Fallback to regular search if available
                if hasattr(self, "_perform_web_search"):
                    return await self._perform_web_search(query, num_results, provider)
                else:
                    return {
                        "error": f"Financial search failed: {str(e)}",
                        "results": [],
                        "total_results": 0,
                    }
        else:
            # Use regular search providers
            try:
                for provider_obj in self.search_providers:
                    if (
                        provider == "auto"
                        or provider.lower() in str(type(provider_obj)).lower()
                    ):
                        results = await provider_obj.search(query, num_results)
                        all_results.extend(results)
                        break
            except Exception as e:
                logger.error(f"Fallback search failed: {e}")
                return {
                    "error": f"Search failed: {str(e)}",
                    "results": [],
                    "total_results": 0,
                }

        # Sort by financial relevance and authority
        all_results.sort(
            key=lambda x: (
                x.get("financial_relevance", 0),
                x.get("is_authoritative", False),
                x.get("score", 0),
            ),
            reverse=True,
        )

        return {
            "results": all_results[:num_results],
            "total_results": len(all_results),
            "search_strategy": strategy,
            "search_duration": (datetime.now() - start_time).total_seconds(),
            "enhanced_search": True,
        }
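    # Ranking note (illustrative comment, not in the original source): results
    # sort descending on the tuple (financial_relevance, is_authoritative,
    # score), so a 0.7-relevance sec.gov hit outranks a 0.7-relevance blog
    # because True > False on the authority flag.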
False), x.get("score", 0), ), reverse=True, ) return { "results": all_results[:num_results], "total_results": len(all_results), "search_strategy": strategy, "search_duration": (datetime.now() - start_time).total_seconds(), "enhanced_search": True, } def _build_graph(self): """Build research workflow graph with multi-step research process.""" workflow = StateGraph(DeepResearchState) # Core research workflow nodes workflow.add_node("plan_research", self._plan_research) workflow.add_node("execute_searches", self._execute_searches) workflow.add_node("analyze_content", self._analyze_content) workflow.add_node("validate_sources", self._validate_sources) workflow.add_node("synthesize_findings", self._synthesize_findings) workflow.add_node("generate_citations", self._generate_citations) # Specialized research nodes workflow.add_node("sentiment_analysis", self._sentiment_analysis) workflow.add_node("fundamental_analysis", self._fundamental_analysis) workflow.add_node("competitive_analysis", self._competitive_analysis) # Quality control nodes workflow.add_node("fact_validation", self._fact_validation) workflow.add_node("source_credibility", self._source_credibility) # Define workflow edges workflow.add_edge(START, "plan_research") workflow.add_edge("plan_research", "execute_searches") workflow.add_edge("execute_searches", "analyze_content") # Conditional routing based on research type workflow.add_conditional_edges( "analyze_content", self._route_specialized_analysis, { "sentiment": "sentiment_analysis", "fundamental": "fundamental_analysis", "competitive": "competitive_analysis", "validation": "validate_sources", "synthesis": "synthesize_findings", }, ) # Specialized analysis flows workflow.add_edge("sentiment_analysis", "validate_sources") workflow.add_edge("fundamental_analysis", "validate_sources") workflow.add_edge("competitive_analysis", "validate_sources") # Quality control flow workflow.add_edge("validate_sources", "fact_validation") workflow.add_edge("fact_validation", "source_credibility") workflow.add_edge("source_credibility", "synthesize_findings") # Final steps workflow.add_edge("synthesize_findings", "generate_citations") workflow.add_edge("generate_citations", END) return workflow.compile(checkpointer=self.checkpointer) @log_method_call(component="DeepResearchAgent", include_timing=True) async def research_comprehensive( self, topic: str, session_id: str, depth: str | None = None, focus_areas: list[str] | None = None, timeframe: str = "30d", timeout_budget: float | None = None, # Total timeout budget in seconds **kwargs, ) -> dict[str, Any]: """ Comprehensive research on a financial topic. 
    @log_method_call(component="DeepResearchAgent", include_timing=True)
    async def research_comprehensive(
        self,
        topic: str,
        session_id: str,
        depth: str | None = None,
        focus_areas: list[str] | None = None,
        timeframe: str = "30d",
        timeout_budget: float | None = None,  # Total timeout budget in seconds
        **kwargs,
    ) -> dict[str, Any]:
        """
        Comprehensive research on a financial topic.

        Args:
            topic: Research topic or company/symbol
            session_id: Session identifier
            depth: Research depth (basic/standard/comprehensive/exhaustive)
            focus_areas: Specific areas to focus on
            timeframe: Time range for research
            timeout_budget: Total timeout budget in seconds (enables budget allocation)
            **kwargs: Additional parameters

        Returns:
            Comprehensive research results with analysis and citations
        """
        # Ensure search providers are loaded (cached for performance)
        await self._ensure_search_providers_loaded()

        # Check if search providers are available
        if not self.search_providers:
            return {
                "error": "Research functionality unavailable - no search providers configured",
                "details": "Please configure EXA_API_KEY environment variable to enable research capabilities",
                "topic": topic,
                "available_functionality": "Limited to pre-existing data and basic analysis",
            }

        start_time = datetime.now()
        depth = depth or self.default_depth

        # Calculate timeout budget allocation for generous research timeouts
        timeout_budgets = {}
        if timeout_budget and timeout_budget > 0:
            timeout_budgets = {
                "search_budget": timeout_budget * 0.50,  # 50% for search operations (generous allocation)
                "analysis_budget": timeout_budget * 0.30,  # 30% for content analysis
                "synthesis_budget": timeout_budget * 0.20,  # 20% for result synthesis
                "total_budget": timeout_budget,
                "allocation_strategy": "comprehensive_research",
            }
            logger.info(
                f"TIMEOUT_BUDGET_ALLOCATION: total={timeout_budget}s → "
                f"search={timeout_budgets['search_budget']:.1f}s, "
                f"analysis={timeout_budgets['analysis_budget']:.1f}s, "
                f"synthesis={timeout_budgets['synthesis_budget']:.1f}s"
            )

        # Initialize research state
        initial_state = {
            "messages": [HumanMessage(content=f"Research: {topic}")],
            "persona": self.persona.name,
            "session_id": session_id,
            "timestamp": datetime.now(),
            "research_topic": topic,
            "research_depth": depth,
            "focus_areas": focus_areas
            or PERSONA_RESEARCH_FOCUS[self.persona.name.lower()]["keywords"],
            "timeframe": timeframe,
            "search_queries": [],
            "search_results": [],
            "analyzed_content": [],
            "validated_sources": [],
            "research_findings": [],
            "sentiment_analysis": {},
            "source_credibility_scores": {},
            "citations": [],
            "research_status": "planning",
            "research_confidence": 0.0,
            "source_diversity_score": 0.0,
            "fact_validation_results": [],
            "execution_time_ms": 0.0,
            "api_calls_made": 0,
            "cache_hits": 0,
            "cache_misses": 0,
            # Timeout budget allocation for intelligent time management
            "timeout_budgets": timeout_budgets,
            # Legacy fields
            "token_count": 0,
            "error": None,
            "analyzed_stocks": {},
            "key_price_levels": {},
            "last_analysis_time": {},
            "conversation_context": {},
        }

        # Add additional parameters
        initial_state.update(kwargs)

        # Set up orchestration logging
        orchestration_logger = get_orchestration_logger("DeepResearchAgent")
        orchestration_logger.set_request_context(
            session_id=session_id,
            research_topic=topic[:50],  # Truncate for logging
            research_depth=depth,
        )

        # Check if parallel execution is enabled and requested
        use_parallel = kwargs.get(
            "use_parallel_execution", self.enable_parallel_execution
        )

        orchestration_logger.info(
            "🔍 RESEARCH_START",
            execution_mode="parallel" if use_parallel else "sequential",
            focus_areas=focus_areas[:3] if focus_areas else None,
            timeframe=timeframe,
        )

        if use_parallel:
            orchestration_logger.info("🚀 PARALLEL_EXECUTION_SELECTED")
            try:
                result = await self._execute_parallel_research(
                    topic=topic,
                    session_id=session_id,
                    depth=depth,
                    focus_areas=focus_areas,
                    timeframe=timeframe,
                    initial_state=initial_state,
                    start_time=start_time,
                    **kwargs,
                )
                orchestration_logger.info("✅ PARALLEL_EXECUTION_SUCCESS")
                return result
            except Exception as e:
                orchestration_logger.warning(
                    "⚠️ PARALLEL_FALLBACK_TRIGGERED",
                    error=str(e),
                    fallback_mode="sequential",
                )
                # Fall through to sequential execution

        # Execute research workflow (sequential)
        orchestration_logger.info("🔄 SEQUENTIAL_EXECUTION_START")
        try:
            result = await self.graph.ainvoke(
                initial_state,
                config={
                    "configurable": {
                        "thread_id": session_id,
                        "checkpoint_ns": "deep_research",
                    }
                },
            )

            # Calculate execution time
            execution_time = (datetime.now() - start_time).total_seconds() * 1000
            result["execution_time_ms"] = execution_time

            return self._format_research_response(result)

        except Exception as e:
            logger.error(f"Error in deep research: {e}")
            return {
                "status": "error",
                "error": str(e),
                "execution_time_ms": (datetime.now() - start_time).total_seconds()
                * 1000,
                "agent_type": "deep_research",
            }
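    # Budget example (illustrative comment, not in the original source): with
    # timeout_budget=120.0 the split is search=60.0s, analysis=36.0s,
    # synthesis=24.0s (50/30/20); without a budget, timeout_budgets stays empty
    # and each provider falls back to its per-query adaptive timeout.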
orchestration_logger.info("✅ PARALLEL_EXECUTION_SUCCESS") return result except Exception as e: orchestration_logger.warning( "⚠️ PARALLEL_FALLBACK_TRIGGERED", error=str(e), fallback_mode="sequential", ) # Fall through to sequential execution # Execute research workflow (sequential) orchestration_logger.info("🔄 SEQUENTIAL_EXECUTION_START") try: result = await self.graph.ainvoke( initial_state, config={ "configurable": { "thread_id": session_id, "checkpoint_ns": "deep_research", } }, ) # Calculate execution time execution_time = (datetime.now() - start_time).total_seconds() * 1000 result["execution_time_ms"] = execution_time return self._format_research_response(result) except Exception as e: logger.error(f"Error in deep research: {e}") return { "status": "error", "error": str(e), "execution_time_ms": (datetime.now() - start_time).total_seconds() * 1000, "agent_type": "deep_research", } # Workflow node implementations async def _plan_research(self, state: DeepResearchState) -> Command: """Plan research strategy based on topic and persona.""" topic = state["research_topic"] depth_config = RESEARCH_DEPTH_LEVELS[state["research_depth"]] persona_focus = PERSONA_RESEARCH_FOCUS[self.persona.name.lower()] # Generate search queries based on topic and persona search_queries = await self._generate_search_queries( topic, persona_focus, depth_config ) return Command( goto="execute_searches", update={"search_queries": search_queries, "research_status": "searching"}, ) async def _safe_search( self, provider: WebSearchProvider, query: str, num_results: int = 5, timeout_budget: float | None = None, ) -> list[dict[str, Any]]: """Safely execute search with a provider, handling exceptions gracefully.""" try: return await provider.search( query, num_results=num_results, timeout_budget=timeout_budget ) except Exception as e: logger.warning( f"Search failed for '{query}' with provider {type(provider).__name__}: {e}" ) return [] # Return empty list on failure async def _execute_searches(self, state: DeepResearchState) -> Command: """Execute web searches using available providers with timeout budget awareness.""" search_queries = state["search_queries"] depth_config = RESEARCH_DEPTH_LEVELS[state["research_depth"]] # Calculate timeout budget per search operation timeout_budgets = state.get("timeout_budgets", {}) search_budget = timeout_budgets.get("search_budget") if search_budget: # Divide search budget across queries and providers total_search_operations = len( search_queries[: depth_config["max_searches"]] ) * len(self.search_providers) timeout_per_search = ( search_budget / max(total_search_operations, 1) if total_search_operations > 0 else search_budget ) logger.info( f"SEARCH_BUDGET_ALLOCATION: {search_budget:.1f}s total → " f"{timeout_per_search:.1f}s per search ({total_search_operations} operations)" ) else: timeout_per_search = None all_results = [] # Create all search tasks for parallel execution with budget-aware timeouts search_tasks = [] for query in search_queries[: depth_config["max_searches"]]: for provider in self.search_providers: # Create async task for each provider/query combination with timeout budget search_tasks.append( self._safe_search( provider, query, num_results=5, timeout_budget=timeout_per_search, ) ) # Execute all searches in parallel using asyncio.gather() if search_tasks: parallel_results = await asyncio.gather( *search_tasks, return_exceptions=True ) # Process results and filter out exceptions for result in parallel_results: if isinstance(result, Exception): # Log the exception but 
    async def _analyze_content(self, state: DeepResearchState) -> Command:
        """Analyze search results using AI content analysis."""
        search_results = state["search_results"]
        analyzed_content = []

        # Analyze each piece of content
        for result in search_results:
            if result.get("content"):
                analysis = await self.content_analyzer.analyze_content(
                    content=result["content"],
                    persona=self.persona.name.lower(),
                    analysis_focus=state["research_depth"],
                )
                analyzed_content.append({**result, "analysis": analysis})

        return Command(
            goto="validate_sources",
            update={
                "analyzed_content": analyzed_content,
                "research_status": "validating",
            },
        )

    def _route_specialized_analysis(self, state: DeepResearchState) -> str:
        """Route to specialized analysis based on research focus."""
        focus_areas = state.get("focus_areas", [])

        if any(word in focus_areas for word in ["sentiment", "news", "social"]):
            return "sentiment"
        elif any(
            word in focus_areas for word in ["fundamental", "financial", "earnings"]
        ):
            return "fundamental"
        elif any(word in focus_areas for word in ["competitive", "market", "industry"]):
            return "competitive"
        else:
            return "validation"

    async def _validate_sources(self, state: DeepResearchState) -> Command:
        """Validate source credibility and filter results."""
        analyzed_content = state["analyzed_content"]
        validated_sources = []
        credibility_scores = {}

        for content in analyzed_content:
            # Calculate credibility score based on multiple factors
            credibility_score = self._calculate_source_credibility(content)
            credibility_scores[content["url"]] = credibility_score

            # Only include sources above credibility threshold
            if credibility_score >= 0.6:  # Configurable threshold
                validated_sources.append(content)

        return Command(
            goto="synthesize_findings",
            update={
                "validated_sources": validated_sources,
                "source_credibility_scores": credibility_scores,
                "research_status": "synthesizing",
            },
        )

    async def _synthesize_findings(self, state: DeepResearchState) -> Command:
        """Synthesize research findings into coherent insights."""
        validated_sources = state["validated_sources"]

        # Generate synthesis using LLM
        synthesis_prompt = self._build_synthesis_prompt(validated_sources, state)
        synthesis_response = await self.llm.ainvoke(
            [
                SystemMessage(content="You are a financial research synthesizer."),
                HumanMessage(content=synthesis_prompt),
            ]
        )
        raw_synthesis = ContentAnalyzer._coerce_message_content(
            synthesis_response.content
        )

        research_findings = {
            "synthesis": raw_synthesis,
            "key_insights": self._extract_key_insights(validated_sources),
            "overall_sentiment": self._calculate_overall_sentiment(validated_sources),
            "risk_assessment": self._assess_risks(validated_sources),
            "investment_implications": self._derive_investment_implications(
                validated_sources
            ),
            "confidence_score": self._calculate_research_confidence(validated_sources),
        }

        return Command(
            goto="generate_citations",
            update={
                "research_findings": research_findings,
                "research_confidence": research_findings["confidence_score"],
                "research_status": "completing",
            },
        )
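    # Routing behavior (illustrative comment, not in the original source):
    #   focus_areas=["news"]        -> "sentiment"
    #   focus_areas=["earnings"]    -> "fundamental"
    #   focus_areas=["industry"]    -> "competitive"
    #   focus_areas=[] or no match  -> "validation" (skip straight to vetting)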
    async def _generate_citations(self, state: DeepResearchState) -> Command:
        """Generate proper citations for all sources."""
        validated_sources = state["validated_sources"]
        citations = []

        for i, source in enumerate(validated_sources, 1):
            citation = {
                "id": i,
                "title": source.get("title", "Untitled"),
                "url": source["url"],
                "published_date": source.get("published_date"),
                "author": source.get("author"),
                "credibility_score": state["source_credibility_scores"].get(
                    source["url"], 0.5
                ),
                "relevance_score": source.get("analysis", {}).get(
                    "relevance_score", 0.5
                ),
            }
            citations.append(citation)

        return Command(
            goto="__end__",
            update={"citations": citations, "research_status": "completed"},
        )

    # Helper methods

    async def _generate_search_queries(
        self, topic: str, persona_focus: dict[str, Any], depth_config: dict[str, Any]
    ) -> list[str]:
        """Generate search queries optimized for the research topic and persona."""
        base_queries = [
            f"{topic} financial analysis",
            f"{topic} investment research",
            f"{topic} market outlook",
        ]

        # Add persona-specific queries
        persona_queries = [
            f"{topic} {keyword}" for keyword in persona_focus["keywords"][:3]
        ]

        # Add source-specific queries
        source_queries = [
            f"{topic} {source}" for source in persona_focus["sources"][:2]
        ]

        all_queries = base_queries + persona_queries + source_queries
        return all_queries[: depth_config["max_searches"]]

    def _calculate_source_credibility(self, content: dict[str, Any]) -> float:
        """Calculate credibility score for a source."""
        score = 0.5  # Base score

        # Domain credibility
        url = content.get("url", "")
        if any(domain in url for domain in [".gov", ".edu", ".org"]):
            score += 0.2
        elif any(
            domain in url
            for domain in [
                "sec.gov",
                "investopedia.com",
                "bloomberg.com",
                "reuters.com",
            ]
        ):
            score += 0.3

        # Publication date recency
        pub_date = content.get("published_date")
        if pub_date:
            try:
                date_obj = datetime.fromisoformat(pub_date.replace("Z", "+00:00"))
                # Compare in UTC: fromisoformat yields an aware datetime here,
                # and subtracting it from a naive datetime.now() would raise.
                days_old = (datetime.now(UTC) - date_obj).days
                if days_old < 30:
                    score += 0.1
                elif days_old < 90:
                    score += 0.05
            except (ValueError, TypeError, AttributeError):
                pass

        # Content analysis credibility
        if "analysis" in content:
            analysis_cred = content["analysis"].get("credibility_score", 0.5)
            score = (score + analysis_cred) / 2

        return min(score, 1.0)

    def _build_synthesis_prompt(
        self, sources: list[dict[str, Any]], state: DeepResearchState
    ) -> str:
        """Build synthesis prompt for final research output."""
        topic = state["research_topic"]
        persona = self.persona.name

        prompt = f"""
        Synthesize comprehensive research findings on '{topic}' for a {persona} investor.

        Research Sources ({len(sources)} validated sources):
        """

        for i, source in enumerate(sources, 1):
            analysis = source.get("analysis", {})
            prompt += f"\n{i}. {source.get('title', 'Unknown Title')}"
            prompt += f" - Insights: {', '.join(analysis.get('insights', [])[:2])}"
            prompt += f" - Sentiment: {analysis.get('sentiment', {}).get('direction', 'neutral')}"
            prompt += f" - Credibility: {state['source_credibility_scores'].get(source['url'], 0.5):.2f}"

        prompt += f"""

        Please provide a comprehensive synthesis that includes:
        1. Executive Summary (2-3 sentences)
        2. Key Findings (5-7 bullet points)
        3. Investment Implications for {persona} investors
        4. Risk Considerations
        5. Recommended Actions
        6. Confidence Level and reasoning

        Tailor the analysis specifically for {persona} investment characteristics and risk tolerance.
        """

        return prompt
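    # Worked credibility example (illustrative comment, not in the original
    # source): a 10-day-old sec.gov filing matches the ".gov" branch first
    # (+0.2, never the +0.3 elif), gains the recency bonus (+0.1) for 0.8, and
    # if its content analysis reported 0.8 the final score is (0.8 + 0.8) / 2
    # = 0.8.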
""" return prompt def _extract_key_insights(self, sources: list[dict[str, Any]]) -> list[str]: """Extract and consolidate key insights from all sources.""" all_insights = [] for source in sources: analysis = source.get("analysis", {}) insights = analysis.get("insights", []) all_insights.extend(insights) # Simple deduplication (could be enhanced with semantic similarity) unique_insights = list(dict.fromkeys(all_insights)) return unique_insights[:10] # Return top 10 insights def _calculate_overall_sentiment( self, sources: list[dict[str, Any]] ) -> dict[str, Any]: """Calculate overall sentiment from all sources.""" sentiments = [] weights = [] for source in sources: analysis = source.get("analysis", {}) sentiment = analysis.get("sentiment", {}) # Convert sentiment to numeric value direction = sentiment.get("direction", "neutral") if direction == "bullish": sentiment_value = 1 elif direction == "bearish": sentiment_value = -1 else: sentiment_value = 0 confidence = sentiment.get("confidence", 0.5) credibility = source.get("credibility_score", 0.5) sentiments.append(sentiment_value) weights.append(confidence * credibility) if not sentiments: return {"direction": "neutral", "confidence": 0.5, "consensus": 0.5} # Weighted average sentiment weighted_sentiment = sum( s * w for s, w in zip(sentiments, weights, strict=False) ) / sum(weights) # Convert back to direction if weighted_sentiment > 0.2: overall_direction = "bullish" elif weighted_sentiment < -0.2: overall_direction = "bearish" else: overall_direction = "neutral" # Calculate consensus (how much sources agree) sentiment_variance = sum(weights) / len(sentiments) if sentiments else 0 consensus = 1 - sentiment_variance if sentiment_variance < 1 else 0 return { "direction": overall_direction, "confidence": abs(weighted_sentiment), "consensus": consensus, "source_count": len(sentiments), } def _assess_risks(self, sources: list[dict[str, Any]]) -> list[str]: """Consolidate risk assessments from all sources.""" all_risks = [] for source in sources: analysis = source.get("analysis", {}) risks = analysis.get("risk_factors", []) all_risks.extend(risks) # Deduplicate and return top risks unique_risks = list(dict.fromkeys(all_risks)) return unique_risks[:8] def _derive_investment_implications( self, sources: list[dict[str, Any]] ) -> dict[str, Any]: """Derive investment implications based on research findings.""" opportunities = [] threats = [] for source in sources: analysis = source.get("analysis", {}) opps = analysis.get("opportunities", []) risks = analysis.get("risk_factors", []) opportunities.extend(opps) threats.extend(risks) return { "opportunities": list(dict.fromkeys(opportunities))[:5], "threats": list(dict.fromkeys(threats))[:5], "recommended_action": self._recommend_action(sources), "time_horizon": PERSONA_RESEARCH_FOCUS[self.persona.name.lower()][ "time_horizon" ], } def _recommend_action(self, sources: list[dict[str, Any]]) -> str: """Recommend investment action based on research findings.""" overall_sentiment = self._calculate_overall_sentiment(sources) if ( overall_sentiment["direction"] == "bullish" and overall_sentiment["confidence"] > 0.7 ): if self.persona.name.lower() == "conservative": return "Consider gradual position building with proper risk management" else: return "Consider initiating position with appropriate position sizing" elif ( overall_sentiment["direction"] == "bearish" and overall_sentiment["confidence"] > 0.7 ): return "Exercise caution - consider waiting for better entry or avoiding" else: return "Monitor closely - 
    def _calculate_research_confidence(self, sources: list[dict[str, Any]]) -> float:
        """Calculate overall confidence in research findings."""
        if not sources:
            return 0.0

        # Factors that increase confidence
        source_count_factor = min(
            len(sources) / 10, 1.0
        )  # More sources = higher confidence

        avg_credibility = sum(
            source.get("credibility_score", 0.5) for source in sources
        ) / len(sources)

        avg_relevance = sum(
            source.get("analysis", {}).get("relevance_score", 0.5)
            for source in sources
        ) / len(sources)

        # Diversity of sources (different domains)
        unique_domains = len(
            {source["url"].split("/")[2] for source in sources if "url" in source}
        )
        diversity_factor = min(unique_domains / 5, 1.0)

        # Combine factors
        confidence = (
            source_count_factor + avg_credibility + avg_relevance + diversity_factor
        ) / 4

        return round(confidence, 2)

    def _format_research_response(self, result: dict[str, Any]) -> dict[str, Any]:
        """Format research response for consistent output."""
        return {
            "status": "success",
            "agent_type": "deep_research",
            "persona": result.get("persona"),
            "research_topic": result.get("research_topic"),
            "research_depth": result.get("research_depth"),
            "findings": result.get("research_findings", {}),
            "sources_analyzed": len(result.get("validated_sources", [])),
            "confidence_score": result.get("research_confidence", 0.0),
            "citations": result.get("citations", []),
            "execution_time_ms": result.get("execution_time_ms", 0.0),
            "search_queries_used": result.get("search_queries", []),
            "source_diversity": result.get("source_diversity_score", 0.0),
        }

    # Specialized research analysis methods

    async def _sentiment_analysis(self, state: DeepResearchState) -> Command:
        """Perform specialized sentiment analysis."""
        logger.info("Performing sentiment analysis")

        # For now, route to content analysis with sentiment focus
        original_focus = state.get("focus_areas", [])
        state["focus_areas"] = ["market_sentiment", "sentiment", "mood"]

        result = await self._analyze_content(state)
        state["focus_areas"] = original_focus  # Restore original focus
        return result

    async def _fundamental_analysis(self, state: DeepResearchState) -> Command:
        """Perform specialized fundamental analysis."""
        logger.info("Performing fundamental analysis")

        # For now, route to content analysis with fundamental focus
        original_focus = state.get("focus_areas", [])
        state["focus_areas"] = ["fundamentals", "financials", "valuation"]

        result = await self._analyze_content(state)
        state["focus_areas"] = original_focus  # Restore original focus
        return result

    async def _competitive_analysis(self, state: DeepResearchState) -> Command:
        """Perform specialized competitive analysis."""
        logger.info("Performing competitive analysis")

        # For now, route to content analysis with competitive focus
        original_focus = state.get("focus_areas", [])
        state["focus_areas"] = ["competitive_landscape", "market_share", "competitors"]

        result = await self._analyze_content(state)
        state["focus_areas"] = original_focus  # Restore original focus
        return result

    async def _fact_validation(self, state: DeepResearchState) -> Command:
        """Perform fact validation on research findings."""
        logger.info("Performing fact validation")
        # For now, route to source validation
        return await self._validate_sources(state)

    async def _source_credibility(self, state: DeepResearchState) -> Command:
        """Assess source credibility and reliability."""
        logger.info("Assessing source credibility")
        # For now, route to source validation
        return await self._validate_sources(state)
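    # Worked example of the confidence blend above (illustrative numbers):
    # 6 sources (count factor 0.6), average credibility 0.7, average relevance
    # 0.9, and 4 distinct domains (diversity factor 0.8) combine to
    # (0.6 + 0.7 + 0.9 + 0.8) / 4 = 0.75.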
    async def research_company_comprehensive(
        self,
        symbol: str,
        session_id: str,
        include_competitive_analysis: bool = False,
        **kwargs,
    ) -> dict[str, Any]:
        """
        Comprehensive company research.

        Args:
            symbol: Stock symbol to research
            session_id: Session identifier
            include_competitive_analysis: Whether to include competitive analysis
            **kwargs: Additional parameters

        Returns:
            Comprehensive company research results
        """
        topic = f"{symbol} company financial analysis and outlook"

        if include_competitive_analysis:
            kwargs["focus_areas"] = kwargs.get("focus_areas", []) + [
                "competitive_analysis",
                "market_position",
            ]

        return await self.research_comprehensive(
            topic=topic, session_id=session_id, **kwargs
        )

    async def research_topic(
        self,
        query: str,
        session_id: str,
        focus_areas: list[str] | None = None,
        timeframe: str = "30d",
        **kwargs,
    ) -> dict[str, Any]:
        """
        General topic research.

        Args:
            query: Research query or topic
            session_id: Session identifier
            focus_areas: Specific areas to focus on
            timeframe: Time range for research
            **kwargs: Additional parameters

        Returns:
            Research results for the given topic
        """
        return await self.research_comprehensive(
            topic=query,
            session_id=session_id,
            focus_areas=focus_areas,
            timeframe=timeframe,
            **kwargs,
        )

    async def analyze_market_sentiment(
        self, topic: str, session_id: str, timeframe: str = "7d", **kwargs
    ) -> dict[str, Any]:
        """
        Analyze market sentiment around a topic.

        Args:
            topic: Topic to analyze sentiment for
            session_id: Session identifier
            timeframe: Time range for analysis
            **kwargs: Additional parameters

        Returns:
            Market sentiment analysis results
        """
        return await self.research_comprehensive(
            topic=f"market sentiment analysis: {topic}",
            session_id=session_id,
            focus_areas=["sentiment", "market_mood", "investor_sentiment"],
            timeframe=timeframe,
            **kwargs,
        )
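    # Usage sketch for the public entry points above (hypothetical caller code;
    # constructing the agent itself happens outside this excerpt):
    #
    #   result = await agent.research_company_comprehensive(
    #       symbol="AAPL",
    #       session_id="session-123",
    #       include_competitive_analysis=True,
    #   )
    #   print(result["confidence_score"], len(result["citations"]))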
    # Parallel Execution Implementation

    @log_method_call(component="DeepResearchAgent", include_timing=True)
    async def _execute_parallel_research(
        self,
        topic: str,
        session_id: str,
        depth: str,
        focus_areas: list[str] | None = None,
        timeframe: str = "30d",
        initial_state: dict[str, Any] | None = None,
        start_time: datetime | None = None,
        **kwargs,
    ) -> dict[str, Any]:
        """
        Execute research using parallel subagent execution.

        Args:
            topic: Research topic
            session_id: Session identifier
            depth: Research depth level
            focus_areas: Specific focus areas
            timeframe: Research timeframe
            initial_state: Initial state for backward compatibility
            start_time: Start time for execution measurement
            **kwargs: Additional parameters

        Returns:
            Research results in same format as sequential execution
        """
        orchestration_logger = get_orchestration_logger("ParallelExecution")
        orchestration_logger.set_request_context(session_id=session_id)

        try:
            # Generate research tasks using task distributor
            orchestration_logger.info("🎯 TASK_DISTRIBUTION_START")
            research_tasks = self.task_distributor.distribute_research_tasks(
                topic=topic, session_id=session_id, focus_areas=focus_areas
            )
            orchestration_logger.info(
                "📋 TASKS_GENERATED",
                task_count=len(research_tasks),
                task_types=[t.task_type for t in research_tasks],
            )

            # Execute tasks in parallel
            orchestration_logger.info("🚀 PARALLEL_ORCHESTRATION_START")
            research_result = (
                await self.parallel_orchestrator.execute_parallel_research(
                    tasks=research_tasks,
                    research_executor=self._execute_subagent_task,
                    synthesis_callback=self._synthesize_parallel_results,
                )
            )

            # Log parallel execution metrics
            log_performance_metrics(
                "ParallelExecution",
                {
                    "total_tasks": research_result.successful_tasks
                    + research_result.failed_tasks,
                    "successful_tasks": research_result.successful_tasks,
                    "failed_tasks": research_result.failed_tasks,
                    "parallel_efficiency": research_result.parallel_efficiency,
                    "execution_time": research_result.total_execution_time,
                },
            )

            # Convert parallel results to expected format
            orchestration_logger.info("🔄 RESULT_FORMATTING_START")
            formatted_result = await self._format_parallel_research_response(
                research_result=research_result,
                topic=topic,
                session_id=session_id,
                depth=depth,
                initial_state=initial_state,
                start_time=start_time,
            )

            orchestration_logger.info(
                "✅ PARALLEL_RESEARCH_COMPLETE",
                result_confidence=formatted_result.get("confidence_score", 0.0),
                sources_analyzed=formatted_result.get("sources_analyzed", 0),
            )

            return formatted_result

        except Exception as e:
            orchestration_logger.error("❌ PARALLEL_RESEARCH_FAILED", error=str(e))
            raise  # Re-raise to trigger fallback to sequential
    async def _execute_subagent_task(self, task) -> dict[str, Any]:  # Type: ResearchTask
        """
        Execute a single research task using specialized subagent.

        Args:
            task: ResearchTask to execute

        Returns:
            Research results from specialized subagent
        """
        with log_agent_execution(
            task.task_type, task.task_id, task.focus_areas
        ) as agent_logger:
            agent_logger.info(
                "🎯 SUBAGENT_ROUTING",
                target_topic=task.target_topic[:50],
                focus_count=len(task.focus_areas),
                priority=task.priority,
            )

            # Route to appropriate subagent based on task type
            if task.task_type == "fundamental":
                subagent = FundamentalResearchAgent(self)
                return await subagent.execute_research(task)
            elif task.task_type == "technical":
                subagent = TechnicalResearchAgent(self)
                return await subagent.execute_research(task)
            elif task.task_type == "sentiment":
                subagent = SentimentResearchAgent(self)
                return await subagent.execute_research(task)
            elif task.task_type == "competitive":
                subagent = CompetitiveResearchAgent(self)
                return await subagent.execute_research(task)
            else:
                # Default to fundamental analysis
                agent_logger.warning("⚠️ UNKNOWN_TASK_TYPE", fallback="fundamental")
                subagent = FundamentalResearchAgent(self)
                return await subagent.execute_research(task)
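    # The if/elif chain above could equivalently be table-driven; a minimal
    # sketch of that alternative (not what this module currently does):
    #
    #   SUBAGENT_REGISTRY = {
    #       "fundamental": FundamentalResearchAgent,
    #       "technical": TechnicalResearchAgent,
    #       "sentiment": SentimentResearchAgent,
    #       "competitive": CompetitiveResearchAgent,
    #   }
    #   subagent_cls = SUBAGENT_REGISTRY.get(task.task_type, FundamentalResearchAgent)
    #   return await subagent_cls(self).execute_research(task)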
    async def _synthesize_parallel_results(
        self,
        task_results,  # Type: dict[str, ResearchTask]
    ) -> dict[str, Any]:
        """
        Synthesize results from multiple parallel research tasks.

        Args:
            task_results: Dictionary of task IDs to ResearchTask objects

        Returns:
            Synthesized research insights
        """
        synthesis_logger = get_orchestration_logger("ResultSynthesis")
        log_synthesis_operation(
            "parallel_research_synthesis",
            len(task_results),
            f"Synthesizing from {len(task_results)} research tasks",
        )

        # Extract successful results
        successful_results = {
            task_id: task.result
            for task_id, task in task_results.items()
            if task.status == "completed" and task.result
        }

        synthesis_logger.info(
            "📊 SYNTHESIS_INPUT_ANALYSIS",
            total_tasks=len(task_results),
            successful_tasks=len(successful_results),
            failed_tasks=len(task_results) - len(successful_results),
        )

        if not successful_results:
            synthesis_logger.warning("⚠️ NO_SUCCESSFUL_RESULTS")
            return {
                "synthesis": "No research results available for synthesis",
                "confidence_score": 0.0,
            }

        all_insights = []
        all_risks = []
        all_opportunities = []
        sentiment_scores = []
        credibility_scores = []

        # Aggregate results from all successful tasks
        for task_id, task in task_results.items():
            if task.status == "completed" and task.result:
                task_type = task_id.split("_")[-1] if "_" in task_id else "unknown"
                synthesis_logger.debug(
                    "🔍 PROCESSING_TASK_RESULT",
                    task_id=task_id,
                    task_type=task_type,
                    has_insights="insights" in task.result
                    if isinstance(task.result, dict)
                    else False,
                )

                result = task.result

                # Extract insights
                insights = result.get("insights", [])
                all_insights.extend(insights)

                # Extract risks and opportunities
                risks = result.get("risk_factors", [])
                opportunities = result.get("opportunities", [])
                all_risks.extend(risks)
                all_opportunities.extend(opportunities)

                # Extract sentiment
                sentiment = result.get("sentiment", {})
                if sentiment:
                    sentiment_scores.append(sentiment)

                # Extract credibility
                credibility = result.get("credibility_score", 0.5)
                credibility_scores.append(credibility)

        # Calculate overall metrics
        overall_sentiment = self._calculate_aggregated_sentiment(sentiment_scores)
        average_credibility = (
            sum(credibility_scores) / len(credibility_scores)
            if credibility_scores
            else 0.5
        )

        # Generate synthesis using LLM
        synthesis_prompt = self._build_parallel_synthesis_prompt(
            task_results, all_insights, all_risks, all_opportunities, overall_sentiment
        )

        try:
            synthesis_response = await self.llm.ainvoke(
                [
                    SystemMessage(
                        content="You are a financial research synthesizer. Combine insights from multiple specialized research agents."
                    ),
                    HumanMessage(content=synthesis_prompt),
                ]
            )
            synthesis_text = ContentAnalyzer._coerce_message_content(
                synthesis_response.content
            )
            synthesis_logger.info("🧠 LLM_SYNTHESIS_SUCCESS")
        except Exception as e:
            synthesis_logger.warning(
                "⚠️ LLM_SYNTHESIS_FAILED", error=str(e), fallback="text_fallback"
            )
            synthesis_text = self._generate_fallback_synthesis(
                all_insights, overall_sentiment
            )

        synthesis_result = {
            "synthesis": synthesis_text,
            "key_insights": list(dict.fromkeys(all_insights))[:10],  # Deduplicate and limit
            "overall_sentiment": overall_sentiment,
            "risk_assessment": list(dict.fromkeys(all_risks))[:8],
            "investment_implications": {
                "opportunities": list(dict.fromkeys(all_opportunities))[:5],
                "threats": list(dict.fromkeys(all_risks))[:5],
                "recommended_action": self._derive_parallel_recommendation(
                    overall_sentiment
                ),
            },
            "confidence_score": average_credibility,
            "task_breakdown": {
                task_id: {
                    "type": task.task_type,
                    "status": task.status,
                    "execution_time": (task.end_time - task.start_time)
                    if task.start_time and task.end_time
                    else 0,
                }
                for task_id, task in task_results.items()
            },
        }

        synthesis_logger.info(
            "✅ SYNTHESIS_COMPLETE",
            insights_count=len(all_insights),
            overall_confidence=average_credibility,
            sentiment_direction=synthesis_result["overall_sentiment"]["direction"],
        )

        return synthesis_result

    async def _format_parallel_research_response(
        self,
        research_result,
        topic: str,
        session_id: str,
        depth: str,
        initial_state: dict[str, Any] | None,
        start_time: datetime | None,
    ) -> dict[str, Any]:
        """Format parallel research results to match expected sequential format."""
        if start_time is not None:
            execution_time = (datetime.now() - start_time).total_seconds() * 1000
        else:
            execution_time = 0.0

        # Extract synthesis from research result
        synthesis = research_result.synthesis or {}
        state_snapshot: dict[str, Any] = initial_state or {}

        # Create citations from task results
        citations = []
        all_sources = []
        citation_id = 1

        for _task_id, task in research_result.task_results.items():
            if task.status == "completed" and task.result:
                sources = task.result.get("sources", [])
                for source in sources:
                    citation = {
                        "id": citation_id,
                        "title": source.get("title", "Unknown Title"),
                        "url": source.get("url", ""),
                        "published_date": source.get("published_date"),
                        "author": source.get("author"),
                        "credibility_score": source.get("credibility_score", 0.5),
                        "relevance_score": source.get("relevance_score", 0.5),
                        "research_type": task.task_type,
                    }
                    citations.append(citation)
                    all_sources.append(source)
                    citation_id += 1

        return {
            "status": "success",
            "agent_type": "deep_research",
            "execution_mode": "parallel",
            "persona": state_snapshot.get("persona"),
            "research_topic": topic,
            "research_depth": depth,
            "findings": synthesis,
            "sources_analyzed": len(all_sources),
            "confidence_score": synthesis.get("confidence_score", 0.0),
            "citations": citations,
            "execution_time_ms": execution_time,
            "parallel_execution_stats": {
                "total_tasks": len(research_result.task_results),
                "successful_tasks": research_result.successful_tasks,
                "failed_tasks": research_result.failed_tasks,
                "parallel_efficiency": research_result.parallel_efficiency,
                "task_breakdown": synthesis.get("task_breakdown", {}),
            },
            "search_queries_used": [],  # Will be populated by subagents
            "source_diversity": len({source.get("url", "") for source in all_sources})
            / max(len(all_sources), 1),
        }

    # Helper methods for parallel execution
    def _calculate_aggregated_sentiment(
        self, sentiment_scores: list[dict[str, Any]]
    ) -> dict[str, Any]:
        """Calculate overall sentiment from multiple sentiment scores."""
        if not sentiment_scores:
            return {"direction": "neutral", "confidence": 0.5}

        # Convert sentiment directions to numeric values
        numeric_scores = []
        confidences = []

        for sentiment in sentiment_scores:
            direction = sentiment.get("direction", "neutral")
            confidence = sentiment.get("confidence", 0.5)

            if direction == "bullish":
                numeric_scores.append(1 * confidence)
            elif direction == "bearish":
                numeric_scores.append(-1 * confidence)
            else:
                numeric_scores.append(0)

            confidences.append(confidence)

        # Calculate weighted average
        avg_score = sum(numeric_scores) / len(numeric_scores)
        avg_confidence = sum(confidences) / len(confidences)

        # Convert back to direction
        if avg_score > 0.2:
            direction = "bullish"
        elif avg_score < -0.2:
            direction = "bearish"
        else:
            direction = "neutral"

        return {
            "direction": direction,
            "confidence": avg_confidence,
            "consensus": 1 - abs(avg_score) if abs(avg_score) < 1 else 0,
            "source_count": len(sentiment_scores),
        }

    def _build_parallel_synthesis_prompt(
        self,
        task_results: dict[str, Any],  # Actually dict[str, ResearchTask]
        all_insights: list[str],
        all_risks: list[str],
        all_opportunities: list[str],
        overall_sentiment: dict[str, Any],
    ) -> str:
        """Build synthesis prompt for parallel research results."""
        successful_tasks = [
            task for task in task_results.values() if task.status == "completed"
        ]

        prompt = f"""
Synthesize comprehensive research findings from {len(successful_tasks)} specialized research agents.

Research Task Results:
"""

        for task in successful_tasks:
            if task.result:
                prompt += f"\n{task.task_type.upper()} RESEARCH:"
                prompt += f" - Status: {task.status}"
                prompt += f" - Key Insights: {', '.join(task.result.get('insights', [])[:3])}"
                prompt += f" - Sentiment: {task.result.get('sentiment', {}).get('direction', 'neutral')}"

        prompt += f"""

AGGREGATED DATA:
- Total Insights: {len(all_insights)}
- Risk Factors: {len(all_risks)}
- Opportunities: {len(all_opportunities)}
- Overall Sentiment: {overall_sentiment.get("direction")} (confidence: {overall_sentiment.get("confidence", 0.5):.2f})

Please provide a comprehensive synthesis that includes:
1. Executive Summary (2-3 sentences)
2. Key Findings from all research areas
3. Investment Implications for {self.persona.name} investors
4. Risk Assessment and Mitigation
5. Recommended Actions based on parallel analysis
6. Confidence Level and reasoning

Focus on insights that are supported by multiple research agents and highlight any contradictions.
"""
        return prompt

    def _generate_fallback_synthesis(
        self, insights: list[str], sentiment: dict[str, Any]
    ) -> str:
        """Generate fallback synthesis when LLM synthesis fails."""
        return f"""
Research synthesis generated from {len(insights)} insights.

Overall sentiment: {sentiment.get("direction", "neutral")} with {sentiment.get("confidence", 0.5):.2f} confidence.

Key insights identified:
{chr(10).join(f"- {insight}" for insight in insights[:5])}

This is a fallback synthesis due to LLM processing limitations.
"""
""" def _derive_parallel_recommendation(self, sentiment: dict[str, Any]) -> str: """Derive investment recommendation from parallel analysis.""" direction = sentiment.get("direction", "neutral") confidence = sentiment.get("confidence", 0.5) if direction == "bullish" and confidence > 0.7: return "Strong buy signal based on parallel analysis from multiple research angles" elif direction == "bullish" and confidence > 0.5: return "Consider position building with appropriate risk management" elif direction == "bearish" and confidence > 0.7: return "Exercise significant caution - multiple research areas show negative signals" elif direction == "bearish" and confidence > 0.5: return "Monitor closely - mixed to negative signals suggest waiting" else: return "Neutral stance recommended - parallel analysis shows mixed signals" # Specialized Subagent Classes class BaseSubagent: """Base class for specialized research subagents.""" def __init__(self, parent_agent: DeepResearchAgent): self.parent = parent_agent self.llm = parent_agent.llm self.search_providers = parent_agent.search_providers self.content_analyzer = parent_agent.content_analyzer self.persona = parent_agent.persona self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") async def execute_research(self, task) -> dict[str, Any]: # task: ResearchTask """Execute research task - to be implemented by subclasses.""" raise NotImplementedError async def _safe_search( self, provider: WebSearchProvider, query: str, num_results: int = 5, timeout_budget: float | None = None, ) -> list[dict[str, Any]]: """Safely execute search with a provider, handling exceptions gracefully.""" try: return await provider.search( query, num_results=num_results, timeout_budget=timeout_budget ) except Exception as e: self.logger.warning( f"Search failed for '{query}' with provider {type(provider).__name__}: {e}" ) return [] # Return empty list on failure async def _perform_specialized_search( self, topic: str, specialized_queries: list[str], max_results: int = 10, timeout_budget: float | None = None, ) -> list[dict[str, Any]]: """Perform specialized web search for this subagent type.""" all_results = [] # Create all search tasks for parallel execution search_tasks = [] results_per_query = ( max_results // len(specialized_queries) if specialized_queries else max_results ) # Calculate timeout per search if budget provided if timeout_budget: total_searches = len(specialized_queries) * len(self.search_providers) timeout_per_search = timeout_budget / max(total_searches, 1) else: timeout_per_search = None for query in specialized_queries: for provider in self.search_providers: # Create async task for each provider/query combination search_tasks.append( self._safe_search( provider, query, num_results=results_per_query, timeout_budget=timeout_per_search, ) ) # Execute all searches in parallel using asyncio.gather() if search_tasks: parallel_results = await asyncio.gather( *search_tasks, return_exceptions=True ) # Process results and filter out exceptions for result in parallel_results: if isinstance(result, Exception): # Log the exception but continue with other results self.logger.warning(f"Search task failed: {result}") elif isinstance(result, list): all_results.extend(result) elif result is not None: all_results.append(result) # Deduplicate results seen_urls = set() unique_results = [] for result in all_results: if result.get("url") not in seen_urls: seen_urls.add(result["url"]) unique_results.append(result) return unique_results[:max_results] async def 
    async def _analyze_search_results(
        self, results: list[dict[str, Any]], analysis_focus: str
    ) -> list[dict[str, Any]]:
        """Analyze search results with specialized focus."""
        analyzed_results = []

        for result in results:
            if result.get("content"):
                try:
                    analysis = await self.content_analyzer.analyze_content(
                        content=result["content"],
                        persona=self.persona.name.lower(),
                        analysis_focus=analysis_focus,
                    )

                    # Add source credibility
                    credibility_score = self._calculate_source_credibility(result)
                    analysis["credibility_score"] = credibility_score

                    analyzed_results.append(
                        {
                            **result,
                            "analysis": analysis,
                            "credibility_score": credibility_score,
                        }
                    )
                except Exception as e:
                    self.logger.warning(
                        f"Content analysis failed for {result.get('url', 'unknown')}: {e}"
                    )

        return analyzed_results

    def _calculate_source_credibility(self, source: dict[str, Any]) -> float:
        """Calculate credibility score for a source - reuse from parent."""
        return self.parent._calculate_source_credibility(source)


class FundamentalResearchAgent(BaseSubagent):
    """Specialized agent for fundamental financial analysis."""

    async def execute_research(self, task) -> dict[str, Any]:  # task: ResearchTask
        """Execute fundamental analysis research."""
        self.logger.info(f"Executing fundamental research for: {task.target_topic}")

        # Generate fundamental-specific search queries
        queries = self._generate_fundamental_queries(task.target_topic)

        # Perform specialized search
        search_results = await self._perform_specialized_search(
            topic=task.target_topic, specialized_queries=queries, max_results=8
        )

        # Analyze results with fundamental focus
        analyzed_results = await self._analyze_search_results(
            search_results, analysis_focus="fundamental_analysis"
        )

        # Extract fundamental-specific insights
        insights = []
        risks = []
        opportunities = []
        sources = []

        for result in analyzed_results:
            analysis = result.get("analysis", {})
            insights.extend(analysis.get("insights", []))
            risks.extend(analysis.get("risk_factors", []))
            opportunities.extend(analysis.get("opportunities", []))
            sources.append(
                {
                    "title": result.get("title", ""),
                    "url": result.get("url", ""),
                    "credibility_score": result.get("credibility_score", 0.5),
                    "published_date": result.get("published_date"),
                    "author": result.get("author"),
                }
            )

        return {
            "research_type": "fundamental",
            "insights": list(dict.fromkeys(insights))[:8],  # Deduplicate
            "risk_factors": list(dict.fromkeys(risks))[:6],
            "opportunities": list(dict.fromkeys(opportunities))[:6],
            "sentiment": self._calculate_fundamental_sentiment(analyzed_results),
            "credibility_score": self._calculate_average_credibility(analyzed_results),
            "sources": sources,
            "focus_areas": [
                "earnings",
                "valuation",
                "financial_health",
                "growth_prospects",
            ],
        }

    def _generate_fundamental_queries(self, topic: str) -> list[str]:
        """Generate fundamental analysis specific queries."""
        return [
            f"{topic} earnings report financial results",
            f"{topic} revenue growth profit margins",
            f"{topic} balance sheet debt ratio financial health",
            f"{topic} valuation PE ratio price earnings",
            f"{topic} cash flow dividend payout",
        ]

    def _calculate_fundamental_sentiment(
        self, results: list[dict[str, Any]]
    ) -> dict[str, Any]:
        """Calculate sentiment specific to fundamental analysis."""
        sentiments = []
        for result in results:
            analysis = result.get("analysis", {})
            sentiment = analysis.get("sentiment", {})
            if sentiment:
                sentiments.append(sentiment)

        if not sentiments:
            return {"direction": "neutral", "confidence": 0.5}

        # Simple aggregation for now
        bullish_count = sum(1 for s in sentiments if s.get("direction") == "bullish")
        bearish_count = sum(1 for s in sentiments if s.get("direction") == "bearish")

        if bullish_count > bearish_count:
            return {"direction": "bullish", "confidence": 0.7}
        elif bearish_count > bullish_count:
            return {"direction": "bearish", "confidence": 0.7}
        else:
            return {"direction": "neutral", "confidence": 0.5}
    def _calculate_average_credibility(self, results: list[dict[str, Any]]) -> float:
        """Calculate average credibility of sources."""
        if not results:
            return 0.5
        credibility_scores = [r.get("credibility_score", 0.5) for r in results]
        return sum(credibility_scores) / len(credibility_scores)


class TechnicalResearchAgent(BaseSubagent):
    """Specialized agent for technical analysis research."""

    async def execute_research(self, task) -> dict[str, Any]:  # task: ResearchTask
        """Execute technical analysis research."""
        self.logger.info(f"Executing technical research for: {task.target_topic}")

        queries = self._generate_technical_queries(task.target_topic)

        search_results = await self._perform_specialized_search(
            topic=task.target_topic, specialized_queries=queries, max_results=6
        )

        analyzed_results = await self._analyze_search_results(
            search_results, analysis_focus="technical_analysis"
        )

        # Extract technical-specific insights
        insights = []
        risks = []
        opportunities = []
        sources = []

        for result in analyzed_results:
            analysis = result.get("analysis", {})
            insights.extend(analysis.get("insights", []))
            risks.extend(analysis.get("risk_factors", []))
            opportunities.extend(analysis.get("opportunities", []))
            sources.append(
                {
                    "title": result.get("title", ""),
                    "url": result.get("url", ""),
                    "credibility_score": result.get("credibility_score", 0.5),
                    "published_date": result.get("published_date"),
                    "author": result.get("author"),
                }
            )

        return {
            "research_type": "technical",
            "insights": list(dict.fromkeys(insights))[:8],
            "risk_factors": list(dict.fromkeys(risks))[:6],
            "opportunities": list(dict.fromkeys(opportunities))[:6],
            "sentiment": self._calculate_technical_sentiment(analyzed_results),
            "credibility_score": self._calculate_average_credibility(analyzed_results),
            "sources": sources,
            "focus_areas": [
                "price_action",
                "chart_patterns",
                "technical_indicators",
                "support_resistance",
            ],
        }

    def _generate_technical_queries(self, topic: str) -> list[str]:
        """Generate technical analysis specific queries."""
        return [
            f"{topic} technical analysis chart pattern",
            f"{topic} price target support resistance",
            f"{topic} RSI MACD technical indicators",
            f"{topic} breakout trend analysis",
            f"{topic} volume analysis price movement",
        ]

    def _calculate_technical_sentiment(
        self, results: list[dict[str, Any]]
    ) -> dict[str, Any]:
        """Calculate sentiment specific to technical analysis."""
        # Similar to fundamental but focused on technical indicators
        sentiments = [
            r.get("analysis", {}).get("sentiment", {})
            for r in results
            if r.get("analysis")
        ]
        sentiments = [s for s in sentiments if s]

        if not sentiments:
            return {"direction": "neutral", "confidence": 0.5}

        bullish_count = sum(1 for s in sentiments if s.get("direction") == "bullish")
        bearish_count = sum(1 for s in sentiments if s.get("direction") == "bearish")

        if bullish_count > bearish_count:
            return {"direction": "bullish", "confidence": 0.6}
        elif bearish_count > bullish_count:
            return {"direction": "bearish", "confidence": 0.6}
        else:
            return {"direction": "neutral", "confidence": 0.5}

    def _calculate_average_credibility(self, results: list[dict[str, Any]]) -> float:
        """Calculate average credibility of sources."""
        if not results:
            return 0.5
        credibility_scores = [r.get("credibility_score", 0.5) for r in results]
        return sum(credibility_scores) / len(credibility_scores)
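# Usage sketch for a single subagent in isolation (hypothetical objects: the
# parent agent and a ResearchTask carrying the attributes accessed above):
#
#   subagent = TechnicalResearchAgent(parent_agent)
#   result = await subagent.execute_research(task)
#   print(result["sentiment"]["direction"], result["credibility_score"])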
class SentimentResearchAgent(BaseSubagent):
    """Specialized agent for market sentiment analysis."""

    async def execute_research(self, task) -> dict[str, Any]:  # task: ResearchTask
        """Execute sentiment analysis research."""
        self.logger.info(f"Executing sentiment research for: {task.target_topic}")

        queries = self._generate_sentiment_queries(task.target_topic)

        search_results = await self._perform_specialized_search(
            topic=task.target_topic, specialized_queries=queries, max_results=10
        )

        analyzed_results = await self._analyze_search_results(
            search_results, analysis_focus="sentiment_analysis"
        )

        # Extract sentiment-specific insights
        insights = []
        risks = []
        opportunities = []
        sources = []

        for result in analyzed_results:
            analysis = result.get("analysis", {})
            insights.extend(analysis.get("insights", []))
            risks.extend(analysis.get("risk_factors", []))
            opportunities.extend(analysis.get("opportunities", []))
            sources.append(
                {
                    "title": result.get("title", ""),
                    "url": result.get("url", ""),
                    "credibility_score": result.get("credibility_score", 0.5),
                    "published_date": result.get("published_date"),
                    "author": result.get("author"),
                }
            )

        return {
            "research_type": "sentiment",
            "insights": list(dict.fromkeys(insights))[:8],
            "risk_factors": list(dict.fromkeys(risks))[:6],
            "opportunities": list(dict.fromkeys(opportunities))[:6],
            "sentiment": self._calculate_market_sentiment(analyzed_results),
            "credibility_score": self._calculate_average_credibility(analyzed_results),
            "sources": sources,
            "focus_areas": [
                "market_sentiment",
                "analyst_opinions",
                "news_sentiment",
                "social_sentiment",
            ],
        }

    def _generate_sentiment_queries(self, topic: str) -> list[str]:
        """Generate sentiment analysis specific queries."""
        return [
            f"{topic} analyst rating recommendation upgrade downgrade",
            f"{topic} market sentiment investor opinion",
            f"{topic} news sentiment positive negative",
            f"{topic} social sentiment reddit twitter discussion",
            f"{topic} institutional investor sentiment",
        ]

    def _calculate_market_sentiment(
        self, results: list[dict[str, Any]]
    ) -> dict[str, Any]:
        """Calculate overall market sentiment."""
        sentiments = [
            r.get("analysis", {}).get("sentiment", {})
            for r in results
            if r.get("analysis")
        ]
        sentiments = [s for s in sentiments if s]

        if not sentiments:
            return {"direction": "neutral", "confidence": 0.5}

        # Weighted by confidence
        weighted_scores = []
        total_confidence = 0

        for sentiment in sentiments:
            direction = sentiment.get("direction", "neutral")
            confidence = sentiment.get("confidence", 0.5)

            if direction == "bullish":
                weighted_scores.append(1 * confidence)
            elif direction == "bearish":
                weighted_scores.append(-1 * confidence)
            else:
                weighted_scores.append(0)

            total_confidence += confidence

        if not weighted_scores:
            return {"direction": "neutral", "confidence": 0.5}

        avg_score = sum(weighted_scores) / len(weighted_scores)
        avg_confidence = total_confidence / len(sentiments)

        if avg_score > 0.3:
            return {"direction": "bullish", "confidence": avg_confidence}
        elif avg_score < -0.3:
            return {"direction": "bearish", "confidence": avg_confidence}
        else:
            return {"direction": "neutral", "confidence": avg_confidence}

    def _calculate_average_credibility(self, results: list[dict[str, Any]]) -> float:
        """Calculate average credibility of sources."""
        if not results:
            return 0.5
        credibility_scores = [r.get("credibility_score", 0.5) for r in results]
        return sum(credibility_scores) / len(credibility_scores)
class CompetitiveResearchAgent(BaseSubagent):
    """Specialized agent for competitive and industry analysis."""

    async def execute_research(self, task) -> dict[str, Any]:  # task: ResearchTask
        """Execute competitive analysis research."""
        self.logger.info(f"Executing competitive research for: {task.target_topic}")

        queries = self._generate_competitive_queries(task.target_topic)

        search_results = await self._perform_specialized_search(
            topic=task.target_topic, specialized_queries=queries, max_results=8
        )

        analyzed_results = await self._analyze_search_results(
            search_results, analysis_focus="competitive_analysis"
        )

        # Extract competitive-specific insights
        insights = []
        risks = []
        opportunities = []
        sources = []

        for result in analyzed_results:
            analysis = result.get("analysis", {})
            insights.extend(analysis.get("insights", []))
            risks.extend(analysis.get("risk_factors", []))
            opportunities.extend(analysis.get("opportunities", []))
            sources.append(
                {
                    "title": result.get("title", ""),
                    "url": result.get("url", ""),
                    "credibility_score": result.get("credibility_score", 0.5),
                    "published_date": result.get("published_date"),
                    "author": result.get("author"),
                }
            )

        return {
            "research_type": "competitive",
            "insights": list(dict.fromkeys(insights))[:8],
            "risk_factors": list(dict.fromkeys(risks))[:6],
            "opportunities": list(dict.fromkeys(opportunities))[:6],
            "sentiment": self._calculate_competitive_sentiment(analyzed_results),
            "credibility_score": self._calculate_average_credibility(analyzed_results),
            "sources": sources,
            "focus_areas": [
                "competitive_position",
                "market_share",
                "industry_trends",
                "competitive_advantages",
            ],
        }

    def _generate_competitive_queries(self, topic: str) -> list[str]:
        """Generate competitive analysis specific queries."""
        return [
            f"{topic} market share competitive position industry",
            f"{topic} competitors comparison competitive advantage",
            f"{topic} industry analysis market trends",
            f"{topic} competitive landscape market dynamics",
            f"{topic} industry outlook sector performance",
        ]

    def _calculate_competitive_sentiment(
        self, results: list[dict[str, Any]]
    ) -> dict[str, Any]:
        """Calculate sentiment specific to competitive positioning."""
        sentiments = [
            r.get("analysis", {}).get("sentiment", {})
            for r in results
            if r.get("analysis")
        ]
        sentiments = [s for s in sentiments if s]

        if not sentiments:
            return {"direction": "neutral", "confidence": 0.5}

        # Focus on competitive strength indicators
        bullish_count = sum(1 for s in sentiments if s.get("direction") == "bullish")
        bearish_count = sum(1 for s in sentiments if s.get("direction") == "bearish")

        if bullish_count > bearish_count:
            return {"direction": "bullish", "confidence": 0.6}
        elif bearish_count > bullish_count:
            return {"direction": "bearish", "confidence": 0.6}
        else:
            return {"direction": "neutral", "confidence": 0.5}

    def _calculate_average_credibility(self, results: list[dict[str, Any]]) -> float:
        """Calculate average credibility of sources."""
        if not results:
            return 0.5
        credibility_scores = [r.get("credibility_score", 0.5) for r in results]
        return sum(credibility_scores) / len(credibility_scores)
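# End-to-end usage sketch (illustrative only: the constructor arguments and
# model choice below are placeholders, not confirmed by this module's API):
#
#   from langchain_openai import ChatOpenAI  # hypothetical model choice
#
#   async def _demo() -> None:
#       agent = DeepResearchAgent(llm=ChatOpenAI(model="gpt-4o-mini"), persona="moderate")
#       report = await agent.research_topic(
#           query="semiconductor capex cycle 2025",
#           session_id="demo-session",
#           focus_areas=["fundamentals", "sentiment"],
#       )
#       print(report["findings"].get("synthesis", ""))
#
#   asyncio.run(_demo())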
