# server.py (18.2 kB)
#!/usr/bin/env python3
"""
MCP Server for Stochastic Process Analysis and Financial Modeling
Advanced stochastic process analysis and financial modeling MCP server
"""
import asyncio
import json
import math
import random
from datetime import datetime
from typing import Any, Dict, List, Optional, Sequence
from mcp.server.fastmcp import FastMCP
# FastMCP application instance named "stochastic-process-analyzer"; the
# @mcp.tool() decorators further down register the async tool functions on it.
mcp = FastMCP("stochastic-process-analyzer")
class StochasticProcessSimulator:
    """Core class for stochastic process simulations.

    All methods are static; the class only acts as a namespace.
    """

    @staticmethod
    def geometric_brownian_motion(
        drift: float = 0.05,
        volatility: float = 0.2,
        initial_value: float = 100.0,
        time_horizon: float = 1.0,
        steps: int = 252,
        num_paths: int = 1000
    ) -> Dict[str, Any]:
        """Simulate Geometric Brownian Motion paths.

        Every step multiplies the current value by
        ``exp((drift - 0.5 * volatility**2) * dt + volatility * sqrt(dt) * Z)``
        where ``dt = time_horizon / steps`` and ``Z`` is drawn with
        ``random.gauss(0, 1)``.

        Args:
            drift: Drift coefficient of the process.
            volatility: Volatility coefficient of the process.
            initial_value: Value every path starts from.
            time_horizon: Total simulated time span.
            steps: Number of time steps per path; must be at least 1.
            num_paths: Number of independent paths to generate.

        Returns:
            A dict with the keys ``model``, ``parameters``, ``times``
            (``steps + 1`` time points), ``paths`` (one list of
            ``steps + 1`` values per path), ``statistics`` and ``timestamp``.

        Raises:
            ValueError: If ``steps`` is smaller than 1 or ``time_horizon``
                is negative.
        """
        if steps < 1:
            raise ValueError(f"steps must be a positive integer, got {steps!r}")
        if time_horizon < 0:
            raise ValueError(f"time_horizon must be non-negative, got {time_horizon!r}")

        dt = time_horizon / steps
        # Both terms of the log-return are constant across steps and paths,
        # so compute them once instead of inside the innermost loop.
        log_drift = (drift - 0.5 * volatility**2) * dt
        shock_scale = volatility * math.sqrt(dt)

        times = [i * dt for i in range(steps + 1)]
        paths = []
        for _ in range(num_paths):
            value = initial_value
            path = [value]
            for _ in range(steps):
                value *= math.exp(log_drift + shock_scale * random.gauss(0, 1))
                path.append(value)
            paths.append(path)

        return {
            "model": "gbm",
            "parameters": {
                "drift": drift,
                "volatility": volatility,
                "initial_value": initial_value,
                "time_horizon": time_horizon,
                "steps": steps,
                "num_paths": num_paths
            },
            "times": times,
            "paths": paths,
            "statistics": StochasticProcessSimulator._calculate_path_statistics(paths),
            "timestamp": datetime.now().isoformat()
        }

    @staticmethod
    def _calculate_path_statistics(paths: List[List[float]]) -> Dict[str, Any]:
        """Calculate statistical metrics for simulation paths.

        Args:
            paths: Simulated paths; the first element of each path is its
                starting value and the last element its final value.

        Returns:
            An empty dict when ``paths`` is empty. Otherwise a dict with
            ``final_value`` (mean, population std, min, max and the
            5/25/50/75/95 percentiles) and ``returns`` (mean, population
            std, min, max of ``final / start - 1``).
        """
        if not paths:
            return {}

        final_values = [path[-1] for path in paths]
        returns = [path[-1] / path[0] - 1 for path in paths]
        count = len(final_values)

        mean_final = sum(final_values) / count
        mean_return = sum(returns) / count
        var_final = sum((x - mean_final)**2 for x in final_values) / count
        var_return = sum((x - mean_return)**2 for x in returns) / count

        # Sort once and reuse the ordered list for every percentile lookup.
        ordered = sorted(final_values)
        percentile_levels = (("5", 0.05), ("25", 0.25), ("50", 0.50), ("75", 0.75), ("95", 0.95))

        return {
            "final_value": {
                "mean": mean_final,
                "std": math.sqrt(var_final),
                "min": ordered[0],
                "max": ordered[-1],
                "percentiles": {
                    label: ordered[int(level * count)]
                    for label, level in percentile_levels
                }
            },
            "returns": {
                "mean": mean_return,
                "std": math.sqrt(var_return),
                "min": min(returns),
                "max": max(returns)
            }
        }
class FinancialMetricsCalculator:
    """Financial risk and performance metrics calculator."""

    @staticmethod
    def calculate_metrics(simulation_data: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate comprehensive financial metrics from simulation data.

        Args:
            simulation_data: Dict with a ``paths`` list (one list of values
                per path) and an optional ``parameters`` dict from which
                ``time_horizon`` (default 1.0), ``initial_value``
                (default 100) and ``volatility`` (default 0) are read.

        Returns:
            ``{"error": ...}`` when there are no paths. Otherwise a dict
            with ``metrics`` (``var``, ``expected_shortfall``,
            ``sharpe_ratio``, ``max_drawdown``, ``volatility``),
            ``calculation_timestamp`` and ``paths_analyzed``.
        """
        paths = simulation_data.get("paths", [])
        if not paths:
            return {"error": "No simulation data provided"}

        parameters = simulation_data.get("parameters", {})
        time_horizon = parameters.get("time_horizon", 1.0)
        initial_value = parameters.get("initial_value", 100)
        path_count = len(paths)
        metrics = {}

        # Value at Risk (VaR): relative losses ordered from worst to best.
        final_values = [path[-1] for path in paths]
        losses = sorted(
            ((initial_value - value) / initial_value for value in final_values),
            reverse=True,
        )
        metrics["var"] = {
            "95%": losses[int(0.05 * path_count)],
            "99%": losses[int(0.01 * path_count)],
            "99.9%": losses[int(0.001 * path_count)]
        }

        # Expected Shortfall (Conditional VaR): mean of the worst 5% of losses.
        # With fewer than 20 paths that tail is empty; fall back to the single
        # worst loss so the shortfall is never reported below the VaR.
        tail_size = int(0.05 * path_count)
        tail = losses[:tail_size] if tail_size > 0 else losses[:1]
        metrics["expected_shortfall"] = {
            "95%": sum(tail) / len(tail)
        }

        # Sharpe ratio from per-path total returns, scaled by the horizon.
        returns = [path[-1] / path[0] - 1 for path in paths]
        mean_return = sum(returns) / path_count
        return_std = math.sqrt(sum((r - mean_return)**2 for r in returns) / path_count)
        risk_free_rate = 0.02  # Assume 2% risk-free rate
        annualized_return = mean_return / time_horizon
        annualized_std = return_std / math.sqrt(time_horizon)
        metrics["sharpe_ratio"] = (
            (annualized_return - risk_free_rate) / annualized_std
            if annualized_std > 0 else 0
        )

        # Maximum drawdown of every path, then summary figures across paths.
        max_drawdowns = [
            FinancialMetricsCalculator._max_drawdown(path) for path in paths
        ]
        metrics["max_drawdown"] = {
            "mean": sum(max_drawdowns) / len(max_drawdowns),
            "max": max(max_drawdowns),
            "95th_percentile": sorted(max_drawdowns)[int(0.95 * len(max_drawdowns))]
        }

        # Volatility metrics
        metrics["volatility"] = {
            "realized_annual": annualized_std,
            "model_parameter": parameters.get("volatility", 0)
        }

        return {
            "metrics": metrics,
            "calculation_timestamp": datetime.now().isoformat(),
            "paths_analyzed": path_count
        }

    @staticmethod
    def _max_drawdown(path: List[float]) -> float:
        """Return the largest decline from a running peak, as a fraction of that peak."""
        running_max = path[0]
        max_drawdown = 0
        for value in path:
            running_max = max(running_max, value)
            max_drawdown = max(max_drawdown, (running_max - value) / running_max)
        return max_drawdown
class DataExporter:
    """Export analysis results in various formats."""

    @staticmethod
    def export_data(data: Dict[str, Any], format_type: str = "json") -> Dict[str, Any]:
        """Export data in the specified format.

        Args:
            data: The result dict to export.
            format_type: ``"json"``, ``"csv"`` or ``"xml"`` (case-insensitive).

        Returns:
            On success a dict with ``format``, ``exported_at`` and
            ``size_bytes`` plus ``data`` (JSON) or ``content`` (CSV/XML);
            CSV additionally reports ``rows``. On failure a dict with a
            single ``error`` message: for an unsupported format, or for a
            CSV export of data that has no ``paths``.
        """
        timestamp = datetime.now().isoformat()
        fmt = format_type.lower()

        if fmt == "json":
            return {
                "format": "json",
                "data": data,
                "exported_at": timestamp,
                # json.dumps escapes non-ASCII by default, so the character
                # count equals the byte count here.
                "size_bytes": len(json.dumps(data))
            }

        if fmt == "csv":
            # Only simulation results carry the time/path table that CSV needs.
            # Report that explicitly instead of silently returning None.
            if "paths" not in data:
                return {"error": "CSV export requires simulation data containing 'paths'"}

            times = data.get("times", [])
            paths = data.get("paths", [])

            # Header row, then one row per time point with a column per path.
            header = ["time"] + [f"path_{i}" for i in range(len(paths))]
            csv_lines = [",".join(header)]
            for i, t in enumerate(times):
                # Paths shorter than the time grid get an empty cell.
                row = [str(t)] + [str(path[i]) if i < len(path) else "" for path in paths]
                csv_lines.append(",".join(row))

            csv_content = "\n".join(csv_lines)
            return {
                "format": "csv",
                "content": csv_content,
                "exported_at": timestamp,
                "rows": len(csv_lines),
                "size_bytes": len(csv_content.encode("utf-8"))
            }

        if fmt == "xml":
            # Simple XML export of the model name, parameters and statistics.
            xml_content = "\n".join([
                '<?xml version="1.0" encoding="UTF-8"?>',
                f'<stochastic_analysis exported_at="{timestamp}">',
                f"<model>{data.get('model', 'unknown')}</model>",
                "<parameters>",
                _dict_to_xml(data.get('parameters', {})),
                "</parameters>",
                "<statistics>",
                _dict_to_xml(data.get('statistics', {})),
                "</statistics>",
                "</stochastic_analysis>",
            ])
            return {
                "format": "xml",
                "content": xml_content,
                "exported_at": timestamp,
                "size_bytes": len(xml_content.encode("utf-8"))
            }

        return {"error": f"Unsupported format: {format_type}"}
def _dict_to_xml(d: Dict[str, Any], indent: int = 8) -> str:
"""Helper function to convert dictionary to XML"""
xml_parts = []
spaces = " " * indent
for key, value in d.items():
if isinstance(value, dict):
xml_parts.append(f"{spaces}<{key}>\n{_dict_to_xml(value, indent + 4)}\n{spaces}</{key}>")
else:
xml_parts.append(f"{spaces}<{key}>{value}</{key}>")
return "\n".join(xml_parts)
class ComponentRenderer:
    """Render interactive components."""

    @staticmethod
    def render_gbm_component(simulation_data: Dict[str, Any]) -> str:
        """Render the interactive GBM analyzer component as an HTML page.

        The page shows the model parameters, a Chart.js line chart of at
        most the first five paths, and the mean/std of the final values and
        of the returns. Missing keys fall back to empty collections or 0.

        Args:
            simulation_data: Dict with optional ``paths``, ``times``,
                ``parameters`` and ``statistics`` entries.

        Returns:
            The complete HTML document as a string.
        """
        paths = simulation_data.get("paths", [])
        times = simulation_data.get("times", [])
        params = simulation_data.get("parameters", {})
        statistics = simulation_data.get("statistics", {})
        final_stats = statistics.get("final_value", {})
        return_stats = statistics.get("returns", {})

        # Only a handful of paths is plotted to keep the chart readable;
        # slicing already copes with fewer than five paths.
        sample_paths = paths[:5]

        # Greek letters and the subscript are written as HTML entities/markup
        # so they render correctly whatever encoding the page is served with.
        html_component = f"""
<!DOCTYPE html>
<html>
<head>
<title>GBM Analysis Component</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/3.9.1/chart.min.js"></script>
<style>
.gbm-container {{ max-width: 1200px; margin: 0 auto; padding: 20px; }}
.parameters {{ background: #f5f5f5; padding: 15px; margin-bottom: 20px; border-radius: 5px; }}
.chart-container {{ height: 400px; margin-bottom: 20px; }}
.stats-grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 15px; }}
.stat-card {{ background: white; padding: 15px; border-radius: 5px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
</style>
</head>
<body>
<div class="gbm-container">
<h1>Geometric Brownian Motion Analysis</h1>
<div class="parameters">
<h3>Model Parameters</h3>
<p><strong>Drift (&mu;):</strong> {params.get('drift', 0):.4f}</p>
<p><strong>Volatility (&sigma;):</strong> {params.get('volatility', 0):.4f}</p>
<p><strong>Initial Value (S<sub>0</sub>):</strong> {params.get('initial_value', 0):.2f}</p>
<p><strong>Time Horizon:</strong> {params.get('time_horizon', 0):.2f} years</p>
<p><strong>Steps:</strong> {params.get('steps', 0)}</p>
<p><strong>Paths:</strong> {params.get('num_paths', 0)}</p>
</div>
<div class="chart-container">
<canvas id="pathChart"></canvas>
</div>
<div class="stats-grid">
<div class="stat-card">
<h4>Final Value Statistics</h4>
<p>Mean: {final_stats.get('mean', 0):.2f}</p>
<p>Std Dev: {final_stats.get('std', 0):.2f}</p>
</div>
<div class="stat-card">
<h4>Return Statistics</h4>
<p>Mean Return: {return_stats.get('mean', 0):.4f}</p>
<p>Return Volatility: {return_stats.get('std', 0):.4f}</p>
</div>
</div>
</div>
<script>
const ctx = document.getElementById('pathChart').getContext('2d');
const times = {json.dumps(times)};
const samplePaths = {json.dumps(sample_paths)};
const datasets = samplePaths.map((path, index) => ({{
label: `Path ${{index + 1}}`,
data: path,
borderColor: `hsl(${{index * 72}}, 70%, 50%)`,
backgroundColor: `hsla(${{index * 72}}, 70%, 50%, 0.1)`,
fill: false,
tension: 0.1
}}));
new Chart(ctx, {{
type: 'line',
data: {{
labels: times,
datasets: datasets
}},
options: {{
responsive: true,
maintainAspectRatio: false,
scales: {{
x: {{
title: {{
display: true,
text: 'Time (years)'
}}
}},
y: {{
title: {{
display: true,
text: 'Asset Value'
}}
}}
}},
plugins: {{
title: {{
display: true,
text: 'Geometric Brownian Motion Sample Paths'
}},
legend: {{
display: true,
position: 'top'
}}
}}
}}
}});
</script>
</body>
</html>"""
        return html_component
# Global in-memory storage for simulation results and metrics.
# simulation_cache maps a generated simulation ID to the result dict produced
# by the simulator; metrics_cache maps "metrics_<simulation ID>" to the dict
# returned by the metrics calculator. Nothing in this file removes entries.
simulation_cache: Dict[str, Dict[str, Any]] = {}
metrics_cache: Dict[str, Dict[str, Any]] = {}
@mcp.tool()
async def simulate_stochastic_process(
    model: str,
    parameters: Optional[Dict[str, Any]] = None
) -> str:
    """Generate comprehensive stochastic process simulations.

    The result is stored in ``simulation_cache`` under a freshly generated
    simulation ID, which is included in the returned summary. Problems are
    reported as a message string: an unsupported model, parameters the
    simulator rejects, or a run that produced no paths.

    Args:
        model: Stochastic process model type (currently supports "gbm")
        parameters: Optional dict with drift, volatility, initial_value, time_horizon, steps, num_paths
    """
    if model != "gbm":
        return f"Unsupported model: {model}"

    try:
        result = StochasticProcessSimulator.geometric_brownian_motion(**(parameters or {}))
    except (TypeError, ValueError, ArithmeticError) as exc:
        # Unknown parameter names, wrong types or out-of-range values end up
        # here; report them in the same string style as the other failures.
        return f"Invalid simulation parameters: {exc}"

    if not result["statistics"]:
        # No paths means no statistics to summarise; do not cache the run.
        return "Simulation produced no paths; 'num_paths' must be at least 1."

    simulation_id = f"sim_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{random.randint(1000, 9999)}"
    simulation_cache[simulation_id] = result

    final_stats = result['statistics']['final_value']
    # The line labelled "95% VaR" shows the 5th percentile of the final values.
    return (f"Simulation completed successfully!\n\n"
            f"Simulation ID: {simulation_id}\n"
            f"Model: {result['model'].upper()}\n"
            f"Paths generated: {len(result['paths'])}\n"
            f"Time steps: {len(result['times']) - 1}\n"
            f"Final value statistics:\n"
            f" - Mean: {final_stats['mean']:.2f}\n"
            f" - Std Dev: {final_stats['std']:.2f}\n"
            f" - 95% VaR: {final_stats['percentiles']['5']:.2f}\n\n"
            f"Use simulation ID '{simulation_id}' for further analysis.")
@mcp.tool()
async def calculate_financial_metrics(simulation_id: str) -> str:
    """Calculate comprehensive financial risk and performance metrics.

    On success the metrics are stored in ``metrics_cache`` under
    ``metrics_<simulation_id>`` and a text summary is returned. An unknown
    simulation ID, or an error reported by the calculator, is returned as a
    message and nothing is cached.

    Args:
        simulation_id: ID of the simulation to analyze
    """
    if simulation_id not in simulation_cache:
        return f"Simulation ID '{simulation_id}' not found."

    simulation_data = simulation_cache[simulation_id]
    metrics = FinancialMetricsCalculator.calculate_metrics(simulation_data)

    # The calculator signals failure with an {"error": ...} dict that has no
    # "metrics" key; surface it the same way export_analysis does.
    if "error" in metrics:
        return metrics["error"]

    metrics_id = f"metrics_{simulation_id}"
    metrics_cache[metrics_id] = metrics

    values = metrics['metrics']
    return (f"Financial metrics calculated for simulation {simulation_id}:\n\n"
            f"Risk Metrics:\n"
            f" - VaR (95%): {values['var']['95%']:.4f}\n"
            f" - VaR (99%): {values['var']['99%']:.4f}\n"
            f" - Expected Shortfall (95%): {values['expected_shortfall']['95%']:.4f}\n\n"
            f"Performance Metrics:\n"
            f" - Sharpe Ratio: {values['sharpe_ratio']:.4f}\n"
            f" - Max Drawdown (Mean): {values['max_drawdown']['mean']:.4f}\n"
            f" - Realized Volatility: {values['volatility']['realized_annual']:.4f}\n\n"
            f"Metrics ID: {metrics_id}")
@mcp.tool()
async def export_analysis(data_id: str, format: str = "json") -> str:
    """Export analysis results in various formats.

    ``data_id`` is looked up in ``simulation_cache`` first and then in
    ``metrics_cache``. The returned text reports format, size and export
    time, followed by the first 500 characters of the exported content.
    An unknown ID or a failed export is returned as a message.

    Args:
        data_id: ID of the data to export
        format: Export format (json, csv, xml)
    """
    if data_id in simulation_cache:
        data = simulation_cache[data_id]
    elif data_id in metrics_cache:
        data = metrics_cache[data_id]
    else:
        return f"Data ID '{data_id}' not found."

    export_result = DataExporter.export_data(data, format)

    # Guard against an exporter that produced nothing for this data/format
    # combination, so the membership test below cannot fail on None.
    if not export_result:
        return f"Format '{format}' is not available for data ID '{data_id}'."
    if "error" in export_result:
        return export_result["error"]

    preview = str(export_result.get('content', export_result.get('data', '')))[:500]
    return (f"Data exported successfully!\n\n"
            f"Format: {export_result['format'].upper()}\n"
            f"Size: {export_result['size_bytes']} bytes\n"
            f"Exported at: {export_result['exported_at']}\n\n"
            f"Content preview:\n{preview}...")
@mcp.tool()
async def render_gbm_component(simulation_id: str) -> str:
    """Render interactive GBM analyzer component.

    Looks the simulation up in ``simulation_cache`` and returns a short
    feature summary followed by the generated HTML document, or a
    not-found message when the ID is unknown.

    Args:
        simulation_id: ID of the simulation to visualize
    """
    try:
        cached_simulation = simulation_cache[simulation_id]
    except KeyError:
        return f"Simulation ID '{simulation_id}' not found."

    page = ComponentRenderer.render_gbm_component(cached_simulation)

    # Assemble the report line by line; empty entries become blank lines.
    report_lines = [
        f"Interactive GBM component rendered for simulation {simulation_id}",
        "",
        f"Component size: {len(page)} characters",
        "Features:",
        " - Interactive chart with sample paths",
        " - Parameter display",
        " - Statistical summaries",
        " - Responsive design",
        "",
        "HTML Component:",
        page,
    ]
    return "\n".join(report_lines)
# Run the FastMCP application only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    mcp.run()