
MaverickMCP

by wshobson
MIT License
parallel_research.py (20 kB)
""" Parallel Research Execution Utilities This module provides infrastructure for spawning and managing parallel research subagents for comprehensive financial analysis. """ import asyncio import logging import time from collections.abc import Callable from typing import Any from ..agents.circuit_breaker import circuit_breaker from ..config.settings import get_settings from .orchestration_logging import ( get_orchestration_logger, log_agent_execution, log_method_call, log_parallel_execution, log_performance_metrics, log_resource_usage, ) logger = logging.getLogger(__name__) settings = get_settings() class ParallelResearchConfig: """Configuration for parallel research operations.""" def __init__( self, max_concurrent_agents: int = 6, # OPTIMIZATION: Increased from 4 for better parallelism timeout_per_agent: int = 60, # OPTIMIZATION: Reduced from 180s to prevent blocking enable_fallbacks: bool = False, # Disabled by default for speed rate_limit_delay: float = 0.05, # OPTIMIZATION: Minimal delay (50ms) for API rate limits only batch_size: int = 3, # OPTIMIZATION: Batch size for task grouping use_worker_pool: bool = True, # OPTIMIZATION: Enable worker pool pattern ): self.max_concurrent_agents = max_concurrent_agents self.timeout_per_agent = timeout_per_agent self.enable_fallbacks = enable_fallbacks self.rate_limit_delay = rate_limit_delay self.batch_size = batch_size self.use_worker_pool = use_worker_pool class ResearchTask: """Represents a single research task for parallel execution.""" def __init__( self, task_id: str, task_type: str, target_topic: str, focus_areas: list[str], priority: int = 1, timeout: int | None = None, ): self.task_id = task_id self.task_type = task_type # fundamental, technical, sentiment, competitive self.target_topic = target_topic self.focus_areas = focus_areas self.priority = priority self.timeout = timeout self.start_time: float | None = None self.end_time: float | None = None self.status: str = "pending" # pending, running, completed, failed self.result: dict[str, Any] | None = None self.error: str | None = None class ResearchResult: """Aggregated results from parallel research execution.""" def __init__(self): self.task_results: dict[str, ResearchTask] = {} self.synthesis: dict[str, Any] | None = None self.total_execution_time: float = 0.0 self.successful_tasks: int = 0 self.failed_tasks: int = 0 self.parallel_efficiency: float = 0.0 class ParallelResearchOrchestrator: """Orchestrates parallel research agent execution.""" def __init__(self, config: ParallelResearchConfig | None = None): self.config = config or ParallelResearchConfig() self.active_tasks: dict[str, ResearchTask] = {} # OPTIMIZATION: Use bounded semaphore for better control self._semaphore = asyncio.BoundedSemaphore(self.config.max_concurrent_agents) self.orchestration_logger = get_orchestration_logger("ParallelOrchestrator") # Track active workers for better coordination self._active_workers = 0 self._worker_lock = asyncio.Lock() # Log initialization self.orchestration_logger.info( "🎛️ ORCHESTRATOR_INIT", max_agents=self.config.max_concurrent_agents, ) @log_method_call(component="ParallelOrchestrator", include_timing=True) async def execute_parallel_research( self, tasks: list[ResearchTask], research_executor, synthesis_callback: Callable[..., Any] | None = None, ) -> ResearchResult: """ Execute multiple research tasks in parallel with intelligent coordination. 

class ParallelResearchOrchestrator:
    """Orchestrates parallel research agent execution."""

    def __init__(self, config: ParallelResearchConfig | None = None):
        self.config = config or ParallelResearchConfig()
        self.active_tasks: dict[str, ResearchTask] = {}
        # OPTIMIZATION: Use bounded semaphore for better control
        self._semaphore = asyncio.BoundedSemaphore(self.config.max_concurrent_agents)
        self.orchestration_logger = get_orchestration_logger("ParallelOrchestrator")
        # Track active workers for better coordination
        self._active_workers = 0
        self._worker_lock = asyncio.Lock()

        # Log initialization
        self.orchestration_logger.info(
            "🎛️ ORCHESTRATOR_INIT",
            max_agents=self.config.max_concurrent_agents,
        )

    @log_method_call(component="ParallelOrchestrator", include_timing=True)
    async def execute_parallel_research(
        self,
        tasks: list[ResearchTask],
        research_executor,
        synthesis_callback: Callable[..., Any] | None = None,
    ) -> ResearchResult:
        """
        Execute multiple research tasks in parallel with intelligent coordination.

        Args:
            tasks: List of research tasks to execute
            research_executor: Function to execute individual research tasks
            synthesis_callback: Optional function to synthesize results

        Returns:
            ResearchResult with aggregated results and synthesis
        """
        self.orchestration_logger.set_request_context(
            session_id=tasks[0].task_id.split("_")[0] if tasks else "unknown",
            task_count=len(tasks),
        )

        # Log task overview
        self.orchestration_logger.info(
            "📋 TASK_OVERVIEW",
            task_count=len(tasks),
            max_concurrent=self.config.max_concurrent_agents,
        )

        start_time = time.time()

        # Create result container
        result = ResearchResult()

        with log_parallel_execution(
            "ParallelOrchestrator", "research execution", len(tasks)
        ) as exec_logger:
            try:
                # Prepare tasks for execution
                prepared_tasks = await self._prepare_tasks(tasks)
                exec_logger.info(
                    "🔧 TASKS_PREPARED", prepared_count=len(prepared_tasks)
                )

                # OPTIMIZATION: Use create_task for true parallel execution
                # This allows tasks to start immediately without waiting
                exec_logger.info("🚀 PARALLEL_EXECUTION_START")

                # Create all tasks immediately for maximum parallelism
                running_tasks = []
                for task in prepared_tasks:
                    # Create task immediately without awaiting
                    task_future = asyncio.create_task(
                        self._execute_single_task(task, research_executor)
                    )
                    running_tasks.append(task_future)

                    # OPTIMIZATION: Minimal delay only if absolutely needed for API rate limits
                    # Reduced from progressive delays to fixed minimal delay
                    if self.config.rate_limit_delay > 0 and len(running_tasks) < len(
                        prepared_tasks
                    ):
                        await asyncio.sleep(
                            self.config.rate_limit_delay * 0.1
                        )  # 10% of original delay

                # Wait for all tasks to complete using asyncio.as_completed for better responsiveness
                completed_tasks = []
                for task_future in asyncio.as_completed(running_tasks):
                    try:
                        result_task = await task_future
                        completed_tasks.append(result_task)
                    except Exception as e:
                        # Handle exceptions without blocking other tasks
                        completed_tasks.append(e)

                exec_logger.info("🏁 PARALLEL_EXECUTION_COMPLETE")

                # Process results
                result = await self._process_task_results(
                    prepared_tasks, completed_tasks, start_time
                )

                # Log performance metrics
                log_performance_metrics(
                    "ParallelOrchestrator",
                    {
                        "total_tasks": len(tasks),
                        "successful_tasks": result.successful_tasks,
                        "failed_tasks": result.failed_tasks,
                        "parallel_efficiency": result.parallel_efficiency,
                        "total_duration": result.total_execution_time,
                    },
                )

                # Synthesize results if callback provided
                if synthesis_callback and result.successful_tasks > 0:
                    exec_logger.info("🧠 SYNTHESIS_START")
                    try:
                        synthesis_start = time.time()
                        result.synthesis = await synthesis_callback(result.task_results)
                        _ = (
                            time.time() - synthesis_start
                        )  # Track duration but not used currently
                        exec_logger.info("✅ SYNTHESIS_SUCCESS")
                    except Exception as e:
                        exec_logger.error("❌ SYNTHESIS_FAILED", error=str(e))
                        result.synthesis = {"error": f"Synthesis failed: {str(e)}"}
                else:
                    exec_logger.info("⏭️ SYNTHESIS_SKIPPED")

                return result

            except Exception as e:
                exec_logger.error("💥 PARALLEL_EXECUTION_FAILED", error=str(e))
                result.total_execution_time = time.time() - start_time
                return result

    async def _prepare_tasks(self, tasks: list[ResearchTask]) -> list[ResearchTask]:
        """Prepare tasks for execution by setting timeouts and priorities."""
        prepared = []
        for task in sorted(tasks, key=lambda t: t.priority, reverse=True):
            # Set default timeout if not specified
            if not task.timeout:
                task.timeout = self.config.timeout_per_agent

            # Set task to pending status
            task.status = "pending"
            self.active_tasks[task.task_id] = task
            prepared.append(task)

        return prepared[: self.config.max_concurrent_agents]

    @circuit_breaker("parallel_research_task", failure_threshold=2, recovery_timeout=30)
    async def _execute_single_task(
        self, task: ResearchTask, research_executor
    ) -> ResearchTask:
        """Execute a single research task with optimized error handling."""
        # Acquire the bounded semaphore to cap concurrency. Tasks are already
        # created via asyncio.create_task, so awaiting here does not delay the
        # creation of sibling tasks; it only limits how many run at once.
        # (The previous "try_acquire" pattern set acquired=True without ever
        # acquiring when the semaphore was unlocked, so the release in the
        # finally block would raise ValueError on a BoundedSemaphore.)
        acquired = False
        try:
            await self._semaphore.acquire()
            acquired = True

            task.start_time = time.time()
            task.status = "running"

            # Track active worker count
            async with self._worker_lock:
                self._active_workers += 1

            with log_agent_execution(
                task.task_type, task.task_id, task.focus_areas
            ) as agent_logger:
                try:
                    agent_logger.info(
                        "🎯 TASK_EXECUTION_START",
                        timeout=task.timeout,
                        priority=task.priority,
                    )

                    # OPTIMIZATION: Use shield to prevent cancellation during critical work
                    result = await asyncio.shield(
                        asyncio.wait_for(research_executor(task), timeout=task.timeout)
                    )

                    task.result = result
                    task.status = "completed"
                    task.end_time = time.time()

                    # Log successful completion
                    execution_time = task.end_time - task.start_time
                    agent_logger.info(
                        "✨ TASK_EXECUTION_SUCCESS",
                        duration=f"{execution_time:.3f}s",
                    )

                    # Log resource usage if available
                    if isinstance(result, dict) and "metrics" in result:
                        log_resource_usage(
                            f"{task.task_type}Agent",
                            api_calls=result["metrics"].get("api_calls"),
                            cache_hits=result["metrics"].get("cache_hits"),
                        )

                    return task

                except TimeoutError:
                    task.error = f"Task timeout after {task.timeout}s"
                    task.status = "failed"
                    agent_logger.error("⏰ TASK_TIMEOUT", timeout=task.timeout)
                except Exception as e:
                    task.error = str(e)
                    task.status = "failed"
                    agent_logger.error("💥 TASK_EXECUTION_FAILED", error=str(e))
                finally:
                    task.end_time = time.time()
                    # Track active worker count
                    async with self._worker_lock:
                        self._active_workers -= 1

            return task

        finally:
            # Always release semaphore if acquired
            if acquired:
                self._semaphore.release()

    async def _process_task_results(
        self, tasks: list[ResearchTask], completed_tasks: list[Any], start_time: float
    ) -> ResearchResult:
        """Process and aggregate results from completed tasks."""
        result = ResearchResult()
        result.total_execution_time = time.time() - start_time

        for task in tasks:
            result.task_results[task.task_id] = task
            if task.status == "completed":
                result.successful_tasks += 1
            else:
                result.failed_tasks += 1

        # Calculate parallel efficiency
        if result.total_execution_time > 0:
            total_sequential_time = sum(
                (task.end_time or 0) - (task.start_time or 0)
                for task in tasks
                if task.start_time
            )
            result.parallel_efficiency = (
                (total_sequential_time / result.total_execution_time)
                if total_sequential_time > 0
                else 0.0
            )

        logger.info(
            f"Parallel research completed: {result.successful_tasks} successful, "
            f"{result.failed_tasks} failed, {result.parallel_efficiency:.2f}x speedup"
        )

        return result
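
# Worked example of the efficiency metric above (illustrative numbers): if
# four tasks each consume ~30s of agent time (120s sequential-equivalent)
# but complete within a 35s wall-clock window, _process_task_results reports
# parallel_efficiency ≈ 120 / 35 ≈ 3.4x.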

class TaskDistributionEngine:
    """Intelligent task distribution for research topics."""

    TASK_TYPES = {
        "fundamental": {
            "keywords": [
                "earnings",
                "revenue",
                "profit",
                "cash flow",
                "debt",
                "valuation",
            ],
            "focus_areas": ["financials", "fundamentals", "earnings", "balance_sheet"],
        },
        "technical": {
            "keywords": [
                "price",
                "chart",
                "trend",
                "support",
                "resistance",
                "momentum",
            ],
            "focus_areas": ["technical_analysis", "chart_patterns", "indicators"],
        },
        "sentiment": {
            "keywords": [
                "sentiment",
                "news",
                "analyst",
                "opinion",
                "rating",
                "recommendation",
            ],
            "focus_areas": ["market_sentiment", "analyst_ratings", "news_sentiment"],
        },
        "competitive": {
            "keywords": [
                "competitor",
                "market share",
                "industry",
                "competitive",
                "peers",
            ],
            "focus_areas": [
                "competitive_analysis",
                "industry_analysis",
                "market_position",
            ],
        },
    }
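
    # Note on matching (illustrative): keywords are compared by substring
    # against the lower-cased topic in _analyze_topic_relevance, so a topic
    # like "Free Cash Flow trends" hits the "cash flow" keyword, and each
    # created task inherits its task type's focus_areas list verbatim.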
"recommendation", ], "focus_areas": ["market_sentiment", "analyst_ratings", "news_sentiment"], }, "competitive": { "keywords": [ "competitor", "market share", "industry", "competitive", "peers", ], "focus_areas": [ "competitive_analysis", "industry_analysis", "market_position", ], }, } @log_method_call(component="TaskDistributionEngine", include_timing=True) def distribute_research_tasks( self, topic: str, session_id: str, focus_areas: list[str] | None = None ) -> list[ResearchTask]: """ Intelligently distribute a research topic into specialized tasks. Args: topic: Main research topic session_id: Session identifier for tracking focus_areas: Optional specific areas to focus on Returns: List of specialized research tasks """ distribution_logger = get_orchestration_logger("TaskDistributionEngine") distribution_logger.set_request_context(session_id=session_id) distribution_logger.info( "🎯 TASK_DISTRIBUTION_START", session_id=session_id, ) tasks = [] topic_lower = topic.lower() # Determine which task types are relevant relevant_types = self._analyze_topic_relevance(topic_lower, focus_areas) # Log relevance analysis results distribution_logger.info("🧠 RELEVANCE_ANALYSIS") # Create tasks for relevant types created_tasks = [] for task_type, score in relevant_types.items(): if score > 0.3: # Relevance threshold task = ResearchTask( task_id=f"{session_id}_{task_type}", task_type=task_type, target_topic=topic, focus_areas=self.TASK_TYPES[task_type]["focus_areas"], priority=int(score * 10), # Convert to 1-10 priority ) tasks.append(task) created_tasks.append( { "type": task_type, "priority": task.priority, "score": score, "focus_areas": task.focus_areas[:3], # Limit for logging } ) # Log created tasks if created_tasks: distribution_logger.info( "✅ TASKS_CREATED", task_count=len(created_tasks), ) # Ensure at least one task (fallback to fundamental analysis) if not tasks: distribution_logger.warning( "⚠️ NO_RELEVANT_TASKS_FOUND - using fallback", threshold=0.3, max_score=max(relevant_types.values()) if relevant_types else 0, ) fallback_task = ResearchTask( task_id=f"{session_id}_fundamental", task_type="fundamental", target_topic=topic, focus_areas=["general_analysis"], priority=5, ) tasks.append(fallback_task) distribution_logger.info( "🔄 FALLBACK_TASK_CREATED", task_type="fundamental" ) # Final summary task_summary = { "total_tasks": len(tasks), "task_types": [t.task_type for t in tasks], "avg_priority": sum(t.priority for t in tasks) / len(tasks) if tasks else 0, } distribution_logger.info("🎉 TASK_DISTRIBUTION_COMPLETE", **task_summary) return tasks def _analyze_topic_relevance( self, topic: str, focus_areas: list[str] | None = None ) -> dict[str, float]: """Analyze topic relevance to different research types.""" relevance_scores = {} for task_type, config in self.TASK_TYPES.items(): score = 0.0 # Score based on keywords in topic keyword_matches = sum( 1 for keyword in config["keywords"] if keyword in topic ) score += keyword_matches / len(config["keywords"]) * 0.6 # Score based on focus areas if focus_areas: focus_matches = sum( 1 for focus in focus_areas if any(area in focus.lower() for area in config["focus_areas"]) ) score += focus_matches / len(config["focus_areas"]) * 0.4 else: # Default relevance for common research types score += { "fundamental": 0.8, "sentiment": 0.6, "technical": 0.4, "competitive": 0.5, }.get(task_type, 0.3) relevance_scores[task_type] = min(score, 1.0) return relevance_scores # Export key classes for easy import __all__ = [ "ParallelResearchConfig", "ResearchTask", 
"ResearchResult", "ParallelResearchOrchestrator", "TaskDistributionEngine", ]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.