MaverickMCP

by wshobson
MIT License
llm_optimization.py (61.2 kB)
""" LLM-side optimizations for research agents to prevent timeouts. This module provides comprehensive optimization strategies including: - Adaptive model selection based on time constraints - Progressive token budgeting with confidence tracking - Parallel LLM processing with intelligent load balancing - Optimized prompt engineering for speed - Early termination based on confidence thresholds - Content filtering to reduce processing overhead """ import asyncio import logging import re import time from datetime import datetime from enum import Enum from typing import Any from langchain_core.messages import HumanMessage, SystemMessage from pydantic import BaseModel, Field from maverick_mcp.providers.openrouter_provider import ( OpenRouterProvider, TaskType, ) from maverick_mcp.utils.orchestration_logging import ( get_orchestration_logger, log_method_call, ) logger = logging.getLogger(__name__) class ResearchPhase(str, Enum): """Research phases for token allocation.""" SEARCH = "search" CONTENT_ANALYSIS = "content_analysis" SYNTHESIS = "synthesis" VALIDATION = "validation" class ModelConfiguration(BaseModel): """Configuration for model selection with time optimization.""" model_id: str = Field(description="OpenRouter model identifier") max_tokens: int = Field(description="Maximum output tokens") temperature: float = Field(description="Model temperature") timeout_seconds: float = Field(description="Request timeout") parallel_batch_size: int = Field( default=1, description="Sources per batch for this model" ) class TokenAllocation(BaseModel): """Token allocation for a research phase.""" input_tokens: int = Field(description="Maximum input tokens") output_tokens: int = Field(description="Maximum output tokens") per_source_tokens: int = Field(description="Tokens per source") emergency_reserve: int = Field(description="Emergency reserve tokens") timeout_seconds: float = Field(description="Processing timeout") class AdaptiveModelSelector: """Intelligent model selection based on time budgets and task complexity.""" def __init__(self, openrouter_provider: OpenRouterProvider): self.provider = openrouter_provider self.performance_cache = {} # Cache model performance metrics def select_model_for_time_budget( self, task_type: TaskType, time_remaining_seconds: float, complexity_score: float, content_size_tokens: int, confidence_threshold: float = 0.8, current_confidence: float = 0.0, ) -> ModelConfiguration: """Select optimal model based on available time and requirements.""" # Time pressure categories with adaptive thresholds if time_remaining_seconds < 10: return self._select_emergency_model(task_type, content_size_tokens) elif time_remaining_seconds < 25: return self._select_fast_quality_model(task_type, complexity_score) elif time_remaining_seconds < 45: return self._select_balanced_model( task_type, complexity_score, current_confidence ) else: return self._select_optimal_model( task_type, complexity_score, confidence_threshold ) def _select_emergency_model( self, task_type: TaskType, content_size: int ) -> ModelConfiguration: """Ultra-fast models for time-critical situations.""" # OPTIMIZATION: Prioritize speed with increased batch sizes if content_size > 20000: # Large content needs fast + capable models return ModelConfiguration( model_id="google/gemini-2.5-flash", # 199 tokens/sec - fastest available max_tokens=min(800, content_size // 25), # Adaptive token limit temperature=0.05, # OPTIMIZATION: Minimal temp for deterministic fast response timeout_seconds=5, # OPTIMIZATION: Reduced from 8s 
parallel_batch_size=8, # OPTIMIZATION: Doubled for faster processing ) else: return ModelConfiguration( model_id="openai/gpt-4o-mini", # 126 tokens/sec - excellent speed/cost balance max_tokens=min(500, content_size // 20), temperature=0.03, # OPTIMIZATION: Near-zero for fastest response timeout_seconds=4, # OPTIMIZATION: Reduced from 6s parallel_batch_size=10, # OPTIMIZATION: Doubled for maximum parallelism ) def _select_fast_quality_model( self, task_type: TaskType, complexity_score: float ) -> ModelConfiguration: """Balance speed and quality for time-constrained situations.""" if complexity_score > 0.7 or task_type == TaskType.COMPLEX_REASONING: # Complex tasks - use fast model with good quality return ModelConfiguration( model_id="openai/gpt-4o-mini", # 126 tokens/sec + good quality max_tokens=1200, temperature=0.1, # OPTIMIZATION: Reduced for faster response timeout_seconds=10, # OPTIMIZATION: Reduced from 18s parallel_batch_size=6, # OPTIMIZATION: Doubled for better parallelism ) else: # Simple tasks - use the fastest model available return ModelConfiguration( model_id="google/gemini-2.5-flash", # 199 tokens/sec - fastest max_tokens=1000, temperature=0.1, # OPTIMIZATION: Reduced for faster response timeout_seconds=8, # OPTIMIZATION: Reduced from 12s parallel_batch_size=8, # OPTIMIZATION: Doubled for maximum speed ) def _select_balanced_model( self, task_type: TaskType, complexity_score: float, current_confidence: float ) -> ModelConfiguration: """Standard mode with cost-effectiveness focus.""" # If confidence is already high, use fastest models for validation if current_confidence > 0.7: return ModelConfiguration( model_id="google/gemini-2.5-flash", # 199 tokens/sec - fastest validation max_tokens=1500, temperature=0.25, timeout_seconds=20, # Reduced for fastest model parallel_batch_size=4, # Increased for speed ) # Standard balanced approach - prioritize speed-optimized models if task_type in [TaskType.DEEP_RESEARCH, TaskType.RESULT_SYNTHESIS]: return ModelConfiguration( model_id="openai/gpt-4o-mini", # Speed + quality balance for research max_tokens=2000, temperature=0.3, timeout_seconds=25, # Reduced for faster model parallel_batch_size=3, # Increased for speed ) else: return ModelConfiguration( model_id="google/gemini-2.5-flash", # Fastest for general tasks max_tokens=1500, temperature=0.25, timeout_seconds=20, # Reduced for fastest model parallel_batch_size=4, # Increased for speed ) def _select_optimal_model( self, task_type: TaskType, complexity_score: float, confidence_threshold: float ) -> ModelConfiguration: """Comprehensive mode for complex analysis.""" # Use premium models for the most complex tasks when time allows if complexity_score > 0.8 and task_type == TaskType.DEEP_RESEARCH: return ModelConfiguration( model_id="google/gemini-2.5-pro", max_tokens=3000, temperature=0.3, timeout_seconds=45, parallel_batch_size=1, # Deep thinking models work better individually ) # High-quality cost-effective models for standard comprehensive analysis return ModelConfiguration( model_id="anthropic/claude-sonnet-4", max_tokens=2500, temperature=0.3, timeout_seconds=40, parallel_batch_size=2, ) def calculate_task_complexity( self, content: str, task_type: TaskType, focus_areas: list[str] | None = None ) -> float: """Calculate complexity score based on content and task requirements.""" if not content: return 0.3 # Default low complexity content_lower = content.lower() # Financial complexity indicators complexity_indicators = { "financial_jargon": len( re.findall( 
r"\b(?:ebitda|dcf|roic?|wacc|beta|volatility|sharpe)\b", content_lower, ) ), "numerical_data": len(re.findall(r"\$?[\d,]+\.?\d*[%kmbKMB]?", content)), "comparative_analysis": len( re.findall( r"\b(?:versus|compared|relative|outperform|underperform)\b", content_lower, ) ), "temporal_analysis": len( re.findall(r"\b(?:quarterly|q[1-4]|fy|yoy|qoq|annual)\b", content_lower) ), "market_terms": len( re.findall( r"\b(?:bullish|bearish|catalyst|headwind|tailwind)\b", content_lower ) ), "technical_terms": len( re.findall( r"\b(?:support|resistance|breakout|rsi|macd|sma|ema)\b", content_lower, ) ), } # Calculate base complexity total_indicators = sum(complexity_indicators.values()) content_length = len(content.split()) base_complexity = min(total_indicators / max(content_length / 100, 1), 1.0) # Task-specific complexity adjustments task_multipliers = { TaskType.DEEP_RESEARCH: 1.4, TaskType.COMPLEX_REASONING: 1.6, TaskType.RESULT_SYNTHESIS: 1.2, TaskType.TECHNICAL_ANALYSIS: 1.3, TaskType.SENTIMENT_ANALYSIS: 0.8, TaskType.QUICK_ANSWER: 0.5, } # Focus area adjustments focus_multiplier = 1.0 if focus_areas: complex_focus_areas = [ "competitive_analysis", "fundamental_analysis", "complex_reasoning", ] if any(area in focus_areas for area in complex_focus_areas): focus_multiplier = 1.2 final_complexity = ( base_complexity * task_multipliers.get(task_type, 1.0) * focus_multiplier ) return min(final_complexity, 1.0) class ProgressiveTokenBudgeter: """Manages token budgets across research phases with time awareness.""" def __init__( self, total_time_budget_seconds: float, confidence_target: float = 0.75 ): self.total_time_budget = total_time_budget_seconds self.confidence_target = confidence_target self.phase_budgets = self._calculate_base_phase_budgets() self.time_started = time.time() def _calculate_base_phase_budgets(self) -> dict[ResearchPhase, int]: """Calculate base token budgets for each research phase.""" # Allocate tokens based on typical phase requirements if self.total_time_budget < 30: # Emergency mode - minimal tokens return { ResearchPhase.SEARCH: 500, ResearchPhase.CONTENT_ANALYSIS: 2000, ResearchPhase.SYNTHESIS: 800, ResearchPhase.VALIDATION: 300, } elif self.total_time_budget < 60: # Fast mode return { ResearchPhase.SEARCH: 1000, ResearchPhase.CONTENT_ANALYSIS: 4000, ResearchPhase.SYNTHESIS: 1500, ResearchPhase.VALIDATION: 500, } else: # Standard mode return { ResearchPhase.SEARCH: 1500, ResearchPhase.CONTENT_ANALYSIS: 6000, ResearchPhase.SYNTHESIS: 2500, ResearchPhase.VALIDATION: 1000, } def allocate_tokens_for_phase( self, phase: ResearchPhase, sources_count: int, current_confidence: float, complexity_score: float = 0.5, ) -> TokenAllocation: """Allocate tokens for a research phase based on current state.""" time_elapsed = time.time() - self.time_started time_remaining = max(0, self.total_time_budget - time_elapsed) base_budget = self.phase_budgets[phase] # Confidence-based scaling if current_confidence > self.confidence_target: # High confidence - focus on validation with fewer tokens confidence_multiplier = 0.7 elif current_confidence < 0.4: # Low confidence - increase token usage if time allows confidence_multiplier = 1.3 if time_remaining > 30 else 0.9 else: confidence_multiplier = 1.0 # Time pressure scaling time_multiplier = self._calculate_time_multiplier(time_remaining) # Complexity scaling complexity_multiplier = 0.8 + (complexity_score * 0.4) # Range: 0.8 to 1.2 # Source count scaling (diminishing returns) if sources_count > 0: source_multiplier = min(1.0 + (sources_count - 3) * 
0.05, 1.3) else: source_multiplier = 1.0 # Calculate final budget final_budget = int( base_budget * confidence_multiplier * time_multiplier * complexity_multiplier * source_multiplier ) # Calculate timeout based on available time and token budget base_timeout = min(time_remaining * 0.8, 45) # Max 45 seconds per phase adjusted_timeout = base_timeout * (final_budget / base_budget) ** 0.5 return TokenAllocation( input_tokens=min(int(final_budget * 0.75), 15000), # Cap input tokens output_tokens=min(int(final_budget * 0.25), 3000), # Cap output tokens per_source_tokens=final_budget // max(sources_count, 1) if sources_count > 0 else final_budget, emergency_reserve=200, # Always keep emergency reserve timeout_seconds=max(adjusted_timeout, 5), # Minimum 5 seconds ) def get_next_allocation( self, sources_remaining: int, current_confidence: float, time_elapsed_seconds: float, ) -> dict[str, Any]: """Get the next token allocation for processing sources.""" time_remaining = max(0, self.total_time_budget - time_elapsed_seconds) # Determine priority based on confidence and time pressure if current_confidence < 0.4 and time_remaining > 30: priority = "high" elif current_confidence < 0.6 and time_remaining > 15: priority = "medium" else: priority = "low" # Calculate time budget per remaining source if sources_remaining > 0: time_per_source = time_remaining / sources_remaining else: time_per_source = 0 # Calculate token budget base_tokens = self.phase_budgets.get(ResearchPhase.CONTENT_ANALYSIS, 2000) if priority == "high": max_tokens = min(int(base_tokens * 1.2), 4000) elif priority == "medium": max_tokens = base_tokens else: max_tokens = int(base_tokens * 0.8) return { "time_budget": min(time_per_source, 30.0), # Cap at 30 seconds "max_tokens": max_tokens, "priority": priority, "sources_remaining": sources_remaining, } def _calculate_time_multiplier(self, time_remaining: float) -> float: """Scale token budget based on time pressure.""" if time_remaining < 5: return 0.2 # Extreme emergency mode elif time_remaining < 15: return 0.4 # Emergency mode elif time_remaining < 30: return 0.7 # Time-constrained elif time_remaining < 60: return 0.9 # Slightly reduced else: return 1.0 # Full budget available class ParallelLLMProcessor: """Handles parallel LLM operations with intelligent load balancing.""" def __init__( self, openrouter_provider: OpenRouterProvider, max_concurrent: int = 5, # OPTIMIZATION: Increased from 3 ): self.provider = openrouter_provider self.max_concurrent = max_concurrent self.semaphore = asyncio.BoundedSemaphore( max_concurrent ) # OPTIMIZATION: Use BoundedSemaphore self.model_selector = AdaptiveModelSelector(openrouter_provider) self.orchestration_logger = get_orchestration_logger("ParallelLLMProcessor") # OPTIMIZATION: Track active requests for better coordination self._active_requests = 0 self._request_lock = asyncio.Lock() @log_method_call(component="ParallelLLMProcessor", include_timing=True) async def parallel_content_analysis( self, sources: list[dict], analysis_type: str, persona: str, time_budget_seconds: float, current_confidence: float = 0.0, ) -> list[dict]: """Analyze multiple sources in parallel with adaptive optimization.""" if not sources: return [] self.orchestration_logger.set_request_context( analysis_type=analysis_type, source_count=len(sources), time_budget=time_budget_seconds, ) # Calculate complexity for all sources combined_content = "\n".join( [source.get("content", "")[:1000] for source in sources[:5]] ) overall_complexity = 
            combined_content,
            TaskType.SENTIMENT_ANALYSIS
            if analysis_type == "sentiment"
            else TaskType.MARKET_ANALYSIS,
        )

        # Determine optimal batching strategy
        model_config = self.model_selector.select_model_for_time_budget(
            task_type=TaskType.SENTIMENT_ANALYSIS
            if analysis_type == "sentiment"
            else TaskType.MARKET_ANALYSIS,
            time_remaining_seconds=time_budget_seconds,
            complexity_score=overall_complexity,
            content_size_tokens=len(combined_content) // 4,
            current_confidence=current_confidence,
        )

        # Create batches based on model configuration
        batches = self._create_optimal_batches(
            sources, model_config.parallel_batch_size
        )

        self.orchestration_logger.info(
            "🔄 PARALLEL_ANALYSIS_START",
            total_sources=len(sources),
            batch_count=len(batches),
        )

        # OPTIMIZATION: Process batches using create_task for immediate parallelism
        running_tasks = []
        for i, batch in enumerate(batches):
            # Create task immediately without awaiting
            task_future = asyncio.create_task(
                self._analyze_source_batch(
                    batch=batch,
                    batch_id=i,
                    analysis_type=analysis_type,
                    persona=persona,
                    model_config=model_config,
                    overall_complexity=overall_complexity,
                )
            )
            running_tasks.append((i, task_future))  # Track batch ID with future

            # OPTIMIZATION: Minimal stagger to prevent API overload
            if i < len(batches) - 1:  # Don't delay after last batch
                await asyncio.sleep(0.01)  # 10ms micro-delay

        # OPTIMIZATION: Use as_completed for progressive result handling
        batch_results = [None] * len(batches)  # Pre-allocate results list
        timeout_at = time.time() + (time_budget_seconds * 0.9)

        try:
            for batch_id, task_future in running_tasks:
                remaining_time = timeout_at - time.time()
                if remaining_time <= 0:
                    raise TimeoutError()
                try:
                    result = await asyncio.wait_for(task_future, timeout=remaining_time)
                    batch_results[batch_id] = result
                except Exception as e:
                    batch_results[batch_id] = e
        except TimeoutError:
            self.orchestration_logger.warning(
                "⏰ PARALLEL_ANALYSIS_TIMEOUT", timeout=time_budget_seconds
            )
            return self._create_fallback_results(sources)

        # Flatten and process results
        final_results = []
        successful_batches = 0

        for i, batch_result in enumerate(batch_results):
            if isinstance(batch_result, Exception):
                self.orchestration_logger.warning(
                    "⚠️ BATCH_FAILED", batch_id=i, error=str(batch_result)
                )
                # Add fallback results for failed batch
                final_results.extend(self._create_fallback_results(batches[i]))
            else:
                final_results.extend(batch_result)
                successful_batches += 1

        self.orchestration_logger.info(
            "✅ PARALLEL_ANALYSIS_COMPLETE",
            successful_batches=successful_batches,
            results_count=len(final_results),
        )

        return final_results

    def _create_optimal_batches(
        self, sources: list[dict], batch_size: int
    ) -> list[list[dict]]:
        """Create optimal batches for parallel processing."""
        if batch_size <= 1:
            return [[source] for source in sources]

        batches = []
        for i in range(0, len(sources), batch_size):
            batch = sources[i : i + batch_size]
            batches.append(batch)

        return batches

    async def _analyze_source_batch(
        self,
        batch: list[dict],
        batch_id: int,
        analysis_type: str,
        persona: str,
        model_config: ModelConfiguration,
        overall_complexity: float,
    ) -> list[dict]:
        """Analyze a batch of sources with optimized LLM call."""
        # OPTIMIZATION: Track active requests for better coordination
        async with self._request_lock:
            self._active_requests += 1

        try:
            # OPTIMIZATION: Acquire semaphore without blocking other task creation
            await self.semaphore.acquire()

            try:
                # Create batch analysis prompt
                batch_prompt = self._create_batch_analysis_prompt(
                    batch, analysis_type, persona, model_config.max_tokens
                )

                # Get LLM instance
                llm = self.provider.get_llm(
                    model_override=model_config.model_id,
                    temperature=model_config.temperature,
                    max_tokens=model_config.max_tokens,
                )

                # Execute with timeout
                start_time = time.time()
                result = await asyncio.wait_for(
                    llm.ainvoke(
                        [
                            SystemMessage(
                                content="You are a financial analyst. Provide structured, concise analysis."
                            ),
                            HumanMessage(content=batch_prompt),
                        ]
                    ),
                    timeout=model_config.timeout_seconds,
                )
                execution_time = time.time() - start_time

                # Parse batch results
                parsed_results = self._parse_batch_analysis_result(
                    result.content, batch
                )

                self.orchestration_logger.debug(
                    "✨ BATCH_SUCCESS",
                    batch_id=batch_id,
                    duration=f"{execution_time:.2f}s",
                )

                return parsed_results

            except TimeoutError:
                self.orchestration_logger.warning(
                    "⏰ BATCH_TIMEOUT",
                    batch_id=batch_id,
                    timeout=model_config.timeout_seconds,
                )
                return self._create_fallback_results(batch)

            except Exception as e:
                self.orchestration_logger.error(
                    "💥 BATCH_ERROR", batch_id=batch_id, error=str(e)
                )
                return self._create_fallback_results(batch)

            finally:
                # OPTIMIZATION: Always release semaphore
                self.semaphore.release()

        finally:
            # OPTIMIZATION: Track active requests
            async with self._request_lock:
                self._active_requests -= 1

    def _create_batch_analysis_prompt(
        self, batch: list[dict], analysis_type: str, persona: str, max_tokens: int
    ) -> str:
        """Create optimized prompt for batch analysis."""
        # Determine prompt style based on token budget
        if max_tokens < 800:
            style = "ultra_concise"
        elif max_tokens < 1500:
            style = "concise"
        else:
            style = "detailed"

        prompt_templates = {
            "ultra_concise": """URGENT BATCH ANALYSIS - {analysis_type} for {persona} investor.

Analyze {source_count} sources. For EACH source, provide:
SOURCE_N: SENTIMENT:Bull/Bear/Neutral|CONFIDENCE:0-1|INSIGHT:one key point|RISK:main risk

{sources}

Keep total response under 500 words.""",
            "concise": """BATCH ANALYSIS - {analysis_type} for {persona} investor perspective.

Analyze these {source_count} sources. For each source provide:
- Sentiment: Bull/Bear/Neutral + confidence (0-1)
- Key insight (1 sentence)
- Main risk (1 sentence)
- Relevance score (0-1)

{sources}

Format consistently. Target ~100 words per source.""",
            "detailed": """Comprehensive {analysis_type} analysis for {persona} investor.

Analyze these {source_count} sources with structured output for each:

{sources}

For each source provide:
1. Sentiment (direction, confidence 0-1, brief reasoning)
2. Key insights (2-3 main points)
3. Risk factors (1-2 key risks)
4. Opportunities (1-2 opportunities if any)
5. Credibility assessment (0-1 score)
6. Relevance score (0-1)

Maintain {persona} investor perspective throughout.""",
        }

        # Format sources for prompt
        sources_text = ""
        for i, source in enumerate(batch, 1):
            content = source.get("content", "")[:1500]  # Limit content length
            title = source.get("title", f"Source {i}")
            sources_text += f"\nSOURCE {i} - {title}:\n{content}\n{'---' * 20}\n"

        template = prompt_templates[style]
        return template.format(
            analysis_type=analysis_type,
            persona=persona,
            source_count=len(batch),
            sources=sources_text.strip(),
        )

    def _parse_batch_analysis_result(
        self, result_content: str, batch: list[dict]
    ) -> list[dict]:
        """Parse LLM batch analysis result into structured data."""
        results = []

        # Try structured parsing first
        source_sections = re.split(r"\n(?:SOURCE\s+\d+|---+)", result_content)

        if len(source_sections) >= len(batch):
            # Structured parsing successful
            for _i, (source, section) in enumerate(
                zip(batch, source_sections[1 : len(batch) + 1], strict=False)
            ):
                parsed = self._parse_source_analysis(section, source)
                results.append(parsed)
        else:
            # Fallback to simple parsing
            for i, source in enumerate(batch):
                fallback_analysis = self._create_simple_fallback_analysis(
                    result_content, source, i
                )
                results.append(fallback_analysis)

        return results

    def _parse_source_analysis(self, analysis_text: str, source: dict) -> dict:
        """Parse analysis text for a single source."""
        # Extract sentiment
        sentiment_match = re.search(
            r"sentiment:?\s*(\w+)[,\s]*(?:confidence:?\s*([\d.]+))?",
            analysis_text.lower(),
        )

        if sentiment_match:
            direction = sentiment_match.group(1).lower()
            confidence = float(sentiment_match.group(2) or 0.5)

            # Map common sentiment terms
            if direction in ["bull", "bullish", "positive"]:
                direction = "bullish"
            elif direction in ["bear", "bearish", "negative"]:
                direction = "bearish"
            else:
                direction = "neutral"
        else:
            direction = "neutral"
            confidence = 0.5

        # Extract other information
        insights = self._extract_insights(analysis_text)
        risks = self._extract_risks(analysis_text)
        opportunities = self._extract_opportunities(analysis_text)

        # Extract scores
        relevance_match = re.search(r"relevance:?\s*([\d.]+)", analysis_text.lower())
        relevance_score = float(relevance_match.group(1)) if relevance_match else 0.6

        credibility_match = re.search(
            r"credibility:?\s*([\d.]+)", analysis_text.lower()
        )
        credibility_score = (
            float(credibility_match.group(1)) if credibility_match else 0.7
        )

        return {
            **source,
            "analysis": {
                "insights": insights,
                "sentiment": {"direction": direction, "confidence": confidence},
                "risk_factors": risks,
                "opportunities": opportunities,
                "credibility_score": credibility_score,
                "relevance_score": relevance_score,
                "analysis_timestamp": datetime.now(),
                "batch_processed": True,
            },
        }

    def _extract_insights(self, text: str) -> list[str]:
        """Extract insights from analysis text."""
        insights = []

        # Look for insight patterns
        insight_patterns = [
            r"insight:?\s*([^.\n]+)",
            r"key point:?\s*([^.\n]+)",
            r"main finding:?\s*([^.\n]+)",
        ]

        for pattern in insight_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            insights.extend([m.strip() for m in matches if m.strip()])

        # If no structured insights found, extract bullet points
        if not insights:
            bullet_matches = re.findall(r"[•\-\*]\s*([^.\n]+)", text)
            insights.extend([m.strip() for m in bullet_matches if m.strip()][:3])

        return insights[:5]  # Limit to 5 insights

    def _extract_risks(self, text: str) -> list[str]:
        """Extract risk factors from analysis text."""
        risk_patterns = [
            r"risk:?\s*([^.\n]+)",
            r"concern:?\s*([^.\n]+)",
            r"headwind:?\s*([^.\n]+)",
        ]

        risks = []
        for pattern in risk_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            risks.extend([m.strip() for m in matches if m.strip()])

        return risks[:3]

    def _extract_opportunities(self, text: str) -> list[str]:
        """Extract opportunities from analysis text."""
        opp_patterns = [
            r"opportunit(?:y|ies):?\s*([^.\n]+)",
            r"catalyst:?\s*([^.\n]+)",
            r"tailwind:?\s*([^.\n]+)",
        ]

        opportunities = []
        for pattern in opp_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            opportunities.extend([m.strip() for m in matches if m.strip()])

        return opportunities[:3]

    def _create_simple_fallback_analysis(
        self, full_analysis: str, source: dict, index: int
    ) -> dict:
        """Create simple fallback analysis when parsing fails."""
        # Basic sentiment analysis from text
        analysis_lower = full_analysis.lower()

        positive_words = ["positive", "bullish", "strong", "growth", "opportunity"]
        negative_words = ["negative", "bearish", "weak", "decline", "risk"]

        pos_count = sum(1 for word in positive_words if word in analysis_lower)
        neg_count = sum(1 for word in negative_words if word in analysis_lower)

        if pos_count > neg_count:
            sentiment = "bullish"
            confidence = 0.6
        elif neg_count > pos_count:
            sentiment = "bearish"
            confidence = 0.6
        else:
            sentiment = "neutral"
            confidence = 0.5

        return {
            **source,
            "analysis": {
                "insights": [f"Analysis based on source content (index {index})"],
                "sentiment": {"direction": sentiment, "confidence": confidence},
                "risk_factors": ["Unable to extract specific risks"],
                "opportunities": ["Unable to extract specific opportunities"],
                "credibility_score": 0.5,
                "relevance_score": 0.5,
                "analysis_timestamp": datetime.now(),
                "fallback_used": True,
                "batch_processed": True,
            },
        }

    def _create_fallback_results(self, sources: list[dict]) -> list[dict]:
        """Create fallback results when batch processing fails."""
        results = []
        for source in sources:
            fallback_result = {
                **source,
                "analysis": {
                    "insights": ["Analysis failed - using fallback"],
                    "sentiment": {"direction": "neutral", "confidence": 0.3},
                    "risk_factors": ["Analysis timeout - unable to assess risks"],
                    "opportunities": [],
                    "credibility_score": 0.5,
                    "relevance_score": 0.5,
                    "analysis_timestamp": datetime.now(),
                    "fallback_used": True,
                    "batch_timeout": True,
                },
            }
            results.append(fallback_result)

        return results


class OptimizedPromptEngine:
    """Creates optimized prompts for different time constraints and confidence levels."""

    def __init__(self):
        self.prompt_cache = {}  # Cache for generated prompts

        self.prompt_templates = {
            "emergency": {
                "content_analysis": """URGENT: Quick 3-point analysis of financial content for {persona} investor.

Content: {content}

Provide ONLY:
1. SENTIMENT: Bull/Bear/Neutral + confidence (0-1)
2. KEY_RISK: Primary risk factor
3. KEY_OPPORTUNITY: Main opportunity (if any)

Format: SENTIMENT:Bull|0.8 KEY_RISK:Market volatility KEY_OPPORTUNITY:Earnings growth

Max 50 words total. No explanations.""",
                "synthesis": """URGENT: 2-sentence summary from {source_count} sources for {persona} investor.

Key findings: {key_points}

Provide: 1) Overall sentiment direction 2) Primary investment implication

Max 40 words total.""",
            },
            "fast": {
                "content_analysis": """Quick financial analysis for {persona} investor - 5 points max.

Content: {content}

Provide concisely:
• Sentiment: Bull/Bear/Neutral (confidence 0-1)
• Key insight (1 sentence)
• Main risk (1 sentence)
• Main opportunity (1 sentence)
• Relevance score (0-1)

Target: Under 150 words total.""",
                "synthesis": """Synthesize research findings for {persona} investor.

Sources: {source_count} | Key insights: {insights}

4-part summary:
1. Overall sentiment + confidence
2. Top 2 opportunities
3. Top 2 risks
4. Recommended action

Limit: 200 words max.""",
            },
            "standard": {
                "content_analysis": """Financial content analysis for {persona} investor.

Content: {content}
Focus areas: {focus_areas}

Structured analysis:
- Sentiment (direction, confidence 0-1, brief reasoning)
- Key insights (3-5 bullet points)
- Risk factors (2-3 main risks)
- Opportunities (2-3 opportunities)
- Credibility assessment (0-1)
- Relevance score (0-1)

Target: 300-500 words.""",
                "synthesis": """Comprehensive research synthesis for {persona} investor.

Research Summary:
- Sources analyzed: {source_count}
- Key insights: {insights}
- Time horizon: {time_horizon}

Provide detailed analysis:
1. Executive Summary (2-3 sentences)
2. Key Findings (5-7 bullet points)
3. Investment Implications
4. Risk Assessment
5. Recommended Actions
6. Confidence Level + reasoning

Tailor specifically for {persona} investment characteristics.""",
            },
        }

    def get_optimized_prompt(
        self,
        prompt_type: str,
        time_remaining: float,
        confidence_level: float,
        **context,
    ) -> str:
        """Generate optimized prompt based on time constraints and confidence."""
        # Create cache key
        cache_key = f"{prompt_type}_{time_remaining:.0f}_{confidence_level:.1f}_{hash(str(sorted(context.items())))}"

        if cache_key in self.prompt_cache:
            return self.prompt_cache[cache_key]

        # Select template based on time pressure
        if time_remaining < 15:
            template_category = "emergency"
        elif time_remaining < 45:
            template_category = "fast"
        else:
            template_category = "standard"

        template = self.prompt_templates[template_category].get(prompt_type)

        if not template:
            # Fallback to fast template
            template = self.prompt_templates["fast"].get(
                prompt_type, "Analyze the content quickly and provide key insights."
            )

        # Add confidence-based instructions
        confidence_instructions = ""
        if confidence_level > 0.7:
            confidence_instructions = "\n\nNOTE: High confidence already achieved. Focus on validation and contradictory evidence."
        elif confidence_level < 0.4:
            confidence_instructions = "\n\nNOTE: Low confidence. Look for strong supporting evidence to build confidence."

        # Format template with context
        formatted_prompt = template.format(**context) + confidence_instructions

        # Cache the result
        self.prompt_cache[cache_key] = formatted_prompt

        return formatted_prompt

    def create_time_optimized_synthesis_prompt(
        self,
        sources: list[dict],
        persona: str,
        time_remaining: float,
        current_confidence: float,
    ) -> str:
        """Create synthesis prompt optimized for available time."""
        # Extract key information from sources
        insights = []
        sentiments = []

        for source in sources:
            analysis = source.get("analysis", {})
            insights.extend(analysis.get("insights", [])[:2])  # Limit per source
            sentiment = analysis.get("sentiment", {})
            if sentiment:
                sentiments.append(sentiment.get("direction", "neutral"))

        # Prepare context
        context = {
            "persona": persona,
            "source_count": len(sources),
            "insights": "; ".join(insights[:8]),  # Top 8 insights
            "key_points": "; ".join(insights[:8]),  # For backward compatibility
            "time_horizon": "short-term" if time_remaining < 30 else "medium-term",
        }

        return self.get_optimized_prompt(
            "synthesis", time_remaining, current_confidence, **context
        )


class ConfidenceTracker:
    """Tracks research confidence and triggers early termination when appropriate."""

    def __init__(
        self,
        target_confidence: float = 0.75,
        min_sources: int = 3,
        max_sources: int = 15,
    ):
        self.target_confidence = target_confidence
        self.min_sources = min_sources
        self.max_sources = max_sources
        self.confidence_history = []
        self.evidence_history = []
        self.source_count = 0
        self.sources_analyzed = 0  # For backward compatibility
        self.last_significant_improvement = 0
        self.sentiment_votes = {"bullish": 0, "bearish": 0, "neutral": 0}

    def update_confidence(
        self,
        new_evidence: dict,
        source_credibility: float | None = None,
        credibility_score: float | None = None,
    ) -> dict[str, Any]:
        """Update confidence based on new evidence and return continuation decision."""
        # Handle both parameter names for backward compatibility
        if source_credibility is None and credibility_score is not None:
            source_credibility = credibility_score
        elif source_credibility is None and credibility_score is None:
            source_credibility = 0.5  # Default value

        self.source_count += 1
        self.sources_analyzed += 1  # Keep both for compatibility

        # Store evidence
        self.evidence_history.append(
            {
                "evidence": new_evidence,
                "credibility": source_credibility,
                "timestamp": datetime.now(),
            }
        )

        # Update sentiment voting
        sentiment = new_evidence.get("sentiment", {})
        direction = sentiment.get("direction", "neutral")
        confidence = sentiment.get("confidence", 0.5)

        # Weight vote by source credibility and sentiment confidence
        vote_weight = source_credibility * confidence
        self.sentiment_votes[direction] += vote_weight

        # Calculate evidence strength
        evidence_strength = self._calculate_evidence_strength(
            new_evidence, source_credibility
        )

        # Update confidence using Bayesian-style updating
        current_confidence = self._update_bayesian_confidence(evidence_strength)
        self.confidence_history.append(current_confidence)

        # Check for significant improvement
        if len(self.confidence_history) >= 2:
            improvement = current_confidence - self.confidence_history[-2]
            if improvement > 0.1:  # 10% improvement
                self.last_significant_improvement = self.source_count

        # Make continuation decision
        should_continue = self._should_continue_research(current_confidence)

        return {
            "current_confidence": current_confidence,
            "should_continue": should_continue,
            "sources_processed": self.source_count,
            "sources_analyzed": self.source_count,  # For backward compatibility
            "confidence_trend": self._calculate_confidence_trend(),
            "early_termination_reason": None
            if should_continue
            else self._get_termination_reason(current_confidence),
            "sentiment_consensus": self._calculate_sentiment_consensus(),
        }

    def _calculate_evidence_strength(self, evidence: dict, credibility: float) -> float:
        """Calculate the strength of new evidence."""
        # Base strength from sentiment confidence
        sentiment = evidence.get("sentiment", {})
        sentiment_confidence = sentiment.get("confidence", 0.5)

        # Adjust for source credibility
        credibility_adjusted = sentiment_confidence * credibility

        # Factor in evidence richness
        insights_count = len(evidence.get("insights", []))
        risk_factors_count = len(evidence.get("risk_factors", []))
        opportunities_count = len(evidence.get("opportunities", []))

        # Evidence richness score (0-1)
        evidence_richness = min(
            (insights_count + risk_factors_count + opportunities_count) / 12, 1.0
        )

        # Relevance factor
        relevance_score = evidence.get("relevance_score", 0.5)

        # Final evidence strength calculation
        final_strength = credibility_adjusted * (
            0.5 + 0.3 * evidence_richness + 0.2 * relevance_score
        )

        return min(final_strength, 1.0)

    def _update_bayesian_confidence(self, evidence_strength: float) -> float:
        """Update confidence using Bayesian approach."""
        if not self.confidence_history:
            # First evidence - base confidence
            return evidence_strength

        # Current prior
        prior = self.confidence_history[-1]

        # Bayesian update with evidence strength as likelihood
        # Simple approximation: weighted average with decay
        decay_factor = 0.9 ** (self.source_count - 1)  # Diminishing returns
        updated = prior * decay_factor + evidence_strength * (1 - decay_factor)

        # Ensure within bounds
        return max(0.1, min(updated, 0.95))

    def _should_continue_research(self, current_confidence: float) -> bool:
        """Determine if research should continue based on multiple factors."""
        # Always process minimum sources
        if self.source_count < self.min_sources:
            return True

        # Stop at maximum sources
        if self.source_count >= self.max_sources:
            return False

        # High confidence reached
        if current_confidence >= self.target_confidence:
            return False

        # Check for diminishing returns
        if self.source_count - self.last_significant_improvement > 4:
            # No significant improvement in last 4 sources
            return False

        # Check sentiment consensus
        consensus_score = self._calculate_sentiment_consensus()
        if consensus_score > 0.8 and self.source_count >= 5:
            # Strong consensus with adequate sample
            return False

        # Check confidence plateau
        if len(self.confidence_history) >= 3:
            recent_change = abs(current_confidence - self.confidence_history[-3])
            if recent_change < 0.03:  # Less than 3% change in last 3 sources
                return False

        return True

    def _calculate_confidence_trend(self) -> str:
        """Calculate the trend in confidence over recent sources."""
        if len(self.confidence_history) < 3:
            return "insufficient_data"

        recent = self.confidence_history[-3:]

        # Calculate trend
        if recent[-1] > recent[0] + 0.05:
            return "increasing"
        elif recent[-1] < recent[0] - 0.05:
            return "decreasing"
        else:
            return "stable"

    def _calculate_sentiment_consensus(self) -> float:
        """Calculate how much sources agree on sentiment."""
        total_votes = sum(self.sentiment_votes.values())
        if total_votes == 0:
            return 0.0

        # Calculate consensus as max vote share
        max_votes = max(self.sentiment_votes.values())
        consensus = max_votes / total_votes

        return consensus

    def _get_termination_reason(self, current_confidence: float) -> str:
        """Get reason for early termination."""
        if current_confidence >= self.target_confidence:
            return "target_confidence_reached"
        elif self.source_count >= self.max_sources:
            return "max_sources_reached"
        elif self._calculate_sentiment_consensus() > 0.8:
            return "strong_consensus"
        elif self.source_count - self.last_significant_improvement > 4:
            return "diminishing_returns"
        else:
            return "confidence_plateau"


class IntelligentContentFilter:
    """Pre-filters and prioritizes content to reduce LLM processing overhead."""

    def __init__(self):
        self.relevance_keywords = {
            "fundamental": {
                "high": [
                    "earnings",
                    "revenue",
                    "profit",
                    "ebitda",
                    "cash flow",
                    "debt",
                    "valuation",
                ],
                "medium": [
                    "balance sheet",
                    "income statement",
                    "financial",
                    "quarterly",
                    "annual",
                ],
                "context": ["company", "business", "financial results", "guidance"],
            },
            "technical": {
                "high": [
                    "price",
                    "chart",
                    "trend",
                    "support",
                    "resistance",
                    "breakout",
                ],
                "medium": ["volume", "rsi", "macd", "moving average", "pattern"],
                "context": ["technical analysis", "trading", "momentum"],
            },
            "sentiment": {
                "high": ["rating", "upgrade", "downgrade", "buy", "sell", "hold"],
                "medium": ["analyst", "recommendation", "target price", "outlook"],
                "context": ["opinion", "sentiment", "market mood"],
            },
            "competitive": {
                "high": [
                    "market share",
                    "competitor",
                    "competitive advantage",
                    "industry",
                ],
                "medium": ["peer", "comparison", "market position", "sector"],
                "context": ["competitive landscape", "industry analysis"],
            },
        }

        self.domain_credibility_scores = {
            "reuters.com": 0.95,
            "bloomberg.com": 0.95,
            "wsj.com": 0.90,
            "ft.com": 0.90,
            "marketwatch.com": 0.85,
            "cnbc.com": 0.80,
            "yahoo.com": 0.75,
            "seekingalpha.com": 0.80,
            "fool.com": 0.70,
            "investing.com": 0.75,
        }

    async def filter_and_prioritize_sources(
        self,
        sources: list[dict],
        research_focus: str,
        time_budget: float,
        target_source_count: int | None = None,
        current_confidence: float = 0.0,
    ) -> list[dict]:
        """Filter and prioritize sources based on relevance, quality, and time constraints."""
        if not sources:
            return []

        # Determine target count based on time budget and confidence
        if target_source_count is None:
            target_source_count = self._calculate_optimal_source_count(
                time_budget, current_confidence, len(sources)
            )

        # Quick relevance scoring without LLM
        scored_sources = []
        for source in sources:
            relevance_score = self._calculate_relevance_score(source, research_focus)
            credibility_score = self._get_source_credibility(source)
            recency_score = self._calculate_recency_score(source.get("published_date"))

            # Combined score with weights
            combined_score = (
                relevance_score * 0.5 + credibility_score * 0.3 + recency_score * 0.2
            )

            if combined_score > 0.3:  # Relevance threshold
                scored_sources.append((combined_score, source))

        # Sort by combined score
        scored_sources.sort(key=lambda x: x[0], reverse=True)

        # Select diverse sources
        selected_sources = self._select_diverse_sources(
            scored_sources, target_source_count, research_focus
        )

        # Pre-process content for faster LLM processing
        processed_sources = []
        for score, source in selected_sources:
            processed_source = self._preprocess_content(
                source, research_focus, time_budget
            )
            processed_source["relevance_score"] = score
            processed_sources.append(processed_source)

        return processed_sources

    def _calculate_optimal_source_count(
        self, time_budget: float, current_confidence: float, available_sources: int
    ) -> int:
        """Calculate optimal number of sources to process given constraints."""
        # Base count from time budget
        if time_budget < 20:
            base_count = 3
        elif time_budget < 40:
            base_count = 6
        elif time_budget < 80:
            base_count = 10
        else:
            base_count = 15

        # Adjust for confidence level
        if current_confidence > 0.7:
            # High confidence - fewer sources needed
            confidence_multiplier = 0.7
        elif current_confidence < 0.4:
            # Low confidence - more sources helpful
            confidence_multiplier = 1.2
        else:
            confidence_multiplier = 1.0

        # Final calculation
        target_count = int(base_count * confidence_multiplier)

        # Ensure we don't exceed available sources
        return min(target_count, available_sources, 20)  # Cap at 20

    def _calculate_relevance_score(self, source: dict, research_focus: str) -> float:
        """Calculate relevance score using keyword matching and heuristics."""
        content = source.get("content", "").lower()
        title = source.get("title", "").lower()

        if not content and not title:
            return 0.0

        focus_keywords = self.relevance_keywords.get(research_focus, {})

        # High-value keywords
        high_keywords = focus_keywords.get("high", [])
        high_score = sum(1 for keyword in high_keywords if keyword in content) / max(
            len(high_keywords), 1
        )

        # Medium-value keywords
        medium_keywords = focus_keywords.get("medium", [])
        medium_score = sum(
            1 for keyword in medium_keywords if keyword in content
        ) / max(len(medium_keywords), 1)

        # Context keywords
        context_keywords = focus_keywords.get("context", [])
        context_score = sum(
            1 for keyword in context_keywords if keyword in content
        ) / max(len(context_keywords), 1)

        # Title relevance (titles are more focused)
        title_high_score = sum(
            1 for keyword in high_keywords if keyword in title
        ) / max(len(high_keywords), 1)

        # Combine scores with weights
        relevance_score = (
            high_score * 0.4
            + medium_score * 0.25
            + context_score * 0.15
            + title_high_score * 0.2
        )

        # Boost for very relevant titles
        if any(keyword in title for keyword in high_keywords):
            relevance_score *= 1.2

        return min(relevance_score, 1.0)

    def _get_source_credibility(self, source: dict) -> float:
        """Calculate source credibility based on domain and other factors."""
        url = source.get("url", "").lower()

        # Domain-based credibility
        domain_score = 0.5  # Default
        for domain, score in self.domain_credibility_scores.items():
            if domain in url:
                domain_score = score
                break

        # Boost for specific high-quality indicators
        if any(indicator in url for indicator in [".gov", ".edu", "sec.gov"]):
            domain_score = min(domain_score + 0.2, 1.0)

        # Penalty for low-quality indicators
        if any(indicator in url for indicator in ["blog", "forum", "reddit"]):
            domain_score *= 0.8

        return domain_score

    def _calculate_recency_score(self, published_date: str) -> float:
        """Calculate recency score based on publication date."""
        if not published_date:
            return 0.5  # Default for unknown dates

        try:
            # Parse date (handle various formats)
            if "T" in published_date:
                pub_date = datetime.fromisoformat(published_date.replace("Z", "+00:00"))
            else:
                pub_date = datetime.strptime(published_date, "%Y-%m-%d")

            # Calculate days old
            days_old = (datetime.now() - pub_date.replace(tzinfo=None)).days

            # Scoring based on age
            if days_old <= 1:
                return 1.0  # Very recent
            elif days_old <= 7:
                return 0.9  # Recent
            elif days_old <= 30:
                return 0.7  # Fairly recent
            elif days_old <= 90:
                return 0.5  # Moderately old
            else:
                return 0.3  # Old

        except (ValueError, TypeError):
            return 0.5  # Default for unparseable dates

    def _select_diverse_sources(
        self,
        scored_sources: list[tuple[float, dict]],
        target_count: int,
        research_focus: str,
    ) -> list[tuple[float, dict]]:
        """Select diverse sources to avoid redundancy."""
        if len(scored_sources) <= target_count:
            return scored_sources

        selected = []
        used_domains = set()

        # First pass: select high-scoring diverse sources
        for score, source in scored_sources:
            if len(selected) >= target_count:
                break

            url = source.get("url", "")
            domain = self._extract_domain(url)

            # Ensure diversity by domain (max 2 from same domain initially)
            domain_count = sum(
                1 for _, s in selected if self._extract_domain(s.get("url", "")) == domain
            )

            if domain_count < 2 or len(selected) < target_count // 2:
                selected.append((score, source))
                used_domains.add(domain)

        # Second pass: fill remaining slots with best remaining sources
        remaining_needed = target_count - len(selected)
        if remaining_needed > 0:
            remaining_sources = scored_sources[len(selected) :]
            selected.extend(remaining_sources[:remaining_needed])

        return selected[:target_count]

    def _extract_domain(self, url: str) -> str:
        """Extract domain from URL."""
        try:
            if "//" in url:
                domain = url.split("//")[1].split("/")[0]
                return domain.replace("www.", "")
            return url
        except Exception:
            return url

    def _preprocess_content(
        self, source: dict, research_focus: str, time_budget: float
    ) -> dict:
        """Pre-process content to optimize for LLM analysis."""
        content = source.get("content", "")
        if not content:
            return source

        # Determine content length limit based on time budget
        if time_budget < 30:
            max_length = 800  # Emergency mode
        elif time_budget < 60:
            max_length = 1200  # Fast mode
        else:
            max_length = 2000  # Standard mode

        # If content is already short enough, return as-is
        if len(content) <= max_length:
            source_copy = source.copy()
            source_copy["original_length"] = len(content)
            source_copy["filtered"] = False
            return source_copy

        # Extract most relevant sentences/paragraphs
        sentences = re.split(r"[.!?]+", content)

        focus_keywords = self.relevance_keywords.get(research_focus, {})
        all_keywords = (
            focus_keywords.get("high", [])
            + focus_keywords.get("medium", [])
            + focus_keywords.get("context", [])
        )

        # Score sentences by keyword relevance
        scored_sentences = []
        for sentence in sentences:
            if len(sentence.strip()) < 20:  # Skip very short sentences
                continue

            sentence_lower = sentence.lower()
            keyword_count = sum(
                1 for keyword in all_keywords if keyword in sentence_lower
            )

            # Boost for financial numbers and percentages
            has_numbers = bool(re.search(r"\$?[\d,]+\.?\d*[%kmbKMB]?", sentence))
            number_boost = 0.5 if has_numbers else 0

            sentence_score = keyword_count + number_boost
            if sentence_score > 0:
                scored_sentences.append((sentence_score, sentence.strip()))

        # Sort by relevance and select top sentences
        scored_sentences.sort(key=lambda x: x[0], reverse=True)

        # Build filtered content
        filtered_content = ""
        for _score, sentence in scored_sentences:
            if len(filtered_content) + len(sentence) > max_length:
                break
            filtered_content += sentence + ". "

        # If no relevant sentences found, take first part of original content
        if not filtered_content:
            filtered_content = content[:max_length]

        # Create processed source
        source_copy = source.copy()
        source_copy["content"] = filtered_content.strip()
        source_copy["original_length"] = len(content)
        source_copy["filtered_length"] = len(filtered_content)
        source_copy["filtered"] = True
        source_copy["compression_ratio"] = len(filtered_content) / len(content)

        return source_copy


# Export main classes for integration
__all__ = [
    "AdaptiveModelSelector",
    "ProgressiveTokenBudgeter",
    "ParallelLLMProcessor",
    "OptimizedPromptEngine",
    "ConfidenceTracker",
    "IntelligentContentFilter",
    "ModelConfiguration",
    "TokenAllocation",
    "ResearchPhase",
]
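The module is meant to be composed by a research agent rather than used standalone. Below is a minimal usage sketch (not part of the file above) showing one plausible wiring of the content filter, the parallel processor, and the confidence tracker. It assumes an already-configured OpenRouterProvider, source dicts with "content", "title", "url", and "published_date" keys, and an import path of maverick_mcp.agents.llm_optimization for this module; the run_research coroutine and its parameter values are illustrative only.

# Usage sketch only - not part of llm_optimization.py. Assumes `provider` is a
# configured OpenRouterProvider and `sources` holds dicts with "content",
# "title", "url", and "published_date" keys; the module path is an assumption.
from maverick_mcp.agents.llm_optimization import (
    ConfidenceTracker,
    IntelligentContentFilter,
    ParallelLLMProcessor,
)
from maverick_mcp.providers.openrouter_provider import OpenRouterProvider


async def run_research(provider: OpenRouterProvider, sources: list[dict]) -> dict:
    time_budget = 60.0  # seconds available for the content-analysis pass

    # 1. Pre-filter sources so the LLM only sees relevant, trimmed content.
    content_filter = IntelligentContentFilter()
    filtered = await content_filter.filter_and_prioritize_sources(
        sources, research_focus="fundamental", time_budget=time_budget
    )

    # 2. Analyze the surviving sources in parallel batches.
    processor = ParallelLLMProcessor(provider, max_concurrent=5)
    analyzed = await processor.parallel_content_analysis(
        filtered,
        analysis_type="fundamental",
        persona="moderate",
        time_budget_seconds=time_budget,
    )

    # 3. Fold each analysis into the confidence tracker; stop early when allowed.
    tracker = ConfidenceTracker(target_confidence=0.75)
    decision = {"current_confidence": 0.0, "should_continue": True}
    for result in analyzed:
        analysis = result.get("analysis", {})
        decision = tracker.update_confidence(
            analysis, credibility_score=analysis.get("credibility_score", 0.5)
        )
        if not decision["should_continue"]:
            break

    return {"sources": analyzed, "confidence": decision["current_confidence"]}


# A caller would drive this with e.g. asyncio.run(run_research(provider, sources)).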

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.