
MaverickMCP

by wshobson
MIT License
agents.py (19.3 kB)
""" Agent router for LangGraph-based financial analysis agents. This router exposes the LangGraph agents as MCP tools while maintaining compatibility with the existing infrastructure. """ import logging import os from typing import Any from fastmcp import FastMCP from maverick_mcp.agents.deep_research import DeepResearchAgent from maverick_mcp.agents.market_analysis import MarketAnalysisAgent from maverick_mcp.agents.supervisor import SupervisorAgent logger = logging.getLogger(__name__) # Create the agents router agents_router: FastMCP = FastMCP("Financial_Analysis_Agents") # Cache for agent instances to avoid recreation _agent_cache: dict[str, Any] = {} def get_or_create_agent(agent_type: str, persona: str = "moderate") -> Any: """Get or create an agent instance with caching.""" cache_key = f"{agent_type}:{persona}" if cache_key not in _agent_cache: # Import task-aware LLM factory from maverick_mcp.providers.llm_factory import get_llm from maverick_mcp.providers.openrouter_provider import TaskType # Map agent types to task types for optimal model selection task_mapping = { "market": TaskType.MARKET_ANALYSIS, "technical": TaskType.TECHNICAL_ANALYSIS, "supervisor": TaskType.MULTI_AGENT_ORCHESTRATION, "deep_research": TaskType.DEEP_RESEARCH, } task_type = task_mapping.get(agent_type, TaskType.GENERAL) # Get optimized LLM for this task llm = get_llm(task_type=task_type) # Create agent based on type if agent_type == "market": _agent_cache[cache_key] = MarketAnalysisAgent( llm=llm, persona=persona, ttl_hours=1 ) elif agent_type == "supervisor": # Create mock agents for supervisor agents = { "market": get_or_create_agent("market", persona), "technical": None, # Would be actual technical agent in full implementation } _agent_cache[cache_key] = SupervisorAgent( llm=llm, agents=agents, persona=persona, ttl_hours=1 ) elif agent_type == "deep_research": # Get web search API keys from environment exa_api_key = os.getenv("EXA_API_KEY") agent = DeepResearchAgent( llm=llm, persona=persona, ttl_hours=1, exa_api_key=exa_api_key, ) # Mark for initialization - will be initialized on first use agent._needs_initialization = True _agent_cache[cache_key] = agent else: raise ValueError(f"Unknown agent type: {agent_type}") return _agent_cache[cache_key] async def analyze_market_with_agent( query: str, persona: str = "moderate", screening_strategy: str = "momentum", max_results: int = 20, session_id: str | None = None, ) -> dict[str, Any]: """ Analyze market using LangGraph agent with persona-aware recommendations. This tool uses advanced AI agents that adapt their analysis based on investor risk profiles (conservative, moderate, aggressive). 

    Args:
        query: Market analysis query (e.g., "Find top momentum stocks")
        persona: Investor persona (conservative, moderate, aggressive)
        screening_strategy: Strategy to use (momentum, maverick, supply_demand_breakout)
        max_results: Maximum number of results
        session_id: Optional session ID for conversation continuity

    Returns:
        Persona-adjusted market analysis with recommendations
    """
    try:
        # Generate session ID if not provided
        if not session_id:
            import uuid

            session_id = str(uuid.uuid4())

        # Get or create agent
        agent = get_or_create_agent("market", persona)

        # Run analysis
        result = await agent.analyze_market(
            query=query,
            session_id=session_id,
            screening_strategy=screening_strategy,
            max_results=max_results,
        )

        return {
            "status": "success",
            "agent_type": "market_analysis",
            "persona": persona,
            "session_id": session_id,
            **result,
        }

    except Exception as e:
        logger.error(f"Error in market agent analysis: {str(e)}")
        return {"status": "error", "error": str(e), "agent_type": "market_analysis"}


async def get_agent_streaming_analysis(
    query: str,
    persona: str = "moderate",
    stream_mode: str = "updates",
    session_id: str | None = None,
) -> dict[str, Any]:
    """
    Get streaming market analysis with real-time updates.

    This demonstrates LangGraph's streaming capabilities. In a real
    implementation, this would return a streaming response.

    Args:
        query: Analysis query
        persona: Investor persona
        stream_mode: Streaming mode (updates, values, messages)
        session_id: Optional session ID

    Returns:
        Streaming configuration and initial results
    """
    try:
        if not session_id:
            import uuid

            session_id = str(uuid.uuid4())

        agent = get_or_create_agent("market", persona)

        # For MCP compatibility, we'll collect streamed results
        # In a real implementation, this would be a streaming endpoint
        updates = []
        async for chunk in agent.stream_analysis(
            query=query, session_id=session_id, stream_mode=stream_mode
        ):
            updates.append(chunk)
            # Limit collected updates for demo
            if len(updates) >= 5:
                break

        return {
            "status": "success",
            "stream_mode": stream_mode,
            "persona": persona,
            "session_id": session_id,
            "updates_collected": len(updates),
            "sample_updates": updates[:3],
            "note": "Full streaming requires WebSocket or SSE endpoint",
        }

    except Exception as e:
        logger.error(f"Error in streaming analysis: {str(e)}")
        return {"status": "error", "error": str(e)}


async def orchestrated_analysis(
    query: str,
    persona: str = "moderate",
    routing_strategy: str = "llm_powered",
    max_agents: int = 3,
    parallel_execution: bool = True,
    session_id: str | None = None,
) -> dict[str, Any]:
    """
    Run orchestrated multi-agent analysis using the SupervisorAgent.

    This tool coordinates multiple specialized agents to provide comprehensive
    financial analysis. The supervisor intelligently routes queries to
    appropriate agents and synthesizes their results.

    Args:
        query: Financial analysis query
        persona: Investor persona (conservative, moderate, aggressive, day_trader)
        routing_strategy: How to route tasks (llm_powered, rule_based, hybrid)
        max_agents: Maximum number of agents to use
        parallel_execution: Whether to run agents in parallel
        session_id: Optional session ID for conversation continuity

    Returns:
        Orchestrated analysis with synthesized recommendations
    """
    try:
        if not session_id:
            import uuid

            session_id = str(uuid.uuid4())

        # Get supervisor agent
        supervisor = get_or_create_agent("supervisor", persona)

        # Run orchestrated analysis
        result = await supervisor.coordinate_agents(
            query=query,
            session_id=session_id,
            routing_strategy=routing_strategy,
            max_agents=max_agents,
            parallel_execution=parallel_execution,
        )

        return {
            "status": "success",
            "agent_type": "supervisor_orchestrated",
            "persona": persona,
            "session_id": session_id,
            "routing_strategy": routing_strategy,
            "agents_used": result.get("agents_used", []),
            "execution_time_ms": result.get("execution_time_ms"),
            "synthesis_confidence": result.get("synthesis_confidence"),
            **result,
        }

    except Exception as e:
        logger.error(f"Error in orchestrated analysis: {str(e)}")
        return {
            "status": "error",
            "error": str(e),
            "agent_type": "supervisor_orchestrated",
        }


async def deep_research_financial(
    research_topic: str,
    persona: str = "moderate",
    research_depth: str = "comprehensive",
    focus_areas: list[str] | None = None,
    timeframe: str = "30d",
    session_id: str | None = None,
) -> dict[str, Any]:
    """
    Conduct comprehensive financial research using web search and AI analysis.

    This tool performs deep research on financial topics, companies, or market
    trends using multiple web search providers and AI-powered content analysis.

    Args:
        research_topic: Main research topic (company, symbol, or market theme)
        persona: Investor persona affecting research focus
        research_depth: Depth level (basic, standard, comprehensive, exhaustive)
        focus_areas: Specific areas to focus on (e.g., ["fundamentals", "technicals"])
        timeframe: Time range for research (7d, 30d, 90d, 1y)
        session_id: Optional session ID for conversation continuity

    Returns:
        Comprehensive research report with validated sources and analysis
    """
    try:
        if not session_id:
            import uuid

            session_id = str(uuid.uuid4())

        if focus_areas is None:
            focus_areas = ["fundamentals", "market_sentiment", "competitive_landscape"]

        # Get deep research agent
        researcher = get_or_create_agent("deep_research", persona)

        # Run deep research
        result = await researcher.research_comprehensive(
            topic=research_topic,
            session_id=session_id,
            depth=research_depth,
            focus_areas=focus_areas,
            timeframe=timeframe,
        )

        return {
            "status": "success",
            "agent_type": "deep_research",
            "persona": persona,
            "session_id": session_id,
            "research_topic": research_topic,
            "research_depth": research_depth,
            "focus_areas": focus_areas,
            "sources_analyzed": result.get("total_sources_processed", 0),
            "research_confidence": result.get("research_confidence"),
            "validation_checks_passed": result.get("validation_checks_passed"),
            **result,
        }

    except Exception as e:
        logger.error(f"Error in deep research: {str(e)}")
        return {"status": "error", "error": str(e), "agent_type": "deep_research"}


async def compare_multi_agent_analysis(
    query: str,
    agent_types: list[str] | None = None,
    persona: str = "moderate",
    session_id: str | None = None,
) -> dict[str, Any]:
    """
    Compare analysis results across multiple agent types.

    Runs the same query through different specialized agents to show how
    their approaches and insights differ, providing a multi-dimensional view.

    Args:
        query: Analysis query to run across multiple agents
        agent_types: List of agent types to compare (default: ["market", "supervisor"])
        persona: Investor persona for all agents
        session_id: Optional session ID prefix

    Returns:
        Comparative analysis showing different agent perspectives
    """
    try:
        if not session_id:
            import uuid

            session_id = str(uuid.uuid4())

        if agent_types is None:
            agent_types = ["market", "supervisor"]

        results = {}
        execution_times = {}

        for agent_type in agent_types:
            try:
                agent = get_or_create_agent(agent_type, persona)

                # Run analysis based on agent type
                if agent_type == "market":
                    result = await agent.analyze_market(
                        query=query,
                        session_id=f"{session_id}_{agent_type}",
                        max_results=10,
                    )
                elif agent_type == "supervisor":
                    result = await agent.coordinate_agents(
                        query=query,
                        session_id=f"{session_id}_{agent_type}",
                        max_agents=2,
                    )
                else:
                    continue

                results[agent_type] = {
                    "summary": result.get("summary", ""),
                    "key_findings": result.get("key_findings", []),
                    "confidence": result.get("confidence", 0.0),
                    "methodology": result.get("methodology", f"{agent_type} analysis"),
                }
                execution_times[agent_type] = result.get("execution_time_ms", 0)

            except Exception as e:
                logger.warning(f"Error with {agent_type} agent: {str(e)}")
                results[agent_type] = {"error": str(e), "status": "failed"}

        return {
            "status": "success",
            "query": query,
            "persona": persona,
            "agents_compared": list(results.keys()),
            "comparison": results,
            "execution_times_ms": execution_times,
            "insights": "Each agent brings unique analytical perspectives and methodologies",
        }

    except Exception as e:
        logger.error(f"Error in multi-agent comparison: {str(e)}")
        return {"status": "error", "error": str(e)}


def list_available_agents() -> dict[str, Any]:
    """
    List all available LangGraph agents and their capabilities.

    Returns:
        Information about available agents and personas
    """
    return {
        "status": "success",
        "agents": {
            "market_analysis": {
                "description": "Market screening and sector analysis",
                "personas": ["conservative", "moderate", "aggressive"],
                "capabilities": [
                    "Momentum screening",
                    "Sector rotation analysis",
                    "Market breadth indicators",
                    "Risk-adjusted recommendations",
                ],
                "streaming_modes": ["updates", "values", "messages", "debug"],
                "status": "active",
            },
            "supervisor_orchestrated": {
                "description": "Multi-agent orchestration and coordination",
                "personas": ["conservative", "moderate", "aggressive", "day_trader"],
                "capabilities": [
                    "Intelligent query routing",
                    "Multi-agent coordination",
                    "Result synthesis and conflict resolution",
                    "Parallel and sequential execution",
                    "Comprehensive analysis workflows",
                ],
                "routing_strategies": ["llm_powered", "rule_based", "hybrid"],
                "status": "active",
            },
            "deep_research": {
                "description": "Comprehensive financial research with web search",
                "personas": ["conservative", "moderate", "aggressive", "day_trader"],
                "capabilities": [
                    "Multi-provider web search",
                    "AI-powered content analysis",
                    "Source validation and credibility scoring",
                    "Citation and reference management",
                    "Comprehensive research reports",
                ],
                "research_depths": ["basic", "standard", "comprehensive", "exhaustive"],
                "focus_areas": [
                    "fundamentals",
                    "technicals",
                    "market_sentiment",
                    "competitive_landscape",
                ],
                "status": "active",
            },
            "technical_analysis": {
                "description": "Chart patterns and technical indicators",
                "status": "coming_soon",
            },
            "risk_management": {
                "description": "Position sizing and portfolio risk",
                "status": "coming_soon",
            },
            "portfolio_optimization": {
                "description": "Rebalancing and allocation",
                "status": "coming_soon",
            },
        },
        "orchestrated_tools": {
            "orchestrated_analysis": "Coordinate multiple agents for comprehensive analysis",
            "deep_research_financial": "Conduct thorough research with web search",
            "compare_multi_agent_analysis": "Compare different agent perspectives",
        },
        "features": {
            "persona_adaptation": "Agents adjust recommendations based on risk profile",
            "conversation_memory": "Maintains context within sessions",
            "streaming_support": "Real-time updates during analysis",
            "tool_integration": "Access to all MCP financial tools",
            "multi_agent_orchestration": "Coordinate multiple specialized agents",
            "web_search_research": "AI-powered research with source validation",
            "intelligent_routing": "LLM-powered task routing and optimization",
        },
        "personas": ["conservative", "moderate", "aggressive", "day_trader"],
        "routing_strategies": ["llm_powered", "rule_based", "hybrid"],
        "research_depths": ["basic", "standard", "comprehensive", "exhaustive"],
    }


async def compare_personas_analysis(
    query: str, session_id: str | None = None
) -> dict[str, Any]:
    """
    Compare analysis across different investor personas.

    Runs the same query through conservative, moderate, and aggressive
    personas to show how recommendations differ.

    Args:
        query: Analysis query to run
        session_id: Optional session ID prefix

    Returns:
        Comparative analysis across all personas
    """
    try:
        if not session_id:
            import uuid

            session_id = str(uuid.uuid4())

        results = {}

        for persona in ["conservative", "moderate", "aggressive"]:
            agent = get_or_create_agent("market", persona)

            # Run analysis for this persona
            result = await agent.analyze_market(
                query=query, session_id=f"{session_id}_{persona}", max_results=10
            )

            results[persona] = {
                "summary": result.get("results", {}).get("summary", ""),
                "top_picks": result.get("results", {}).get("screened_symbols", [])[:5],
                "risk_parameters": {
                    "risk_tolerance": agent.persona.risk_tolerance,
                    "max_position_size": f"{agent.persona.position_size_max * 100:.1f}%",
                    "stop_loss_multiplier": agent.persona.stop_loss_multiplier,
                },
            }

        return {
            "status": "success",
            "query": query,
            "comparison": results,
            "insights": "Notice how recommendations vary by risk profile",
        }

    except Exception as e:
        logger.error(f"Error in persona comparison: {str(e)}")
        return {"status": "error", "error": str(e)}
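
For orientation, below is a minimal sketch of how these tool coroutines might be exercised directly from a script. It assumes the maverick_mcp package is installed with an LLM provider and EXA_API_KEY configured; the import path maverick_mcp.api.routers.agents, the query text, and the persona value are illustrative assumptions, not documented API. In the actual server these functions are exposed as MCP tools via the agents_router rather than called directly.

# Minimal usage sketch (assumes maverick_mcp is installed and LLM/Exa API keys are set).
import asyncio

# Hypothetical import path for this file; adjust to wherever agents.py lives in the package.
from maverick_mcp.api.routers.agents import (
    analyze_market_with_agent,
    list_available_agents,
)


async def main() -> None:
    # Synchronous tool: enumerate available agents, personas, and capabilities.
    catalog = list_available_agents()
    print(catalog["personas"])

    # Async tool: run a persona-aware market screen (illustrative query and parameters).
    result = await analyze_market_with_agent(
        query="Find top momentum stocks",
        persona="moderate",
        screening_strategy="momentum",
        max_results=5,
    )
    print(result["status"], result.get("session_id"))


if __name__ == "__main__":
    asyncio.run(main())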

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'
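
The same lookup can be scripted; here is a minimal Python sketch using only the standard library. The response schema is not documented on this page, so the payload is simply parsed and printed as-is.

# Fetch this server's directory entry from the Glama MCP API (schema not documented here).
import json
from urllib.request import urlopen

url = "https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp"
with urlopen(url) as response:
    server_info = json.loads(response.read().decode("utf-8"))

print(json.dumps(server_info, indent=2))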

If you have feedback or need assistance with the MCP directory API, please join our Discord server.