import json
import pandas as pd
from prophet import Prophet
from datetime import datetime
# =============================================================================
# MCP Protocol Request Routing
# =============================================================================
def handle_request(method, params):
    """
    Dispatch one MCP (Model Context Protocol) JSON-RPC call to its handler.

    Recognised methods:
    - "initialize"  -> handle_initialize()
    - "tools/list"  -> handle_tools_list()
    - "tools/call"  -> handle_tool_call(params)

    `params` is only forwarded for "tools/call"; the other methods ignore it.

    Raises:
        ValueError: if `method` is not one of the methods listed above.
    """
    if method == "initialize":
        return handle_initialize()

    if method == "tools/list":
        return handle_tools_list()

    if method == "tools/call":
        return handle_tool_call(params)

    # Anything else is not part of the supported surface.
    raise ValueError(f"Method not found: {method}")
# =============================================================================
# MCP Protocol Handlers
# =============================================================================
def handle_initialize():
    """
    Build the response to the MCP "initialize" method.

    Returns:
        dict: the protocol version string, the server name/version, and a
        capabilities object that declares an (empty) "tools" entry.
    """
    server_info = {"name": "prophet_mcp", "version": "0.1.0"}
    capabilities = {"tools": {}}

    response = {}
    response["protocolVersion"] = "2024-11-05"
    response["serverInfo"] = server_info
    response["capabilities"] = capabilities
    return response
def handle_tools_list():
    """
    Build the response to the MCP "tools/list" method.

    A single tool, "forecast_time_series", is advertised. Its JSON schema is
    published under the camelCase key "inputSchema" and accepts exactly these
    arguments: ds, y, periods, lower_limit, upper_limit (ds and y required,
    no additional properties allowed).

    Returns:
        dict: {"tools": [<tool descriptor>]}.
    """
    ds_schema = {
        "type": "array",
        "items": {"type": "string"},
        "description": "List of dates in ISO format (e.g., YYYY-MM-DD).",
    }
    y_schema = {
        "type": "array",
        "items": {"type": "number"},
        "description": "List of numeric values aligned with ds.",
    }
    periods_schema = {
        "type": "integer",
        "description": "Number of future periods to forecast.",
        "default": 10,
    }
    lower_limit_schema = {
        "type": "number",
        "description": "Optional lower bound. Forecast values below this will be flagged as out-of-bounds.",
    }
    upper_limit_schema = {
        "type": "number",
        "description": "Optional upper bound. Forecast values above this will be flagged as out-of-bounds.",
    }

    input_schema = {
        "type": "object",
        "properties": {
            "ds": ds_schema,
            "y": y_schema,
            "periods": periods_schema,
            "lower_limit": lower_limit_schema,
            "upper_limit": upper_limit_schema,
        },
        "required": ["ds", "y"],
        "additionalProperties": False,
    }

    forecast_tool = {
        "name": "forecast_time_series",
        "description": "Runs a simple Prophet forecast on ds/y and returns ds + yhat/yhat_lower/yhat_upper. Optionally validates forecast against upper/lower bounds.",
        "annotations": {"read_only": False},
        "inputSchema": input_schema,
    }

    return {"tools": [forecast_tool]}
def _tool_error_result(text):
    """Wrap `text` in the error-shaped tool result used by handle_tool_call."""
    return {
        "isError": True,
        "content": [{"type": "text", "text": text}],
    }


def handle_tool_call(params):
    """
    Handle the MCP "tools/call" method.

    Args:
        params: the JSON-RPC params. Expected to be a dict with "name" (tool
            name) and "arguments" (a dict, or a JSON string encoding one).
            A missing/None/non-dict `params` is treated as an empty dict, and
            a missing/None "arguments" as no arguments.

    Returns:
        dict: {"content": [{"type": "text", "text": ...}]} on success, with
        the text taken from the forecast's meta["summary"]. On failure the
        same shape with "isError": True and a message describing the problem:
        malformed arguments, an unknown tool name, or an error reported by
        forecast_time_series.
    """
    if not isinstance(params, dict):
        params = {}

    tool_name = params.get("name")
    arguments = params.get("arguments")
    if arguments is None:
        arguments = {}

    # Some clients send the arguments object as a JSON-encoded string.
    if isinstance(arguments, str):
        try:
            arguments = json.loads(arguments)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return _tool_error_result("Invalid arguments: expected object or JSON string.")

    # Valid JSON is not enough: "[1, 2]" or "3" decode fine but are not objects.
    if not isinstance(arguments, dict):
        return _tool_error_result("Invalid arguments: expected object or JSON string.")

    if tool_name != "forecast_time_series":
        return _tool_error_result(f"Tool not found: {tool_name}")

    data = forecast_time_series(arguments)

    # forecast_time_series signals failure with {"error": "..."}; report it
    # as a tool error instead of returning an empty success message.
    if isinstance(data, dict) and "error" in data:
        return _tool_error_result(f"Forecast failed: {data['error']}")

    # Extract pre-formatted summary (includes Chart.js config)
    summary = ""
    if isinstance(data, dict) and "meta" in data and "summary" in data["meta"]:
        summary = data["meta"]["summary"].strip()
    return {"content": [{"type": "text", "text": summary}]}
# =============================================================================
# Forecasting Engine
# =============================================================================
def _is_number(value):
    """Return True for int/float values, excluding bool (a subclass of int)."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _limit_dataset(label, limit, n_points, rgb):
    """Render one horizontal limit line as a Chart.js dataset snippet.

    The snippet starts with a comma so it can be appended directly after the
    last regular dataset in the chart template.
    """
    line_data = json.dumps([limit] * n_points)
    return f""",
{{
"label": "{label} ({limit})",
"data": {line_data},
"borderWidth": 2,
"fill": false,
"pointRadius": 0,
"backgroundColor": "rgba({rgb}, 0.0)",
"borderColor": "rgba({rgb}, 0.8)",
"borderDash": [10, 5],
"order": 2
}}"""


def _describe_trend(hist_mean, fcst_mean, has_future):
    """Describe the forecast mean relative to the historical mean."""
    # A zero mean has no meaningful percentage change; no future rows means
    # there is nothing to compare.
    if not has_future or hist_mean == 0:
        return "N/A"
    # abs() keeps the sign of the change correct when the mean is negative.
    change_pct = ((fcst_mean - hist_mean) / abs(hist_mean)) * 100
    if change_pct > 0:
        return f"UPWARD (+{change_pct:.1f}% vs historical mean)"
    if change_pct < 0:
        return f"DOWNWARD ({change_pct:.1f}% vs historical mean)"
    return "FLAT (no change vs historical mean)"


def forecast_time_series(arguments):
    """
    Generates a time-series forecast using Meta's Prophet model.

    Args:
        arguments (dict): A dictionary containing:
            - ds (list[str]): List of date strings in ISO format.
            - y (list[float]): List of numeric values aligned with ds.
            - periods (int, optional): Number of future periods to forecast.
              Default: 10. Must be >= 0.
            - lower_limit (float, optional): Lower bound for forecast validation.
            - upper_limit (float, optional): Upper bound for forecast validation.

    Returns:
        dict: On success, a JSON-serializable dict with:
            - "forecast": one record per output row (ds, yhat, yhat_lower,
              yhat_upper, status). Bounds are checked on every output row,
              i.e. fitted historical dates as well as future dates.
            - "meta": f (periods), n_history, start, end, summary (an
              LLM-friendly text including a Chart.js config) and, when a
              limit was given, lower_limit / upper_limit / violations_count.
        On failure (invalid input, or any exception raised while fitting or
        predicting), {"error": "<message>"}; this function reports problems
        through that dict rather than raising.
    """
    # ---- Input validation: report problems as {"error": ...} ----
    if not isinstance(arguments, dict):
        return {"error": "Arguments must be an object."}

    ds = arguments.get("ds")
    y = arguments.get("y")
    lower_limit = arguments.get("lower_limit")
    upper_limit = arguments.get("upper_limit")

    if not isinstance(ds, (list, tuple)) or not isinstance(y, (list, tuple)):
        return {"error": "Both 'ds' and 'y' are required and must be arrays."}
    if len(ds) != len(y):
        return {
            "error": f"'ds' and 'y' must have the same length (got {len(ds)} and {len(y)})."
        }

    raw_periods = arguments.get("periods")
    try:
        periods = 10 if raw_periods is None else int(raw_periods)
    except (TypeError, ValueError):
        return {"error": "'periods' must be a non-negative integer."}
    if periods < 0:
        return {"error": "'periods' must be a non-negative integer."}

    for limit_name, limit_value in (("lower_limit", lower_limit), ("upper_limit", upper_limit)):
        if limit_value is not None and not _is_number(limit_value):
            return {"error": f"'{limit_name}' must be a number."}

    try:
        dates = pd.to_datetime(pd.Series(ds, dtype="object"))
    except (ValueError, TypeError) as exc:
        return {"error": f"Invalid date in 'ds': {exc}"}
    try:
        values = pd.to_numeric(pd.Series(y, dtype="object"))
    except (ValueError, TypeError) as exc:
        return {"error": f"Invalid numeric value in 'y': {exc}"}

    # Build input DataFrame, ordered by date so history lines up with the
    # date-ordered forecast rows used below.
    df = pd.DataFrame({"ds": dates, "y": values})
    df = df.sort_values("ds").reset_index(drop=True)

    if int(df["y"].notna().sum()) < 2:
        return {"error": "At least 2 non-null observations are required to fit the model."}

    # Fit Prophet model and generate forecast
    try:
        model = Prophet()
        model.fit(df)
        future = model.make_future_dataframe(periods=periods)
        forecast = model.predict(future)
    except Exception as e:
        # Boundary around the third-party model: any failure becomes an
        # error payload instead of propagating to the JSON-RPC layer.
        return {"error": str(e)}

    # Extract and format forecast columns
    out = forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].copy()
    out["ds"] = out["ds"].dt.strftime("%Y-%m-%d")
    for column in ("yhat", "yhat_lower", "yhat_upper"):
        out[column] = out[column].round(2)

    # --- Bounds checking ---
    bounds_active = lower_limit is not None or upper_limit is not None
    violations = []
    statuses = []
    for date_str, yhat_val in zip(out["ds"], out["yhat"]):
        if upper_limit is not None and yhat_val > upper_limit:
            statuses.append("⚠️ EXCEEDS UPPER")
            violations.append(f" {date_str}: yhat={yhat_val:.2f} > upper_limit={upper_limit}")
        elif lower_limit is not None and yhat_val < lower_limit:
            statuses.append("⚠️ BELOW LOWER")
            violations.append(f" {date_str}: yhat={yhat_val:.2f} < lower_limit={lower_limit}")
        else:
            statuses.append("✅ OK")
    out["status"] = statuses

    # --- Build an LLM-friendly summary ---
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    hist_start = df["ds"].min().strftime("%Y-%m-%d")
    hist_end = df["ds"].max().strftime("%Y-%m-%d")

    # Historical data summary
    hist_mean = df["y"].mean()
    hist_min = df["y"].min()
    hist_max = df["y"].max()
    hist_std = df["y"].std()

    # Forecast data summary (future-only rows). Select by date rather than by
    # position: the forecast may have fewer historical rows than the input
    # when the input contains duplicate dates.
    future_only = forecast[forecast["ds"] > df["ds"].max()]
    has_future = len(future_only) > 0
    fcst_mean = future_only["yhat"].mean() if has_future else 0
    fcst_min = future_only["yhat"].min() if has_future else 0
    fcst_max = future_only["yhat"].max() if has_future else 0

    summary_section = (
        f"Summary of forecast metrics:\n"
        f" - Historical Period: {hist_start} to {hist_end}\n"
        f" - Historical Data Points: {len(df)}\n"
        f" - Historical Mean: {hist_mean:.2f}\n"
        f" - Historical Min: {hist_min:.2f}\n"
        f" - Historical Max: {hist_max:.2f}\n"
        f" - Historical Std Dev: {hist_std:.2f}\n"
        f" - Forecast Periods: {periods}\n"
        f" - Forecast Mean (yhat): {fcst_mean:.2f}\n"
        f" - Forecast Min (yhat): {fcst_min:.2f}\n"
        f" - Forecast Max (yhat): {fcst_max:.2f}\n"
    )

    # Bounds summary
    bounds_parts = []
    if bounds_active:
        bounds_parts.append("\nBounds Validation:\n")
        if lower_limit is not None:
            bounds_parts.append(f" - Lower Limit: {lower_limit}\n")
        if upper_limit is not None:
            bounds_parts.append(f" - Upper Limit: {upper_limit}\n")
        if violations:
            bounds_parts.append(f" - ⚠️ {len(violations)} date(s) OUT OF BOUNDS:\n")
            bounds_parts.append("\n".join(violations) + "\n")
        else:
            bounds_parts.append(" - ✅ All forecast values are within bounds.\n")
    bounds_section = "".join(bounds_parts)

    # Build a data table for the forecast rows
    if bounds_active:
        table_header = "Date | yhat | yhat_lower | yhat_upper | Status"
    else:
        table_header = "Date | yhat | yhat_lower | yhat_upper"
    table_divider = "-" * len(table_header)
    table_rows = []
    for date_str, yhat_val, yhat_lo, yhat_hi, status in zip(
        out["ds"], out["yhat"], out["yhat_lower"], out["yhat_upper"], out["status"]
    ):
        row_str = f"{date_str} | {yhat_val:.2f} | {yhat_lo:.2f} | {yhat_hi:.2f}"
        if bounds_active:
            row_str += f" | {status}"
        table_rows.append(row_str)
    table_body = "\n".join(table_rows)

    # Determine trend direction
    trend_direction = _describe_trend(hist_mean, fcst_mean, has_future)
    trend_word = trend_direction.split(' ')[0].lower()

    # Build Chart.js limit line datasets
    limit_datasets = ""
    if upper_limit is not None:
        limit_datasets += _limit_dataset("Upper Limit", upper_limit, len(out), "255, 99, 132")
    if lower_limit is not None:
        limit_datasets += _limit_dataset("Lower Limit", lower_limit, len(out), "255, 165, 0")

    # Align observed values with the forecast rows by date. Dates without an
    # observation (future rows, null y) become null in the chart; values
    # sharing a date are averaged because the chart has one slot per label.
    actual_by_date = df.groupby("ds")["y"].mean()
    actuals = [
        None if pd.isna(value) else float(value)
        for value in actual_by_date.reindex(forecast["ds"]).tolist()
    ]

    status_legend = (
        "- Status: whether the forecast value is within the specified bounds"
        if bounds_active
        else ""
    )
    labels_json = json.dumps(out["ds"].tolist())
    yhat_lower_json = json.dumps(out["yhat_lower"].tolist())
    yhat_upper_json = json.dumps(out["yhat_upper"].tolist())
    yhat_json = json.dumps(out["yhat"].tolist())
    actuals_json = json.dumps(actuals)

    formatted_output = f"""
### Prophet Forecast Data ###
This data set contains a time-series forecast generated by Meta's Prophet model.
What the columns mean:
- Date: the date for the observed or predicted value
- yhat: the predicted value (the model's best estimate)
- yhat_lower: the lower bound of the confidence interval (worst-case estimate)
- yhat_upper: the upper bound of the confidence interval (best-case estimate)
{status_legend}
The wider the gap between yhat_lower and yhat_upper, the less certain the model is about that prediction.
This forecast was generated on {timestamp} for {periods} future periods
based on {len(df)} historical observations from {hist_start} to {hist_end}.
{summary_section}
- Trend Direction: {trend_direction}
{bounds_section}
Key Takeaway: The model predicts the values will trend {trend_word} over the next {periods} periods,
with predicted values ranging from {fcst_min:.2f} to {fcst_max:.2f}
Please review this data in detail and provide an analysis of the trends and predictions:
{table_header}
{table_divider}
{table_body}
Output format: Recommend using Chart.js to visualize the forecast data. Show the historical data as black points and the forecast as a blue line with the confidence interval as a shaded region.
Use these Chart.js settings to render the chart:
chartjs = {{
"type": "line",
"data": {{
"labels": {labels_json},
"datasets": [
{{
"label": "Confidence Lower",
"data": {yhat_lower_json},
"borderWidth": 0,
"pointRadius": 0,
"fill": false,
"backgroundColor": "rgba(54, 162, 235, 0.0)",
"borderColor": "rgba(54, 162, 235, 0.0)",
"hidden": false
}},
{{
"label": "Confidence Upper",
"data": {yhat_upper_json},
"borderWidth": 0,
"pointRadius": 0,
"fill": "-1",
"backgroundColor": "rgba(54, 162, 235, 0.15)",
"borderColor": "rgba(54, 162, 235, 0.0)"
}},
{{
"label": "Forecast (yhat)",
"data": {yhat_json},
"borderWidth": 2,
"fill": false,
"pointRadius": 0,
"backgroundColor": "rgba(54, 162, 235, 0.2)",
"borderColor": "rgba(54, 162, 235, 0.8)",
"borderDash": [5, 5],
"order": 1
}},
{{
"label": "Actuals",
"data": {actuals_json},
"fill": false,
"pointRadius": 6,
"pointHoverRadius": 8,
"borderWidth": 0,
"backgroundColor": "rgba(255, 107, 107, 1)",
"borderColor": "rgba(255, 107, 107, 0.0)",
"order": 0
}}{limit_datasets}
]
}},
"options": {{
"responsive": true,
"plugins": {{
"legend": {{
"labels": {{
"filter": "function(item) {{ return !item.text.includes('Confidence'); }}"
}}
}}
}},
"scales": {{
"y": {{
"beginAtZero": true
}}
}}
}}
}}
"""

    meta = {
        "f": periods,
        "n_history": int(df.shape[0]),
        "start": df["ds"].min().strftime("%Y-%m-%dT%H:%M:%S"),
        "end": df["ds"].max().strftime("%Y-%m-%dT%H:%M:%S"),
        "summary": formatted_output,
    }
    if bounds_active:
        meta["lower_limit"] = lower_limit
        meta["upper_limit"] = upper_limit
        meta["violations_count"] = len(violations)
    return {
        "meta": meta,
        "forecast": out.to_dict(orient="records"),
    }