Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
supervisor.py34.5 kB
""" SupervisorAgent implementation using 2025 LangGraph patterns. Orchestrates multiple specialized agents with intelligent routing, result synthesis, and conflict resolution for comprehensive financial analysis. """ import logging from datetime import datetime from typing import Any from langchain_core.language_models import BaseChatModel from langchain_core.messages import HumanMessage, SystemMessage from langchain_core.tools import BaseTool from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import END, START, StateGraph from langgraph.types import Command from maverick_mcp.agents.base import INVESTOR_PERSONAS, PersonaAwareAgent from maverick_mcp.config.settings import get_settings from maverick_mcp.exceptions import AgentInitializationError from maverick_mcp.memory.stores import ConversationStore from maverick_mcp.workflows.state import SupervisorState logger = logging.getLogger(__name__) settings = get_settings() # Query routing matrix for intelligent agent selection ROUTING_MATRIX = { "market_screening": { "agents": ["market"], "primary": "market", "parallel": False, "confidence_threshold": 0.7, "synthesis_required": False, }, "technical_analysis": { "agents": ["technical"], "primary": "technical", "parallel": False, "confidence_threshold": 0.8, "synthesis_required": False, }, "stock_investment_decision": { "agents": ["market", "technical"], "primary": "technical", "parallel": True, "confidence_threshold": 0.85, "synthesis_required": True, }, "portfolio_analysis": { "agents": ["market", "technical"], "primary": "market", "parallel": True, "confidence_threshold": 0.75, "synthesis_required": True, }, "deep_research": { "agents": ["research"], # Research agent handles comprehensive analysis "primary": "research", "parallel": False, "confidence_threshold": 0.9, "synthesis_required": False, # Research agent provides complete analysis }, "company_research": { "agents": ["research"], # Dedicated company research "primary": "research", "parallel": False, "confidence_threshold": 0.85, "synthesis_required": False, }, "sentiment_analysis": { "agents": ["research"], # Market sentiment analysis "primary": "research", "parallel": False, "confidence_threshold": 0.8, "synthesis_required": False, }, "risk_assessment": { "agents": ["market", "technical"], # Future risk agent integration "primary": "market", "parallel": True, "confidence_threshold": 0.8, "synthesis_required": True, }, } class QueryClassifier: """LLM-powered query classification with rule-based fallback.""" def __init__(self, llm: BaseChatModel): self.llm = llm async def classify_query(self, query: str, persona: str) -> dict[str, Any]: """Classify query using LLM with structured output.""" classification_prompt = f""" Analyze this financial query and classify it for multi-agent routing. Query: "{query}" Investor Persona: {persona} Classify into one of these categories: 1. market_screening - Finding stocks, sector analysis, market breadth 2. technical_analysis - Chart patterns, indicators, entry/exit points 3. stock_investment_decision - Complete analysis of specific stock(s) 4. portfolio_analysis - Portfolio optimization, risk assessment 5. deep_research - Fundamental analysis, company research, news analysis 6. risk_assessment - Position sizing, risk management, portfolio risk Consider the complexity and return classification with confidence. Return ONLY valid JSON in this exact format: {{ "category": "category_name", "confidence": 0.85, "required_agents": ["agent1", "agent2"], "complexity": "simple", "estimated_execution_time_ms": 30000, "parallel_capable": true, "reasoning": "Brief explanation of classification" }} """ try: response = await self.llm.ainvoke( [ SystemMessage( content="You are a financial query classifier. Return only valid JSON." ), HumanMessage(content=classification_prompt), ] ) # Parse LLM response import json classification = json.loads(response.content.strip()) # Validate and enhance with routing matrix category = classification.get("category", "stock_investment_decision") routing_config = ROUTING_MATRIX.get( category, ROUTING_MATRIX["stock_investment_decision"] ) return { **classification, "routing_config": routing_config, "timestamp": datetime.now(), } except Exception as e: logger.warning(f"LLM classification failed: {e}, using rule-based fallback") return self._rule_based_fallback(query, persona) def _rule_based_fallback(self, query: str, persona: str) -> dict[str, Any]: """Rule-based classification fallback.""" query_lower = query.lower() # Simple keyword-based classification if any( word in query_lower for word in ["screen", "find stocks", "scan", "search"] ): category = "market_screening" elif any( word in query_lower for word in ["chart", "technical", "rsi", "macd", "pattern"] ): category = "technical_analysis" elif any( word in query_lower for word in ["portfolio", "allocation", "diversif"] ): category = "portfolio_analysis" elif any( word in query_lower for word in ["research", "fundamental", "news", "earnings"] ): category = "deep_research" elif any( word in query_lower for word in ["company", "business", "competitive", "industry"] ): category = "company_research" elif any( word in query_lower for word in ["sentiment", "opinion", "mood", "feeling"] ): category = "sentiment_analysis" elif any( word in query_lower for word in ["risk", "position size", "stop loss"] ): category = "risk_assessment" else: category = "stock_investment_decision" routing_config = ROUTING_MATRIX[category] return { "category": category, "confidence": 0.6, "required_agents": routing_config["agents"], "complexity": "moderate", "estimated_execution_time_ms": 60000, "parallel_capable": routing_config["parallel"], "reasoning": "Rule-based classification fallback", "routing_config": routing_config, "timestamp": datetime.now(), } class ResultSynthesizer: """Synthesize results from multiple agents with conflict resolution.""" def __init__(self, llm: BaseChatModel, persona): self.llm = llm self.persona = persona async def synthesize_results( self, agent_results: dict[str, Any], query_type: str, conflicts: list[dict[str, Any]], ) -> dict[str, Any]: """Synthesize final recommendation from agent results.""" # Calculate agent weights based on query type and persona weights = self._calculate_agent_weights(query_type, agent_results) # Create synthesis prompt synthesis_prompt = self._build_synthesis_prompt( agent_results, weights, query_type, conflicts ) # Use LLM to synthesize coherent response synthesis_response = await self.llm.ainvoke( [ SystemMessage(content="You are a financial analysis synthesizer."), HumanMessage(content=synthesis_prompt), ] ) return { "synthesis": synthesis_response.content, "weights_applied": weights, "conflicts_resolved": len(conflicts), "confidence_score": self._calculate_overall_confidence( agent_results, weights ), "contributing_agents": list(agent_results.keys()), "persona_alignment": self._assess_persona_alignment( synthesis_response.content ), } def _calculate_agent_weights( self, query_type: str, agent_results: dict ) -> dict[str, float]: """Calculate weights for agent results based on context.""" base_weights = { "market_screening": {"market": 0.9, "technical": 0.1}, "technical_analysis": {"market": 0.2, "technical": 0.8}, "stock_investment_decision": {"market": 0.4, "technical": 0.6}, "portfolio_analysis": {"market": 0.6, "technical": 0.4}, "deep_research": {"research": 1.0}, "company_research": {"research": 1.0}, "sentiment_analysis": {"research": 1.0}, "risk_assessment": {"market": 0.3, "technical": 0.3, "risk": 0.4}, } weights = base_weights.get(query_type, {"market": 0.5, "technical": 0.5}) # Adjust weights based on agent confidence scores for agent, base_weight in weights.items(): if agent in agent_results: confidence = agent_results[agent].get("confidence_score", 0.5) weights[agent] = base_weight * (0.5 + confidence * 0.5) # Normalize weights to sum to 1.0 total_weight = sum(weights.values()) if total_weight > 0: weights = {k: v / total_weight for k, v in weights.items()} return weights def _build_synthesis_prompt( self, agent_results: dict[str, Any], weights: dict[str, float], query_type: str, conflicts: list[dict[str, Any]], ) -> str: """Build synthesis prompt for LLM.""" prompt = f""" Synthesize a comprehensive financial analysis response from multiple specialized agents. Query Type: {query_type} Investor Persona: {self.persona.name} - {", ".join(self.persona.characteristics)} Agent Results: """ for agent, result in agent_results.items(): weight = weights.get(agent, 0.0) prompt += f"\n{agent.upper()} Agent (Weight: {weight:.2f}):\n" prompt += f" - Confidence: {result.get('confidence_score', 0.5)}\n" prompt += ( f" - Analysis: {result.get('analysis', 'No analysis provided')}\n" ) if "recommendations" in result: prompt += f" - Recommendations: {result['recommendations']}\n" if conflicts: prompt += f"\nConflicts Detected ({len(conflicts)}):\n" for i, conflict in enumerate(conflicts, 1): prompt += f"{i}. {conflict}\n" prompt += f""" Please synthesize these results into a coherent, actionable response that: 1. Weighs agent inputs according to their weights and confidence scores 2. Resolves any conflicts using the {self.persona.name} investor perspective 3. Provides clear, actionable recommendations aligned with {self.persona.name} characteristics 4. Includes appropriate risk disclaimers 5. Maintains professional, confident tone Focus on actionable insights for the {self.persona.name} investor profile. """ return prompt def _calculate_overall_confidence( self, agent_results: dict, weights: dict[str, float] ) -> float: """Calculate weighted overall confidence score.""" total_confidence = 0.0 total_weight = 0.0 for agent, weight in weights.items(): if agent in agent_results: confidence = agent_results[agent].get("confidence_score", 0.5) total_confidence += confidence * weight total_weight += weight return total_confidence / total_weight if total_weight > 0 else 0.5 def _assess_persona_alignment(self, synthesis_content: str) -> float: """Assess how well synthesis aligns with investor persona.""" # Simple keyword-based alignment scoring persona_keywords = { "conservative": ["stable", "dividend", "low-risk", "preservation"], "moderate": ["balanced", "diversified", "moderate", "growth"], "aggressive": ["growth", "momentum", "high-return", "opportunity"], } keywords = persona_keywords.get(self.persona.name.lower(), []) content_lower = synthesis_content.lower() alignment_score = sum(1 for keyword in keywords if keyword in content_lower) return min(alignment_score / len(keywords) if keywords else 0.5, 1.0) class SupervisorAgent(PersonaAwareAgent): """ Multi-agent supervisor using 2025 LangGraph patterns. Orchestrates MarketAnalysisAgent, TechnicalAnalysisAgent, and future DeepResearchAgent with intelligent routing, result synthesis, and conflict resolution. """ def __init__( self, llm: BaseChatModel, agents: dict[str, PersonaAwareAgent], persona: str = "moderate", checkpointer: MemorySaver | None = None, ttl_hours: int = 1, routing_strategy: str = "llm_powered", synthesis_mode: str = "weighted", conflict_resolution: str = "confidence_based", max_iterations: int = 5, ): """Initialize supervisor with existing agent instances.""" if not agents: raise AgentInitializationError( agent_type="SupervisorAgent", reason="No agents provided for supervision", ) # Store agent references self.agents = agents self.market_agent = agents.get("market") self.technical_agent = agents.get("technical") self.research_agent = agents.get("research") # DeepResearchAgent integration # Configuration self.routing_strategy = routing_strategy self.synthesis_mode = synthesis_mode self.conflict_resolution = conflict_resolution self.max_iterations = max_iterations # Ensure all agents use the same persona persona_obj = INVESTOR_PERSONAS.get(persona, INVESTOR_PERSONAS["moderate"]) for agent in agents.values(): if hasattr(agent, "persona"): agent.persona = persona_obj # Get supervisor-specific tools supervisor_tools = self._get_supervisor_tools() # Initialize base class super().__init__( llm=llm, tools=supervisor_tools, persona=persona, checkpointer=checkpointer or MemorySaver(), ttl_hours=ttl_hours, ) # Initialize components self.conversation_store = ConversationStore(ttl_hours=ttl_hours) self.query_classifier = QueryClassifier(llm) self.result_synthesizer = ResultSynthesizer(llm, self.persona) logger.info( f"SupervisorAgent initialized with {len(agents)} agents: {list(agents.keys())}" ) def get_state_schema(self) -> type: """Return SupervisorState schema.""" return SupervisorState def _get_supervisor_tools(self) -> list[BaseTool]: """Get tools specific to supervision and coordination.""" from langchain_core.tools import tool tools = [] if self.market_agent: @tool async def query_market_agent( query: str, session_id: str, screening_strategy: str = "momentum", max_results: int = 20, ) -> dict[str, Any]: """Query the market analysis agent for stock screening and market analysis.""" try: return await self.market_agent.analyze_market( query=query, session_id=session_id, screening_strategy=screening_strategy, max_results=max_results, ) except Exception as e: return {"error": f"Market agent error: {str(e)}"} tools.append(query_market_agent) if self.technical_agent: @tool async def query_technical_agent( symbol: str, timeframe: str = "1d", indicators: list[str] | None = None ) -> dict[str, Any]: """Query the technical analysis agent for chart analysis and indicators.""" try: if indicators is None: indicators = ["sma_20", "rsi", "macd"] return await self.technical_agent.analyze_stock( symbol=symbol, timeframe=timeframe, indicators=indicators ) except Exception as e: return {"error": f"Technical agent error: {str(e)}"} tools.append(query_technical_agent) if self.research_agent: @tool async def query_research_agent( query: str, session_id: str, research_scope: str = "comprehensive", max_sources: int = 50, timeframe: str = "1m", ) -> dict[str, Any]: """Query the deep research agent for comprehensive research and analysis.""" try: return await self.research_agent.research_topic( query=query, session_id=session_id, research_scope=research_scope, max_sources=max_sources, timeframe=timeframe, ) except Exception as e: return {"error": f"Research agent error: {str(e)}"} @tool async def analyze_company_research( symbol: str, session_id: str, include_competitive: bool = True ) -> dict[str, Any]: """Perform comprehensive company research and fundamental analysis.""" try: return await self.research_agent.research_company_comprehensive( symbol=symbol, session_id=session_id, include_competitive_analysis=include_competitive, ) except Exception as e: return {"error": f"Company research error: {str(e)}"} @tool async def analyze_market_sentiment_research( topic: str, session_id: str, timeframe: str = "1w" ) -> dict[str, Any]: """Analyze market sentiment using deep research capabilities.""" try: return await self.research_agent.analyze_market_sentiment( topic=topic, session_id=session_id, timeframe=timeframe ) except Exception as e: return {"error": f"Sentiment analysis error: {str(e)}"} tools.extend( [ query_research_agent, analyze_company_research, analyze_market_sentiment_research, ] ) return tools def _build_graph(self): """Build supervisor graph with multi-agent coordination.""" workflow = StateGraph(SupervisorState) # Core supervisor nodes workflow.add_node("analyze_query", self._analyze_query) workflow.add_node("create_execution_plan", self._create_execution_plan) workflow.add_node("route_to_agents", self._route_to_agents) workflow.add_node("aggregate_results", self._aggregate_results) workflow.add_node("resolve_conflicts", self._resolve_conflicts) workflow.add_node("synthesize_response", self._synthesize_response) # Agent invocation nodes if self.market_agent: workflow.add_node("invoke_market_agent", self._invoke_market_agent) if self.technical_agent: workflow.add_node("invoke_technical_agent", self._invoke_technical_agent) if self.research_agent: workflow.add_node("invoke_research_agent", self._invoke_research_agent) # Coordination nodes workflow.add_node("parallel_coordinator", self._parallel_coordinator) # Tool node if self.tools: from langgraph.prebuilt import ToolNode tool_node = ToolNode(self.tools) workflow.add_node("tools", tool_node) # Define workflow edges workflow.add_edge(START, "analyze_query") workflow.add_edge("analyze_query", "create_execution_plan") workflow.add_edge("create_execution_plan", "route_to_agents") # Conditional routing based on execution plan workflow.add_conditional_edges( "route_to_agents", self._route_decision, { "market_only": "invoke_market_agent" if self.market_agent else "synthesize_response", "technical_only": "invoke_technical_agent" if self.technical_agent else "synthesize_response", "research_only": "invoke_research_agent" if self.research_agent else "synthesize_response", "parallel_execution": "parallel_coordinator", "use_tools": "tools" if self.tools else "synthesize_response", "synthesize": "synthesize_response", }, ) # Agent result collection if self.market_agent: workflow.add_edge("invoke_market_agent", "aggregate_results") if self.technical_agent: workflow.add_edge("invoke_technical_agent", "aggregate_results") if self.research_agent: workflow.add_edge("invoke_research_agent", "aggregate_results") workflow.add_edge("parallel_coordinator", "aggregate_results") if self.tools: workflow.add_edge("tools", "aggregate_results") # Conflict detection and resolution workflow.add_conditional_edges( "aggregate_results", self._check_conflicts, {"resolve": "resolve_conflicts", "synthesize": "synthesize_response"}, ) workflow.add_edge("resolve_conflicts", "synthesize_response") workflow.add_edge("synthesize_response", END) return workflow.compile(checkpointer=self.checkpointer) # Workflow node implementations will continue... # (The rest of the implementation follows the same pattern) async def coordinate_agents( self, query: str, session_id: str, **kwargs ) -> dict[str, Any]: """ Main entry point for multi-agent coordination. Args: query: User query requiring multiple agents session_id: Session identifier **kwargs: Additional parameters Returns: Coordinated response from multiple agents """ start_time = datetime.now() # Initialize supervisor state initial_state = { "messages": [HumanMessage(content=query)], "persona": self.persona.name, "session_id": session_id, "timestamp": datetime.now(), "query_classification": {}, "execution_plan": [], "current_subtask_index": 0, "routing_strategy": self.routing_strategy, "active_agents": [], "agent_results": {}, "agent_confidence": {}, "agent_execution_times": {}, "agent_errors": {}, "workflow_status": "planning", "parallel_execution": False, "dependency_graph": {}, "max_iterations": self.max_iterations, "current_iteration": 0, "conflicts_detected": [], "conflict_resolution": {}, "synthesis_weights": {}, "final_recommendation_confidence": 0.0, "synthesis_mode": self.synthesis_mode, "total_execution_time_ms": 0.0, "agent_coordination_overhead_ms": 0.0, "synthesis_time_ms": 0.0, "cache_utilization": {}, "api_calls_made": 0, "cache_hits": 0, "cache_misses": 0, # Legacy fields initialized as None for backward compatibility "query_type": None, "subtasks": None, "current_subtask": None, "workflow_plan": None, "completed_steps": None, "pending_steps": None, "final_recommendations": None, "confidence_scores": None, "risk_warnings": None, } # Add any additional parameters initial_state.update(kwargs) # Execute supervision workflow try: result = await self.graph.ainvoke( initial_state, config={ "configurable": { "thread_id": session_id, "checkpoint_ns": "supervisor", } }, ) # Calculate total execution time execution_time = (datetime.now() - start_time).total_seconds() * 1000 result["total_execution_time_ms"] = execution_time return self._format_supervisor_response(result) except Exception as e: logger.error(f"Error in supervisor coordination: {e}") return { "status": "error", "error": str(e), "total_execution_time_ms": (datetime.now() - start_time).total_seconds() * 1000, "agent_type": "supervisor", } def _format_supervisor_response(self, result: dict[str, Any]) -> dict[str, Any]: """Format supervisor response for consistent output.""" return { "status": "success", "agent_type": "supervisor", "persona": result.get("persona"), "query_classification": result.get("query_classification", {}), "agents_used": result.get("active_agents", []), "synthesis": result.get("messages", [])[-1].content if result.get("messages") else "No synthesis available", "confidence_score": result.get("final_recommendation_confidence", 0.0), "execution_time_ms": result.get("total_execution_time_ms", 0.0), "conflicts_resolved": len(result.get("conflicts_detected", [])), "workflow_status": result.get("workflow_status", "completed"), } # Placeholder implementations for workflow nodes # These will be implemented based on the specific node logic async def _analyze_query(self, state: SupervisorState) -> Command: """Analyze query to determine routing strategy and requirements.""" query = state["messages"][-1].content if state["messages"] else "" # Classify the query classification = await self.query_classifier.classify_query( query, state["persona"] ) return Command( goto="create_execution_plan", update={ "query_classification": classification, "workflow_status": "analyzing", }, ) async def _create_execution_plan(self, state: SupervisorState) -> Command: """Create execution plan based on query classification.""" classification = state["query_classification"] # Create execution plan based on classification execution_plan = [ { "task_id": "main_analysis", "agents": classification.get("required_agents", ["market"]), "parallel": classification.get("parallel_capable", False), "priority": 1, } ] return Command( goto="route_to_agents", update={"execution_plan": execution_plan, "workflow_status": "planning"}, ) async def _route_to_agents(self, state: SupervisorState) -> Command: """Route query to appropriate agents based on execution plan.""" return Command( goto="parallel_execution", update={"workflow_status": "executing"} ) async def _route_decision(self, state: SupervisorState) -> str: """Decide routing strategy based on state.""" classification = state.get("query_classification", {}) required_agents = classification.get("required_agents", ["market"]) parallel = classification.get("parallel_capable", False) if len(required_agents) == 1: agent = required_agents[0] if agent == "market" and self.market_agent: return "market_only" elif agent == "technical" and self.technical_agent: return "technical_only" elif agent == "research" and self.research_agent: return "research_only" elif len(required_agents) > 1 and parallel: return "parallel_execution" return "synthesize" async def _parallel_coordinator(self, state: SupervisorState) -> Command: """Coordinate parallel execution of multiple agents.""" # This would implement parallel agent coordination # For now, return to aggregation return Command( goto="aggregate_results", update={"workflow_status": "aggregating"} ) async def _invoke_market_agent(self, state: SupervisorState) -> Command: """Invoke market analysis agent.""" if not self.market_agent: return Command( goto="aggregate_results", update={"agent_errors": {"market": "Market agent not available"}}, ) try: query = state["messages"][-1].content if state["messages"] else "" result = await self.market_agent.analyze_market( query=query, session_id=state["session_id"] ) return Command( goto="aggregate_results", update={ "agent_results": {"market": result}, "active_agents": ["market"], }, ) except Exception as e: return Command( goto="aggregate_results", update={ "agent_errors": {"market": str(e)}, "active_agents": ["market"], }, ) async def _invoke_technical_agent(self, state: SupervisorState) -> Command: """Invoke technical analysis agent.""" if not self.technical_agent: return Command( goto="aggregate_results", update={"agent_errors": {"technical": "Technical agent not available"}}, ) # This would implement technical agent invocation return Command( goto="aggregate_results", update={"active_agents": ["technical"]} ) async def _invoke_research_agent(self, state: SupervisorState) -> Command: """Invoke deep research agent (future implementation).""" if not self.research_agent: return Command( goto="aggregate_results", update={"agent_errors": {"research": "Research agent not available"}}, ) # Future implementation return Command(goto="aggregate_results", update={"active_agents": ["research"]}) async def _aggregate_results(self, state: SupervisorState) -> Command: """Aggregate results from all agents.""" return Command( goto="synthesize_response", update={"workflow_status": "synthesizing"} ) def _check_conflicts(self, state: SupervisorState) -> str: """Check if there are conflicts between agent results.""" conflicts = state.get("conflicts_detected", []) return "resolve" if conflicts else "synthesize" async def _resolve_conflicts(self, state: SupervisorState) -> Command: """Resolve conflicts between agent recommendations.""" return Command( goto="synthesize_response", update={"conflict_resolution": {"strategy": "confidence_based"}}, ) async def _synthesize_response(self, state: SupervisorState) -> Command: """Synthesize final response from agent results.""" agent_results = state.get("agent_results", {}) conflicts = state.get("conflicts_detected", []) classification = state.get("query_classification", {}) if agent_results: synthesis = await self.result_synthesizer.synthesize_results( agent_results=agent_results, query_type=classification.get("category", "stock_investment_decision"), conflicts=conflicts, ) return Command( goto="__end__", update={ "final_recommendation_confidence": synthesis["confidence_score"], "synthesis_weights": synthesis["weights_applied"], "workflow_status": "completed", "messages": state["messages"] + [HumanMessage(content=synthesis["synthesis"])], }, ) else: return Command( goto="__end__", update={ "workflow_status": "completed", "messages": state["messages"] + [ HumanMessage(content="No agent results available for synthesis") ], }, )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server