Skip to main content
Glama

MCP Hybrid Forecasting

by j1c4b
mcp_trading_server.py19.3 kB
# mcp_trading_server.py - MCP Server for Trading System import asyncio import json import subprocess import sys from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional import pandas as pd from mcp.server import Server from mcp.server.models import InitializationOptions import mcp.server.stdio import mcp.types as types # Import your trading system from main import SmartTradingSystem, parse_arguments class TradingMCPServer: """MCP Server for Smart Trading System.""" def __init__(self): self.server = Server("smart-trading-system") self.trading_system = SmartTradingSystem() self.setup_tools() def setup_tools(self): """Setup MCP tools for trading analysis.""" @self.server.list_tools() async def handle_list_tools() -> list[types.Tool]: """List available trading tools.""" return [ types.Tool( name="analyze_portfolio", description="Analyze a specific portfolio or all stocks with optional filtering", inputSchema={ "type": "object", "properties": { "portfolio": { "type": "string", "description": "Portfolio name (e.g., 'tech_giants', 'all')", "default": "all" }, "signal_filter": { "type": "string", "enum": ["BUY", "SELL", "HOLD"], "description": "Filter by signal type" }, "min_confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0, "description": "Minimum confidence threshold (0.0-1.0)" } }, "required": ["portfolio"] } ), types.Tool( name="compare_portfolios", description="Compare multiple portfolios with comprehensive analysis", inputSchema={ "type": "object", "properties": { "portfolios": { "type": "array", "items": {"type": "string"}, "description": "List of portfolio names to compare", "minItems": 2 } }, "required": ["portfolios"] } ), types.Tool( name="find_opportunities", description="Find trading opportunities across all stocks with natural language queries", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Natural language query (e.g., 'high confidence buy opportunities', 'defensive stocks')" }, "limit": { "type": "integer", "description": "Maximum number of results", "default": 10 } }, "required": ["query"] } ), types.Tool( name="get_market_summary", description="Get overall market summary and top opportunities", inputSchema={ "type": "object", "properties": { "include_charts": { "type": "boolean", "description": "Generate comparison charts", "default": False } } } ), types.Tool( name="portfolio_health_check", description="Analyze portfolio health and provide recommendations", inputSchema={ "type": "object", "properties": { "portfolio": { "type": "string", "description": "Portfolio name to analyze" }, "risk_tolerance": { "type": "string", "enum": ["conservative", "moderate", "aggressive"], "description": "Investment risk tolerance" } }, "required": ["portfolio"] } ) ] @self.server.call_tool() async def handle_call_tool( name: str, arguments: dict | None ) -> list[types.TextContent]: """Handle tool calls.""" try: if name == "analyze_portfolio": return await self._analyze_portfolio(arguments or {}) elif name == "compare_portfolios": return await self._compare_portfolios(arguments or {}) elif name == "find_opportunities": return await self._find_opportunities(arguments or {}) elif name == "get_market_summary": return await self._get_market_summary(arguments or {}) elif name == "portfolio_health_check": return await self._portfolio_health_check(arguments or {}) else: return [types.TextContent( type="text", text=f"Unknown tool: {name}" )] except Exception as e: return [types.TextContent( type="text", text=f"Error executing {name}: {str(e)}" )] async def _analyze_portfolio(self, args: dict) -> list[types.TextContent]: """Analyze a portfolio with optional filtering.""" portfolio = args.get("portfolio", "all") signal_filter = args.get("signal_filter") min_confidence = args.get("min_confidence") if portfolio == "all": results = self.trading_system.analyze_all_stocks( signal_filter=signal_filter, min_confidence=min_confidence, save_results=True ) else: results = self.trading_system.analyze_portfolio(portfolio) if signal_filter or min_confidence: results = self.trading_system.filter_results_by_signal( results, signal_filter, min_confidence ) # Format results if not results: return [types.TextContent( type="text", text=f"❌ No results found for {portfolio} with the specified filters." )] # Create formatted response response = f"📊 **{portfolio.upper()} ANALYSIS RESULTS**\n\n" if signal_filter: response += f"🎯 Filtered to {signal_filter} signals\n" if min_confidence: response += f"🎯 Minimum confidence: {min_confidence:.1%}\n" response += f"📈 Found {len(results)} opportunities:\n\n" # Top results for i, result in enumerate(results[:10], 1): response += (f"{i:2d}. {result['emoji']} **{result['ticker']}**: " f"{result['signal']} ({result['expected_change']:+.2%}, " f"{result['confidence']:.1%} confidence)\n") if len(results) > 10: response += f"\n... and {len(results) - 10} more results" # Summary stats if len(results) > 1: avg_change = sum(r['expected_change'] for r in results) / len(results) avg_confidence = sum(r['confidence'] for r in results) / len(results) response += f"\n\n📊 **Summary:**\n" response += f"- Average expected change: {avg_change:+.2%}\n" response += f"- Average confidence: {avg_confidence:.1%}\n" return [types.TextContent(type="text", text=response)] async def _compare_portfolios(self, args: dict) -> list[types.TextContent]: """Compare multiple portfolios.""" portfolios = args.get("portfolios", []) if len(portfolios) < 2: return [types.TextContent( type="text", text="❌ Need at least 2 portfolios to compare" )] results = self.trading_system.compare_portfolios(portfolios) # Format comparison results response = f"🔬 **PORTFOLIO COMPARISON**\n\n" response += f"Comparing: {', '.join(portfolios)}\n\n" if 'comparison_results' in results and 'comparison_table' in results['comparison_results']: df = results['comparison_results']['comparison_table'] response += "📊 **Comparison Table:**\n```\n" response += df.to_string(index=False) response += "\n```\n\n" # Rankings if 'comparison_results' in results and 'rankings' in results['comparison_results']: rankings = results['comparison_results']['rankings'] response += "🏆 **Rankings:**\n\n" for category, top_portfolios in rankings.items(): if top_portfolios: response += f"**{category.replace('_', ' ').title()}:**\n" for i, (portfolio, value) in enumerate(top_portfolios, 1): if 'return' in category: response += f" {i}. {portfolio}: {value:+.2%}\n" else: response += f" {i}. {portfolio}: {value:.2f}\n" response += "\n" # Recommendations if 'comparison_results' in results and 'recommendations' in results['comparison_results']: recommendations = results['comparison_results']['recommendations'] response += "💡 **Recommendations:**\n" for investor_type, recommendation in recommendations.items(): response += f"- **{investor_type.title()}**: {recommendation}\n" return [types.TextContent(type="text", text=response)] async def _find_opportunities(self, args: dict) -> list[types.TextContent]: """Find opportunities based on natural language query.""" query = args.get("query", "").lower() limit = args.get("limit", 10) # Parse natural language query signal_filter = None min_confidence = None if "buy" in query or "bullish" in query: signal_filter = "BUY" elif "sell" in query or "bearish" in query: signal_filter = "SELL" elif "hold" in query or "neutral" in query: signal_filter = "HOLD" if "high confidence" in query: min_confidence = 0.8 elif "medium confidence" in query: min_confidence = 0.6 # Get all stocks and filter results = self.trading_system.analyze_all_stocks( signal_filter=signal_filter, min_confidence=min_confidence, save_results=False ) # Additional filtering based on query if "defensive" in query or "safe" in query: results = [r for r in results if r['volatility'] < 0.03] elif "growth" in query or "aggressive" in query: results = [r for r in results if r['expected_change'] > 0.02] elif "value" in query: results = [r for r in results if r['expected_change'] > 0 and r['volatility'] < 0.025] # Sort and limit results = sorted(results, key=lambda x: x['confidence'], reverse=True)[:limit] response = f"🔍 **OPPORTUNITY SEARCH: '{query}'**\n\n" if not results: response += "❌ No opportunities found matching your criteria." else: response += f"Found {len(results)} opportunities:\n\n" for i, result in enumerate(results, 1): response += (f"{i:2d}. {result['emoji']} **{result['ticker']}**: " f"{result['signal']} ({result['expected_change']:+.2%}, " f"{result['confidence']:.1%} confidence, " f"{result['volatility']:.1%} volatility)\n") return [types.TextContent(type="text", text=response)] async def _get_market_summary(self, args: dict) -> list[types.TextContent]: """Get overall market summary.""" # Analyze all stocks all_results = self.trading_system.analyze_all_stocks(save_results=False) if not all_results: return [types.TextContent( type="text", text="❌ No market data available" )] # Calculate market stats buy_signals = [r for r in all_results if r['signal'] == 'BUY'] sell_signals = [r for r in all_results if r['signal'] == 'SELL'] hold_signals = [r for r in all_results if r['signal'] == 'HOLD'] avg_change = sum(r['expected_change'] for r in all_results) / len(all_results) avg_confidence = sum(r['confidence'] for r in all_results) / len(all_results) high_conf_buys = [r for r in buy_signals if r['confidence'] > 0.8] high_conf_sells = [r for r in sell_signals if r['confidence'] > 0.8] response = f"📈 **MARKET SUMMARY** ({datetime.now().strftime('%Y-%m-%d %H:%M')})\n\n" response += f"📊 **Overall Statistics:**\n" response += f"- Total stocks analyzed: {len(all_results)}\n" response += f"- Average expected change: {avg_change:+.2%}\n" response += f"- Average confidence: {avg_confidence:.1%}\n\n" response += f"🎯 **Signal Distribution:**\n" response += f"- 🟢 BUY signals: {len(buy_signals)} ({len(buy_signals)/len(all_results)*100:.1f}%)\n" response += f"- 🔴 SELL signals: {len(sell_signals)} ({len(sell_signals)/len(all_results)*100:.1f}%)\n" response += f"- 🟡 HOLD signals: {len(hold_signals)} ({len(hold_signals)/len(all_results)*100:.1f}%)\n\n" if high_conf_buys: response += f"🌟 **Top Buy Opportunities** (80%+ confidence):\n" for i, result in enumerate(high_conf_buys[:5], 1): response += (f"{i}. **{result['ticker']}**: {result['expected_change']:+.2%} " f"({result['confidence']:.1%} confidence)\n") response += "\n" if high_conf_sells: response += f"⚠️ **High-Confidence Sell Alerts** (80%+ confidence):\n" for i, result in enumerate(high_conf_sells[:5], 1): response += (f"{i}. **{result['ticker']}**: {result['expected_change']:+.2%} " f"({result['confidence']:.1%} confidence)\n") return [types.TextContent(type="text", text=response)] async def _portfolio_health_check(self, args: dict) -> list[types.TextContent]: """Analyze portfolio health and provide recommendations.""" portfolio = args.get("portfolio") risk_tolerance = args.get("risk_tolerance", "moderate") results = self.trading_system.analyze_portfolio(portfolio) if not results: return [types.TextContent( type="text", text=f"❌ No data available for portfolio: {portfolio}" )] # Calculate health metrics metrics = self.trading_system.comparator.calculate_portfolio_metrics(results, portfolio) response = f"🏥 **PORTFOLIO HEALTH CHECK: {portfolio.upper()}**\n\n" # Health score quality_score = metrics['quality_score'] if quality_score >= 80: health_emoji = "🟢" health_status = "Excellent" elif quality_score >= 60: health_emoji = "🟡" health_status = "Good" else: health_emoji = "🔴" health_status = "Needs Attention" response += f"{health_emoji} **Overall Health: {health_status}** (Quality Score: {quality_score:.1f}/100)\n\n" # Key metrics response += f"📊 **Key Metrics:**\n" response += f"- Expected Return: {metrics['expected_return']:+.2%}\n" response += f"- Risk Level: {metrics['portfolio_volatility']:.2%}\n" response += f"- Confidence: {metrics['avg_confidence']:.1%}\n" response += f"- Action Signals: {metrics['buy_signals']} BUY, {metrics['sell_signals']} SELL\n\n" # Risk-adjusted recommendations response += f"💡 **Recommendations** (Risk Tolerance: {risk_tolerance}):\n" if risk_tolerance == "conservative": if metrics['portfolio_volatility'] > 0.03: response += "⚠️ Consider reducing high-volatility positions\n" if metrics['buy_signals'] > metrics['num_stocks'] * 0.3: response += "✅ Good buying opportunities, but diversify carefully\n" elif risk_tolerance == "aggressive": if metrics['expected_return'] < 0.02: response += "📈 Consider more growth-oriented positions\n" if metrics['buy_signals'] > 0: response += "🚀 Good time to capitalize on buy signals\n" return [types.TextContent(type="text", text=response)] async def main(): """Run the MCP server.""" server_instance = TradingMCPServer() async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server_instance.server.run( read_stream, write_stream, InitializationOptions( server_name="smart-trading-system", server_version="1.0.0", capabilities=server_instance.server.get_capabilities( notification_options=None, experimental_capabilities=None, ), ), ) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/j1c4b/mcp-hybrid-forecasting'

If you have feedback or need assistance with the MCP directory API, please join our Discord server