mcp_trading_server.py•19.3 kB
# mcp_trading_server.py - MCP Server for Trading System
import asyncio
import json
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
import pandas as pd
from mcp.server import Server
from mcp.server.models import InitializationOptions
import mcp.server.stdio
import mcp.types as types
# Import your trading system
from main import SmartTradingSystem, parse_arguments
class TradingMCPServer:
"""MCP Server for Smart Trading System."""
def __init__(self):
self.server = Server("smart-trading-system")
self.trading_system = SmartTradingSystem()
self.setup_tools()
def setup_tools(self):
"""Setup MCP tools for trading analysis."""
@self.server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""List available trading tools."""
return [
types.Tool(
name="analyze_portfolio",
description="Analyze a specific portfolio or all stocks with optional filtering",
inputSchema={
"type": "object",
"properties": {
"portfolio": {
"type": "string",
"description": "Portfolio name (e.g., 'tech_giants', 'all')",
"default": "all"
},
"signal_filter": {
"type": "string",
"enum": ["BUY", "SELL", "HOLD"],
"description": "Filter by signal type"
},
"min_confidence": {
"type": "number",
"minimum": 0.0,
"maximum": 1.0,
"description": "Minimum confidence threshold (0.0-1.0)"
}
},
"required": ["portfolio"]
}
),
types.Tool(
name="compare_portfolios",
description="Compare multiple portfolios with comprehensive analysis",
inputSchema={
"type": "object",
"properties": {
"portfolios": {
"type": "array",
"items": {"type": "string"},
"description": "List of portfolio names to compare",
"minItems": 2
}
},
"required": ["portfolios"]
}
),
types.Tool(
name="find_opportunities",
description="Find trading opportunities across all stocks with natural language queries",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Natural language query (e.g., 'high confidence buy opportunities', 'defensive stocks')"
},
"limit": {
"type": "integer",
"description": "Maximum number of results",
"default": 10
}
},
"required": ["query"]
}
),
types.Tool(
name="get_market_summary",
description="Get overall market summary and top opportunities",
inputSchema={
"type": "object",
"properties": {
"include_charts": {
"type": "boolean",
"description": "Generate comparison charts",
"default": False
}
}
}
),
types.Tool(
name="portfolio_health_check",
description="Analyze portfolio health and provide recommendations",
inputSchema={
"type": "object",
"properties": {
"portfolio": {
"type": "string",
"description": "Portfolio name to analyze"
},
"risk_tolerance": {
"type": "string",
"enum": ["conservative", "moderate", "aggressive"],
"description": "Investment risk tolerance"
}
},
"required": ["portfolio"]
}
)
]
@self.server.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> list[types.TextContent]:
"""Handle tool calls."""
try:
if name == "analyze_portfolio":
return await self._analyze_portfolio(arguments or {})
elif name == "compare_portfolios":
return await self._compare_portfolios(arguments or {})
elif name == "find_opportunities":
return await self._find_opportunities(arguments or {})
elif name == "get_market_summary":
return await self._get_market_summary(arguments or {})
elif name == "portfolio_health_check":
return await self._portfolio_health_check(arguments or {})
else:
return [types.TextContent(
type="text",
text=f"Unknown tool: {name}"
)]
except Exception as e:
return [types.TextContent(
type="text",
text=f"Error executing {name}: {str(e)}"
)]
async def _analyze_portfolio(self, args: dict) -> list[types.TextContent]:
"""Analyze a portfolio with optional filtering."""
portfolio = args.get("portfolio", "all")
signal_filter = args.get("signal_filter")
min_confidence = args.get("min_confidence")
if portfolio == "all":
results = self.trading_system.analyze_all_stocks(
signal_filter=signal_filter,
min_confidence=min_confidence,
save_results=True
)
else:
results = self.trading_system.analyze_portfolio(portfolio)
if signal_filter or min_confidence:
results = self.trading_system.filter_results_by_signal(
results, signal_filter, min_confidence
)
# Format results
if not results:
return [types.TextContent(
type="text",
text=f"❌ No results found for {portfolio} with the specified filters."
)]
# Create formatted response
response = f"📊 **{portfolio.upper()} ANALYSIS RESULTS**\n\n"
if signal_filter:
response += f"🎯 Filtered to {signal_filter} signals\n"
if min_confidence:
response += f"🎯 Minimum confidence: {min_confidence:.1%}\n"
response += f"📈 Found {len(results)} opportunities:\n\n"
# Top results
for i, result in enumerate(results[:10], 1):
response += (f"{i:2d}. {result['emoji']} **{result['ticker']}**: "
f"{result['signal']} ({result['expected_change']:+.2%}, "
f"{result['confidence']:.1%} confidence)\n")
if len(results) > 10:
response += f"\n... and {len(results) - 10} more results"
# Summary stats
if len(results) > 1:
avg_change = sum(r['expected_change'] for r in results) / len(results)
avg_confidence = sum(r['confidence'] for r in results) / len(results)
response += f"\n\n📊 **Summary:**\n"
response += f"- Average expected change: {avg_change:+.2%}\n"
response += f"- Average confidence: {avg_confidence:.1%}\n"
return [types.TextContent(type="text", text=response)]
async def _compare_portfolios(self, args: dict) -> list[types.TextContent]:
"""Compare multiple portfolios."""
portfolios = args.get("portfolios", [])
if len(portfolios) < 2:
return [types.TextContent(
type="text",
text="❌ Need at least 2 portfolios to compare"
)]
results = self.trading_system.compare_portfolios(portfolios)
# Format comparison results
response = f"🔬 **PORTFOLIO COMPARISON**\n\n"
response += f"Comparing: {', '.join(portfolios)}\n\n"
if 'comparison_results' in results and 'comparison_table' in results['comparison_results']:
df = results['comparison_results']['comparison_table']
response += "📊 **Comparison Table:**\n```\n"
response += df.to_string(index=False)
response += "\n```\n\n"
# Rankings
if 'comparison_results' in results and 'rankings' in results['comparison_results']:
rankings = results['comparison_results']['rankings']
response += "🏆 **Rankings:**\n\n"
for category, top_portfolios in rankings.items():
if top_portfolios:
response += f"**{category.replace('_', ' ').title()}:**\n"
for i, (portfolio, value) in enumerate(top_portfolios, 1):
if 'return' in category:
response += f" {i}. {portfolio}: {value:+.2%}\n"
else:
response += f" {i}. {portfolio}: {value:.2f}\n"
response += "\n"
# Recommendations
if 'comparison_results' in results and 'recommendations' in results['comparison_results']:
recommendations = results['comparison_results']['recommendations']
response += "💡 **Recommendations:**\n"
for investor_type, recommendation in recommendations.items():
response += f"- **{investor_type.title()}**: {recommendation}\n"
return [types.TextContent(type="text", text=response)]
async def _find_opportunities(self, args: dict) -> list[types.TextContent]:
"""Find opportunities based on natural language query."""
query = args.get("query", "").lower()
limit = args.get("limit", 10)
# Parse natural language query
signal_filter = None
min_confidence = None
if "buy" in query or "bullish" in query:
signal_filter = "BUY"
elif "sell" in query or "bearish" in query:
signal_filter = "SELL"
elif "hold" in query or "neutral" in query:
signal_filter = "HOLD"
if "high confidence" in query:
min_confidence = 0.8
elif "medium confidence" in query:
min_confidence = 0.6
# Get all stocks and filter
results = self.trading_system.analyze_all_stocks(
signal_filter=signal_filter,
min_confidence=min_confidence,
save_results=False
)
# Additional filtering based on query
if "defensive" in query or "safe" in query:
results = [r for r in results if r['volatility'] < 0.03]
elif "growth" in query or "aggressive" in query:
results = [r for r in results if r['expected_change'] > 0.02]
elif "value" in query:
results = [r for r in results if r['expected_change'] > 0 and r['volatility'] < 0.025]
# Sort and limit
results = sorted(results, key=lambda x: x['confidence'], reverse=True)[:limit]
response = f"🔍 **OPPORTUNITY SEARCH: '{query}'**\n\n"
if not results:
response += "❌ No opportunities found matching your criteria."
else:
response += f"Found {len(results)} opportunities:\n\n"
for i, result in enumerate(results, 1):
response += (f"{i:2d}. {result['emoji']} **{result['ticker']}**: "
f"{result['signal']} ({result['expected_change']:+.2%}, "
f"{result['confidence']:.1%} confidence, "
f"{result['volatility']:.1%} volatility)\n")
return [types.TextContent(type="text", text=response)]
async def _get_market_summary(self, args: dict) -> list[types.TextContent]:
"""Get overall market summary."""
# Analyze all stocks
all_results = self.trading_system.analyze_all_stocks(save_results=False)
if not all_results:
return [types.TextContent(
type="text",
text="❌ No market data available"
)]
# Calculate market stats
buy_signals = [r for r in all_results if r['signal'] == 'BUY']
sell_signals = [r for r in all_results if r['signal'] == 'SELL']
hold_signals = [r for r in all_results if r['signal'] == 'HOLD']
avg_change = sum(r['expected_change'] for r in all_results) / len(all_results)
avg_confidence = sum(r['confidence'] for r in all_results) / len(all_results)
high_conf_buys = [r for r in buy_signals if r['confidence'] > 0.8]
high_conf_sells = [r for r in sell_signals if r['confidence'] > 0.8]
response = f"📈 **MARKET SUMMARY** ({datetime.now().strftime('%Y-%m-%d %H:%M')})\n\n"
response += f"📊 **Overall Statistics:**\n"
response += f"- Total stocks analyzed: {len(all_results)}\n"
response += f"- Average expected change: {avg_change:+.2%}\n"
response += f"- Average confidence: {avg_confidence:.1%}\n\n"
response += f"🎯 **Signal Distribution:**\n"
response += f"- 🟢 BUY signals: {len(buy_signals)} ({len(buy_signals)/len(all_results)*100:.1f}%)\n"
response += f"- 🔴 SELL signals: {len(sell_signals)} ({len(sell_signals)/len(all_results)*100:.1f}%)\n"
response += f"- 🟡 HOLD signals: {len(hold_signals)} ({len(hold_signals)/len(all_results)*100:.1f}%)\n\n"
if high_conf_buys:
response += f"🌟 **Top Buy Opportunities** (80%+ confidence):\n"
for i, result in enumerate(high_conf_buys[:5], 1):
response += (f"{i}. **{result['ticker']}**: {result['expected_change']:+.2%} "
f"({result['confidence']:.1%} confidence)\n")
response += "\n"
if high_conf_sells:
response += f"⚠️ **High-Confidence Sell Alerts** (80%+ confidence):\n"
for i, result in enumerate(high_conf_sells[:5], 1):
response += (f"{i}. **{result['ticker']}**: {result['expected_change']:+.2%} "
f"({result['confidence']:.1%} confidence)\n")
return [types.TextContent(type="text", text=response)]
async def _portfolio_health_check(self, args: dict) -> list[types.TextContent]:
"""Analyze portfolio health and provide recommendations."""
portfolio = args.get("portfolio")
risk_tolerance = args.get("risk_tolerance", "moderate")
results = self.trading_system.analyze_portfolio(portfolio)
if not results:
return [types.TextContent(
type="text",
text=f"❌ No data available for portfolio: {portfolio}"
)]
# Calculate health metrics
metrics = self.trading_system.comparator.calculate_portfolio_metrics(results, portfolio)
response = f"🏥 **PORTFOLIO HEALTH CHECK: {portfolio.upper()}**\n\n"
# Health score
quality_score = metrics['quality_score']
if quality_score >= 80:
health_emoji = "🟢"
health_status = "Excellent"
elif quality_score >= 60:
health_emoji = "🟡"
health_status = "Good"
else:
health_emoji = "🔴"
health_status = "Needs Attention"
response += f"{health_emoji} **Overall Health: {health_status}** (Quality Score: {quality_score:.1f}/100)\n\n"
# Key metrics
response += f"📊 **Key Metrics:**\n"
response += f"- Expected Return: {metrics['expected_return']:+.2%}\n"
response += f"- Risk Level: {metrics['portfolio_volatility']:.2%}\n"
response += f"- Confidence: {metrics['avg_confidence']:.1%}\n"
response += f"- Action Signals: {metrics['buy_signals']} BUY, {metrics['sell_signals']} SELL\n\n"
# Risk-adjusted recommendations
response += f"💡 **Recommendations** (Risk Tolerance: {risk_tolerance}):\n"
if risk_tolerance == "conservative":
if metrics['portfolio_volatility'] > 0.03:
response += "⚠️ Consider reducing high-volatility positions\n"
if metrics['buy_signals'] > metrics['num_stocks'] * 0.3:
response += "✅ Good buying opportunities, but diversify carefully\n"
elif risk_tolerance == "aggressive":
if metrics['expected_return'] < 0.02:
response += "📈 Consider more growth-oriented positions\n"
if metrics['buy_signals'] > 0:
response += "🚀 Good time to capitalize on buy signals\n"
return [types.TextContent(type="text", text=response)]
async def main():
"""Run the MCP server."""
server_instance = TradingMCPServer()
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server_instance.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="smart-trading-system",
server_version="1.0.0",
capabilities=server_instance.server.get_capabilities(
notification_options=None,
experimental_capabilities=None,
),
),
)
if __name__ == "__main__":
asyncio.run(main())