"""
Klipper MCP Server - Main Entry Point
Compatible with Python 3.9+ (CB1/Raspberry Pi)
Run with: python server.py
"""
import asyncio
import inspect
import json
import os
import sys
import traceback
from datetime import datetime
from aiohttp import web
from typing import Any, Callable, Dict, get_type_hints
import html
import config
from moonraker import init_client, close_client, get_client
# ============================================================
# HTML Rendering Utilities (for fetch_webpage compatibility)
# ============================================================
def escape(text: Any) -> str:
    """Return ``text`` as an HTML-safe string.

    ``None`` becomes the empty string; any other value is converted with
    ``str()`` and then passed through ``html.escape``.
    """
    if text is None:
        return ""
    return html.escape(str(text))
def render_json_as_html(data: Any, title: str = "MCP Response", depth: int = 0) -> str:
    """Render a JSON-like value as nested HTML.

    Args:
        data: A dict (rendered as ``<dl>``), a list (rendered as ``<ul>``,
            or a placeholder when empty) or any other value (rendered as
            escaped text).
        title: Accepted for interface compatibility; not used in the output.
        depth: Nesting level; the container gets a left margin of
            ``depth * 20`` pixels and children are rendered one level deeper.

    Returns:
        The HTML fragment as a string.
    """
    if isinstance(data, dict):
        entries = "".join(
            f'<dt><strong>{escape(key)}</strong></dt>'
            f'<dd>{render_json_value(value, depth + 1)}</dd>'
            for key, value in data.items()
        )
        return f'<dl style="margin-left:{depth*20}px;">{entries}</dl>'

    if isinstance(data, list):
        if not data:
            return '<em>(empty list)</em>'
        bullets = "".join(
            f'<li>{render_json_value(element, depth + 1)}</li>' for element in data
        )
        return f'<ul style="margin-left:{depth*20}px;">{bullets}</ul>'

    # Scalars fall through to plain escaped text.
    return escape(data)
def render_json_value(value: Any, depth: int = 0) -> str:
    """Render one JSON value as an HTML fragment.

    ``None`` is shown as an italic "null", booleans as a green/red lower-case
    word, numbers inside ``<code>``, containers via ``render_json_as_html``
    (at the given ``depth``), and everything else as escaped text.
    """
    if value is None:
        return '<em>null</em>'

    # bool must be tested before int/float: bool is a subclass of int.
    if isinstance(value, bool):
        color = "green" if value else "red"
        return f'<span style="color:{color};">{str(value).lower()}</span>'

    if isinstance(value, (int, float)):
        return f'<code>{value}</code>'

    if isinstance(value, (dict, list)):
        return render_json_as_html(value, depth=depth)

    # Strings and any remaining type are rendered as escaped text.
    return escape(value)
def html_page(title: str, body: str, subtitle: str = None) -> str:
    """Build a complete, styled HTML document around ``body``.

    Args:
        title: Page title; escaped and used in both ``<title>`` and ``<h1>``.
        body: Pre-rendered HTML inserted verbatim (the caller must escape it).
        subtitle: Optional text shown under the heading; escaped here.

    Returns:
        The full HTML page, including navigation bar and a footer carrying
        the generation time and ``config.PRINTER_NAME``.
    """
    if subtitle:
        subtitle_html = f'<p style="color:#666;">{escape(subtitle)}</p>'
    else:
        subtitle_html = ''
    safe_title = escape(title)
    generated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    printer_name = escape(config.PRINTER_NAME)

    return f'''<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{safe_title} - Klipper MCP</title>
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
max-width: 1200px; margin: 0 auto; padding: 20px; background: #f5f5f5; }}
h1 {{ color: #333; border-bottom: 2px solid #007acc; padding-bottom: 10px; }}
h2 {{ color: #007acc; margin-top: 30px; }}
h3 {{ color: #555; }}
section {{ background: white; padding: 20px; margin: 20px 0; border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
dl {{ display: grid; grid-template-columns: auto 1fr; gap: 8px 16px; }}
dt {{ font-weight: bold; color: #333; }}
dd {{ margin: 0; color: #666; }}
code {{ background: #e8e8e8; padding: 2px 6px; border-radius: 3px; font-family: monospace; }}
pre {{ background: #2d2d2d; color: #f8f8f2; padding: 15px; border-radius: 5px;
overflow-x: auto; font-size: 13px; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ddd; padding: 10px; text-align: left; }}
th {{ background: #007acc; color: white; }}
tr:nth-child(even) {{ background: #f9f9f9; }}
.status-ok {{ color: green; font-weight: bold; }}
.status-error {{ color: red; font-weight: bold; }}
.temp {{ font-size: 1.2em; }}
.tool-card {{ border: 1px solid #ddd; padding: 10px; margin: 5px 0; border-radius: 5px; }}
.tool-name {{ font-weight: bold; color: #007acc; }}
.timestamp {{ color: #999; font-size: 0.9em; }}
a {{ color: #007acc; text-decoration: none; }}
a:hover {{ text-decoration: underline; }}
.nav {{ background: #007acc; padding: 10px; border-radius: 5px; margin-bottom: 20px; }}
.nav a {{ color: white; margin-right: 15px; }}
</style>
</head>
<body>
<nav class="nav">
<a href="/">Home</a>
<a href="/status.html">Status</a>
<a href="/tools.html">Tools</a>
<a href="/health.html">Health</a>
<a href="/config.html">Config Files</a>
</nav>
<h1>{safe_title}</h1>
{subtitle_html}
{body}
<footer style="margin-top:40px; padding-top:20px; border-top:1px solid #ddd; color:#999;">
<p class="timestamp">Generated: {generated_at}</p>
<p>Klipper MCP Server v1.0.0 | Printer: {printer_name}</p>
</footer>
</body>
</html>'''
def error_html(title: str, error: str) -> str:
    """Return a full HTML page showing ``error`` (escaped) under ``title``."""
    message = escape(error)
    body = f'''
<section>
<h2 class="status-error">Error</h2>
<p>{message}</p>
</section>
'''
    return html_page(title, body)
# Tool registry: maps a tool's name to a record with the keys "function"
# (the callable), "description" (its raw docstring, possibly empty) and
# "name". It is populated by register_all_tools().
TOOLS: Dict[str, Dict[str, Any]] = {}
def get_tool_description(tool_info):
    """Return the first non-blank line of a tool's description, stripped.

    ``tool_info`` is a mapping whose optional "description" entry holds the
    docstring text. A missing, ``None`` or all-whitespace description yields
    "No description available".
    """
    text = tool_info.get("description", "") or ""
    stripped_lines = (raw.strip() for raw in text.strip().split('\n'))
    return next(
        (candidate for candidate in stripped_lines if candidate),
        "No description available",
    )
def audit_log(action: str, details: dict = None):
    """Append one JSON line describing ``action`` to the audit log.

    The log path comes from ``config.AUDIT_LOG_FILE`` (with a built-in
    default). Logging is best-effort: any failure, including failure to
    create the log directory or to serialize ``details``, is reported on
    stderr and never propagates to the caller.

    Args:
        action: Short identifier of the audited event.
        details: Optional JSON-serializable mapping with extra context.
    """
    log_path = getattr(config, 'AUDIT_LOG_FILE', '/home/biqu/klipper-mcp/data/audit.log')
    entry = {
        "timestamp": datetime.now().isoformat(),
        "action": action,
        "details": details or {},
    }
    try:
        # A bare filename has no directory part; os.makedirs("") would raise.
        log_dir = os.path.dirname(log_path)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        with open(log_path, 'a', encoding='utf-8') as f:
            f.write(json.dumps(entry) + '\n')
    except Exception as e:
        print(f"Failed to write audit log: {e}", file=sys.stderr)
# ============================================================
# Import and register all tools from tool modules
# ============================================================
def register_all_tools():
    """Load the tool modules and record every tool they declare in ``TOOLS``.

    The ``tools`` package registers its functions through an object exposing
    a ``tool()`` decorator factory; a minimal stand-in is passed so each
    decorated function is captured under its ``__name__``.
    """
    from tools import register_all_tools as _register

    def _capture(func):
        """Store ``func`` in the registry and hand it back unchanged."""
        tool_name = func.__name__
        TOOLS[tool_name] = {
            "function": func,
            "description": func.__doc__ or "",
            "name": tool_name,
        }
        return func

    # Stand-in for the MCP object: only the ``tool()`` decorator is provided.
    class MockMCP:
        def tool(self):
            return _capture

    _register(MockMCP())
    print(f"✓ Registered {len(TOOLS)} tools", file=sys.stderr)
# ============================================================
# HTTP API Handlers
# ============================================================
async def handle_list_tools(request: web.Request) -> web.Response:
    """Return a JSON listing of every registered tool and the total count."""
    summaries = [
        {"name": tool_name, "description": get_tool_description(record)}
        for tool_name, record in TOOLS.items()
    ]
    return web.json_response({"tools": summaries, "count": len(summaries)})
async def handle_call_tool(request: web.Request) -> web.Response:
    """Invoke a registered tool from a JSON request body.

    The body must be a JSON object with the tool name under "tool" (or
    "name") and optional keyword arguments under "arguments" (or "params").

    Responses:
        200: ``{"tool": ..., "result": ...}``; a string result is decoded as
            JSON when possible, otherwise returned as plain text.
        400: malformed JSON, non-object body, missing tool name, or
            arguments the tool rejects (``TypeError``).
        404: unknown tool (the response lists the available tools).
        500: any other exception raised by the tool.
    """
    try:
        data = await request.json()
    except json.JSONDecodeError:
        return web.json_response({"error": "Invalid JSON"}, status=400)
    if not isinstance(data, dict):
        return web.json_response({"error": "Request body must be a JSON object"}, status=400)

    tool_name = data.get("tool") or data.get("name")
    # Explicit nulls for both keys are treated as "no arguments".
    arguments = data.get("arguments") or data.get("params") or {}

    if not tool_name or not isinstance(tool_name, str):
        return web.json_response({"error": "Missing 'tool' field"}, status=400)
    if tool_name not in TOOLS:
        return web.json_response({
            "error": f"Unknown tool: {tool_name}",
            "available_tools": list(TOOLS.keys())
        }, status=404)
    if not isinstance(arguments, dict):
        return web.json_response({
            "error": "Invalid arguments: expected a JSON object",
            "tool": tool_name
        }, status=400)

    func = TOOLS[tool_name]["function"]
    # Log the call
    audit_log("tool_call", {"tool": tool_name, "arguments": arguments})
    try:
        if asyncio.iscoroutinefunction(func):
            result = await func(**arguments)
        else:
            result = func(**arguments)
        # Tools usually return JSON text; keep plain text as-is rather than
        # failing, matching the HTML and MCP call handlers.
        if isinstance(result, str):
            try:
                result = json.loads(result)
            except ValueError:
                pass
        return web.json_response({
            "tool": tool_name,
            "result": result
        })
    except TypeError as e:
        return web.json_response({
            "error": f"Invalid arguments: {str(e)}",
            "tool": tool_name
        }, status=400)
    except Exception as e:
        traceback.print_exc()
        return web.json_response({
            "error": str(e),
            "tool": tool_name
        }, status=500)
async def handle_server_info(request: web.Request) -> web.Response:
    """Return static server metadata and feature flags as JSON."""
    features = {
        "stealthchanger": True,
        "led_effects": True,
        "spoolman": config.SPOOLMAN_ENABLED,
        "tts": config.TTS_ENABLED,
    }
    payload = {
        "name": "klipper-mcp",
        "version": "1.0.0",
        "printer": config.PRINTER_NAME,
        "moonraker_url": config.MOONRAKER_URL,
        "armed": config.ARMED,
        "tools_count": len(TOOLS),
        "features": features,
    }
    return web.json_response(payload)
async def handle_printer_status(request: web.Request) -> web.Response:
    """Return a compact JSON summary of print state and heater temperatures.

    Responds with HTTP 500 and an "error" field when the Moonraker client
    reports an error or raises.
    """
    def _heater(section):
        """Current/target pair for one heater object."""
        return {
            "current": section.get("temperature"),
            "target": section.get("target"),
        }

    try:
        reply = await get_client().get_printer_status()
        if "error" in reply:
            return web.json_response({"error": reply["error"]}, status=500)

        status = reply.get("result", {}).get("status", {})
        print_stats = status.get("print_stats", {})
        summary = {
            "state": print_stats.get("state"),
            "filename": print_stats.get("filename"),
            "progress": print_stats.get("progress", 0),
            "temperatures": {
                "extruder": _heater(status.get("extruder", {})),
                "bed": _heater(status.get("heater_bed", {})),
            },
        }
        return web.json_response(summary)
    except Exception as e:
        return web.json_response({"error": str(e)}, status=500)
async def handle_health(request: web.Request) -> web.Response:
    """Health check endpoint.

    Probes Moonraker with a printer-status query and reports "ok" when the
    reply carries no "error" key, "degraded" otherwise (including when the
    probe raises). Always answers with HTTP 200.
    """
    try:
        client = get_client()
        result = await client.get_printer_status()
        moonraker_ok = "error" not in result
    except Exception:
        # Only ordinary failures count as "degraded"; task cancellation and
        # interpreter shutdown (BaseException) must keep propagating.
        moonraker_ok = False
    return web.json_response({
        "status": "ok" if moonraker_ok else "degraded",
        "moonraker_connected": moonraker_ok,
        "timestamp": datetime.now().isoformat()
    })
# ============================================================
# HTML Endpoint Handlers (for fetch_webpage compatibility)
# ============================================================
async def handle_server_info_html(request: web.Request) -> web.Response:
    """Render the server information page (settings, features, quick links)."""
    def _flag_class(enabled):
        """CSS class used to colour an on/off flag."""
        return "status-ok" if enabled else "status-error"

    armed_class = _flag_class(config.ARMED)
    spoolman_class = _flag_class(config.SPOOLMAN_ENABLED)
    tts_class = _flag_class(config.TTS_ENABLED)
    tool_count = len(TOOLS)

    body = f'''
<section>
<h2>Server Information</h2>
<dl>
<dt>Server Name</dt><dd>klipper-mcp</dd>
<dt>Version</dt><dd>1.0.0</dd>
<dt>Printer</dt><dd>{escape(config.PRINTER_NAME)}</dd>
<dt>Moonraker URL</dt><dd><code>{escape(config.MOONRAKER_URL)}</code></dd>
<dt>Armed Mode</dt><dd><span class="{armed_class}">{config.ARMED}</span></dd>
<dt>Tools Available</dt><dd>{tool_count}</dd>
</dl>
</section>
<section>
<h2>Features</h2>
<dl>
<dt>StealthChanger</dt><dd><span class="status-ok">Enabled</span></dd>
<dt>LED Effects</dt><dd><span class="status-ok">Enabled</span></dd>
<dt>Spoolman</dt><dd><span class="{spoolman_class}">{config.SPOOLMAN_ENABLED}</span></dd>
<dt>TTS Notifications</dt><dd><span class="{tts_class}">{config.TTS_ENABLED}</span></dd>
</dl>
</section>
<section>
<h2>Quick Links</h2>
<ul>
<li><a href="/status.html">Printer Status</a></li>
<li><a href="/tools.html">Available Tools ({tool_count})</a></li>
<li><a href="/health.html">Health Check</a></li>
<li><a href="/shaper.html">Input Shaper Status</a></li>
<li><a href="/config.html">Configuration Files</a></li>
</ul>
</section>
'''
    page = html_page("Klipper MCP Server", body, f"Controlling {config.PRINTER_NAME}")
    return web.Response(text=page, content_type='text/html')
async def handle_printer_status_html(request: web.Request) -> web.Response:
    """Render print state, heater temperatures and toolhead info as HTML.

    Missing or null numeric fields in the Moonraker reply are shown as 0
    instead of failing the whole page. Any error reported by the client, or
    any exception, produces an error page with HTTP 500.
    """
    def _num(section, key):
        """Numeric field of ``section``; missing or null counts as 0."""
        return section.get(key) or 0

    def _heater_row(label, heater):
        """One table row for a heater object (label is a trusted literal)."""
        return f'''<tr>
<td><strong>{label}</strong></td>
<td class="temp">{_num(heater, "temperature"):.1f}°C</td>
<td>{_num(heater, "target"):.0f}°C</td>
<td>{_num(heater, "power") * 100:.0f}%</td>
</tr>'''

    try:
        client = get_client()
        result = await client.get_printer_status()
        if "error" in result:
            return web.Response(text=error_html("Printer Status", result["error"]),
                                content_type='text/html', status=500)

        status = result.get("result", {}).get("status", {})
        # ``or {}`` also covers objects reported as an explicit null.
        print_stats = status.get("print_stats") or {}
        toolhead = status.get("toolhead") or {}
        heaters = (
            ("T0 (Extruder)", status.get("extruder") or {}),
            ("T1 (Extruder1)", status.get("extruder1") or {}),
            ("T2 (Extruder2)", status.get("extruder2") or {}),
            ("Bed", status.get("heater_bed") or {}),
        )
        heater_rows = "\n".join(_heater_row(label, heater) for label, heater in heaters)

        state = print_stats.get("state") or "unknown"
        if state in ("standby", "printing", "complete"):
            state_class = "status-ok"
        elif state == "error":
            state_class = "status-error"
        else:
            state_class = ""
        progress = _num(print_stats, "progress")

        # Pad short position lists so indexing can never fail.
        position = list(toolhead.get("position") or []) + [0, 0, 0]
        pos_x, pos_y, pos_z = [(axis or 0) for axis in position[:3]]

        body = f'''
<section>
<h2>Print State</h2>
<dl>
<dt>State</dt><dd><span class="{state_class}" style="font-size:1.3em;">{escape(str(state).upper())}</span></dd>
<dt>Filename</dt><dd>{escape(print_stats.get("filename")) or "<em>None</em>"}</dd>
<dt>Progress</dt><dd>
<progress value="{progress * 100}" max="100" style="width:200px;"></progress>
{progress * 100:.1f}%
</dd>
<dt>Print Duration</dt><dd>{_num(print_stats, "print_duration"):.0f}s</dd>
</dl>
</section>
<section>
<h2>Temperatures</h2>
<table>
<tr><th>Heater</th><th>Current</th><th>Target</th><th>Power</th></tr>
{heater_rows}
</table>
</section>
<section>
<h2>Toolhead</h2>
<dl>
<dt>Position</dt><dd>X: {pos_x:.2f}, Y: {pos_y:.2f}, Z: {pos_z:.2f}</dd>
<dt>Homed Axes</dt><dd><code>{escape(toolhead.get("homed_axes") or "none")}</code></dd>
<dt>Max Velocity</dt><dd>{escape(_num(toolhead, "max_velocity"))} mm/s</dd>
<dt>Max Accel</dt><dd>{escape(_num(toolhead, "max_accel"))} mm/s²</dd>
</dl>
</section>
'''
        return web.Response(text=html_page("Printer Status", body, f"{config.PRINTER_NAME}"),
                            content_type='text/html')
    except Exception as e:
        return web.Response(text=error_html("Printer Status", str(e)),
                            content_type='text/html', status=500)
# Category rules, evaluated in order; the first match wins.
# Name-prefix rules are checked before substring rules.
_TOOL_PREFIX_CATEGORIES = (
    (("get_",), "Query/Read"),
    (("set_",), "Configuration"),
    (("list_",), "Listing"),
    (("start_", "stop_", "pause_", "resume_", "cancel_"), "Control"),
)
_TOOL_KEYWORD_CATEGORIES = (
    (("tool", "pickup", "dropoff"), "Toolchanger"),
    (("led", "light"), "LED Effects"),
    (("temperature", "temp", "heater", "pid"), "Temperature"),
    (("mesh", "bed", "level", "calibrat"), "Bed & Calibration"),
    (("file", "gcode"), "Files & G-code"),
    (("camera", "timelapse", "snapshot"), "Camera"),
    (("spool", "filament"), "Spoolman"),
    (("notify", "announce", "tts"), "Notifications"),
    (("backup", "maintenance", "audit", "log"), "Maintenance"),
    (("diagnose", "error", "issue", "mcu"), "Diagnostics"),
)


def _categorize_tool(name):
    """Return the display category for a tool name ("Other" if none match)."""
    for prefixes, category in _TOOL_PREFIX_CATEGORIES:
        if name.startswith(prefixes):
            return category
    for keywords, category in _TOOL_KEYWORD_CATEGORIES:
        if any(keyword in name for keyword in keywords):
            return category
    return "Other"


async def handle_list_tools_html(request: web.Request) -> web.Response:
    """List all registered tools as HTML, grouped by category.

    Tools are sorted by name within each category and categories are shown
    in alphabetical order. Each card links to the tool's ``/call/`` page.
    """
    categories = {}
    for name, tool_info in sorted(TOOLS.items()):
        categories.setdefault(_categorize_tool(name), []).append((name, tool_info))

    sections = [f'<section><h2>Total Tools: {len(TOOLS)}</h2></section>']
    for cat in sorted(categories):
        tools = categories[cat]
        # The name is escaped in the href as well, as the other pages do.
        tools_html = "".join(
            f'''
<div class="tool-card">
<span class="tool-name">{escape(name)}</span>
<p style="margin:5px 0 0 0; color:#666;">{escape(get_tool_description(tool_info))}</p>
<small><a href="/call/{escape(name)}.html">Try it →</a></small>
</div>
'''
            for name, tool_info in tools
        )
        sections.append(f'''
<section>
<h2>{escape(cat)} ({len(tools)})</h2>
{tools_html}
</section>
''')

    body = "".join(sections)
    return web.Response(text=html_page("Available Tools", body, f"{len(TOOLS)} tools registered"),
                        content_type='text/html')
async def handle_health_html(request: web.Request) -> web.Response:
    """Render the health check page.

    Moonraker is probed with a printer-status query. The page shows OK when
    the reply has no "error" key and DEGRADED otherwise; the reason is shown
    both when the probe raises and when Moonraker itself reports an error.
    """
    error_msg = None
    try:
        client = get_client()
        result = await client.get_printer_status()
        moonraker_ok = "error" not in result
        if not moonraker_ok:
            # Surface the error carried in the reply, not only exceptions.
            error_msg = str(result["error"])
    except Exception as e:
        moonraker_ok = False
        error_msg = str(e)

    status_class = "status-ok" if moonraker_ok else "status-error"
    status_text = "OK" if moonraker_ok else "DEGRADED"
    error_html_fragment = (
        f'<p class="status-error">Error: {escape(error_msg)}</p>' if error_msg else ''
    )
    body = f'''
<section>
<h2>System Health</h2>
<dl>
<dt>Overall Status</dt>
<dd><span class="{status_class}" style="font-size:1.5em;">{status_text}</span></dd>
<dt>Moonraker Connected</dt>
<dd><span class="{status_class}">{moonraker_ok}</span></dd>
<dt>MCP Server</dt>
<dd><span class="status-ok">Running</span></dd>
<dt>Tools Loaded</dt>
<dd>{len(TOOLS)}</dd>
</dl>
{error_html_fragment}
</section>
'''
    return web.Response(text=html_page("Health Check", body), content_type='text/html')
async def handle_call_tool_html(request: web.Request) -> web.Response:
    """Call a tool from the browser and render its result as HTML.

    Query-string parameters are passed to the tool as keyword arguments
    (all values are strings). With no query parameters on a GET request a
    confirmation form is shown instead of running the tool; the form submits
    the reserved ``_run`` marker so that tools taking no arguments can be
    run too. ``_run`` is never forwarded to the tool.

    Responses: 400 for a missing tool name or rejected arguments
    (``TypeError``), 404 for an unknown tool, 500 for any other tool error.
    """
    tool_name = request.match_info.get('tool_name', '').replace('.html', '')
    if not tool_name:
        return web.Response(text=error_html("Tool Call", "No tool specified"),
                            content_type='text/html', status=400)
    if tool_name not in TOOLS:
        body = f'''
<section>
<h2 class="status-error">Unknown Tool: {escape(tool_name)}</h2>
<p>This tool does not exist. <a href="/tools.html">View available tools</a></p>
</section>
'''
        return web.Response(text=html_page("Tool Not Found", body),
                            content_type='text/html', status=404)

    tool_info = TOOLS[tool_name]
    func = tool_info["function"]
    # Get arguments from query string; "_run" only signals "execute now".
    arguments = dict(request.query)
    run_requested = arguments.pop("_run", None) is not None

    # Nothing requested yet: show the confirmation form.
    if not arguments and not run_requested and request.method == 'GET':
        desc = tool_info["description"] or "No description available."
        body = f'''
<section>
<h2>Tool: {escape(tool_name)}</h2>
<p>{escape(desc)}</p>
<form method="GET" action="/call/{escape(tool_name)}.html">
<input type="hidden" name="_run" value="1">
<p><em>Add query parameters to call with arguments, or click below to call without arguments:</em></p>
<button type="submit" style="padding:10px 20px; font-size:1.1em; cursor:pointer;">
Run {escape(tool_name)}
</button>
</form>
<p style="margin-top:20px;"><small>Example: <code>/call/{escape(tool_name)}.html?param1=value1&amp;param2=value2</code></small></p>
</section>
'''
        return web.Response(text=html_page(f"Tool: {tool_name}", body), content_type='text/html')

    audit_log("tool_call_html", {"tool": tool_name, "arguments": arguments})
    try:
        if asyncio.iscoroutinefunction(func):
            result = await func(**arguments)
        else:
            result = func(**arguments)

        # Tools usually return JSON text; keep plain text untouched.
        if isinstance(result, str):
            try:
                result = json.loads(result)
            except ValueError:
                pass

        if isinstance(result, (dict, list)):
            result_html = render_json_as_html(result)
            raw_text = json.dumps(result, indent=2)
        else:
            result_html = f'<pre>{escape(str(result))}</pre>'
            raw_text = str(result)

        body = f'''
<section>
<h2>Tool: {escape(tool_name)}</h2>
<p><strong>Arguments:</strong> {escape(json.dumps(arguments)) if arguments else "<em>None</em>"}</p>
</section>
<section>
<h2>Result</h2>
{result_html}
</section>
<section>
<h3>Raw JSON</h3>
<pre>{escape(raw_text)}</pre>
</section>
'''
        return web.Response(text=html_page(f"Result: {tool_name}", body), content_type='text/html')
    except TypeError as e:
        return web.Response(text=error_html(f"Tool: {tool_name}", f"Invalid arguments: {e}"),
                            content_type='text/html', status=400)
    except Exception as e:
        traceback.print_exc()
        return web.Response(text=error_html(f"Tool: {tool_name}", str(e)),
                            content_type='text/html', status=500)
async def _read_tool_shaper(client, index):
    """Read the input shaper parameters stored in ``Toolheads/T<index>.cfg``.

    Returns a dict with "tool" (display name) and "filepath", plus either
    the parsed "freq_x"/"freq_y"/"type_x"/"type_y" values or an "error"
    message when the file cannot be fetched or parsed.
    """
    import re

    filepath = f"Toolheads/T{index}.cfg"
    entry = {"tool": config.TOOL_NAMES.get(index, f"T{index}"), "filepath": filepath}
    try:
        session = await client._get_session()
        url = f"{client.base_url}/server/files/config/{filepath}"
        async with session.get(url) as response:
            if response.status != 200:
                entry["error"] = f"Config file not found: {filepath}"
                return entry
            content = await response.text()

        def _find(pattern):
            """First capture group of ``pattern`` in the file, or None."""
            match = re.search(pattern, content)
            return match.group(1) if match else None

        freq_x = _find(r'params_input_shaper_freq_x:\s*([\d.]+)')
        freq_y = _find(r'params_input_shaper_freq_y:\s*([\d.]+)')
        type_x = _find(r'params_input_shaper_type_x:\s*(\w+)')
        type_y = _find(r'params_input_shaper_type_y:\s*(\w+)')
        entry.update({
            "freq_x": float(freq_x) if freq_x else None,
            "freq_y": float(freq_y) if freq_y else None,
            # "mzv" is shown when the file does not name a shaper type.
            "type_x": type_x or "mzv",
            "type_y": type_y or "mzv",
        })
    except Exception as e:
        entry["error"] = str(e)
    return entry


async def handle_input_shaper_html(request: web.Request) -> web.Response:
    """Display input shaper settings for all toolheads.

    Shows the shaper currently active in Klipper (queried through the
    Moonraker client) followed by the per-tool values read from each
    ``Toolheads/T<n>.cfg`` file for ``config.TOOL_COUNT`` tools. Any
    unexpected failure yields an error page with HTTP 500.
    """
    try:
        client = get_client()
        # Query current active input_shaper settings from Klipper
        result = await client.query_printer_objects({
            "input_shaper": None  # Get all attributes
        })
        active_shaper = {}
        if "result" in result:
            status = result.get("result", {}).get("status", {})
            active_shaper = status.get("input_shaper") or {}

        tool_shapers = [await _read_tool_shaper(client, i) for i in range(config.TOOL_COUNT)]

        # Null values from Klipper are shown as 0 rather than breaking formatting.
        active_freq_x = active_shaper.get("shaper_freq_x") or 0
        active_freq_y = active_shaper.get("shaper_freq_y") or 0
        active_type_x = active_shaper.get("shaper_type_x", "unknown")
        active_type_y = active_shaper.get("shaper_type_y", "unknown")
        smoothing = active_shaper.get("smoothing") or 0

        parts = [f'''
<section>
<h2>Active Input Shaper</h2>
<p>Currently applied shaper settings (changes on tool change):</p>
<table>
<tr><th>Axis</th><th>Type</th><th>Frequency (Hz)</th></tr>
<tr>
<td><strong>X</strong></td>
<td><code>{escape(active_type_x)}</code></td>
<td>{active_freq_x:.1f} Hz</td>
</tr>
<tr>
<td><strong>Y</strong></td>
<td><code>{escape(active_type_y)}</code></td>
<td>{active_freq_y:.1f} Hz</td>
</tr>
</table>
<dl>
<dt>Smoothing</dt><dd>{smoothing:.4f}</dd>
</dl>
</section>
<section>
<h2>Per-Tool Shaper Configuration</h2>
<p>Values stored in each toolhead's config file (applied on tool change):</p>
<table>
<tr><th>Tool</th><th>Freq X (Hz)</th><th>Freq Y (Hz)</th><th>Type X</th><th>Type Y</th><th>Config</th></tr>
''']
        for ts in tool_shapers:
            if "error" in ts:
                parts.append(f'''
<tr>
<td><strong>{escape(ts["tool"])}</strong></td>
<td colspan="4" style="color:#d32f2f;">Error: {escape(ts["error"])}</td>
<td>-</td>
</tr>
''')
                continue
            freq_x_str = f'{ts["freq_x"]:.1f}' if ts["freq_x"] else '<em>not set</em>'
            freq_y_str = f'{ts["freq_y"]:.1f}' if ts["freq_y"] else '<em>not set</em>'
            # Link to the file that was actually read; the display name may
            # differ from the T<n> file name.
            config_link = f'<a href="/config/{escape(ts["filepath"])}.html">View</a>'
            parts.append(f'''
<tr>
<td><strong>{escape(ts["tool"])}</strong></td>
<td>{freq_x_str}</td>
<td>{freq_y_str}</td>
<td><code>{escape(ts["type_x"])}</code></td>
<td><code>{escape(ts["type_y"])}</code></td>
<td>{config_link}</td>
</tr>
''')
        parts.append('''
</table>
</section>
<section>
<h2>Calibration</h2>
<p>To calibrate input shaper for each tool:</p>
<ol>
<li>Select tool: <code>T0</code> / <code>T1</code> / <code>T2</code></li>
<li>Run: <code>SHAPER_CALIBRATE</code></li>
<li>Update the tool's config file with the recommended values</li>
<li>Repeat for each tool</li>
</ol>
<p><a href="/call/run_gcode.html?script=GET_INPUT_SHAPER">Check current shaper via G-code →</a></p>
</section>
''')
        body = "".join(parts)
        return web.Response(text=html_page("Input Shaper Status", body, f"{config.PRINTER_NAME}"),
                            content_type='text/html')
    except Exception as e:
        traceback.print_exc()
        return web.Response(text=error_html("Input Shaper Status", str(e)),
                            content_type='text/html', status=500)
async def handle_config_files_html(request: web.Request) -> web.Response:
    """Render a table of the ``.cfg`` files in the "config" directory.

    Each file name links to its viewer page. Errors reported by the client
    or raised along the way produce an error page with HTTP 500.
    """
    def _filename(entry):
        return entry.get("filename", "")

    def _table_row(entry):
        """HTML table row (name link, size, modification time) for one file."""
        filename = _filename(entry)
        size = entry.get("size", 0)
        modified = entry.get("modified", 0)
        if modified:
            mod_date = datetime.fromtimestamp(modified).strftime("%Y-%m-%d %H:%M")
        else:
            mod_date = "Unknown"
        return f'''
<tr>
<td><a href="/config/{escape(filename)}.html">{escape(filename)}</a></td>
<td>{size:,} bytes</td>
<td>{mod_date}</td>
</tr>
'''

    try:
        listing = await get_client().get_directory("config")
        if "error" in listing:
            return web.Response(text=error_html("Config Files", listing["error"]),
                                content_type='text/html', status=500)

        entries = listing.get("result", {}).get("files", [])
        cfg_files = [entry for entry in entries if _filename(entry).endswith(".cfg")]
        rows = "".join(_table_row(entry) for entry in sorted(cfg_files, key=_filename))
        body = f'''
<section>
<h2>Configuration Files ({len(cfg_files)})</h2>
<table>
<tr><th>File</th><th>Size</th><th>Modified</th></tr>
{rows}
</table>
</section>
'''
        return web.Response(text=html_page("Configuration Files", body), content_type='text/html')
    except Exception as e:
        return web.Response(text=error_html("Config Files", str(e)),
                            content_type='text/html', status=500)
async def handle_config_file_html(request: web.Request) -> web.Response:
    """Display one config file, fetched through Moonraker's file endpoint.

    The file path comes from the request URL, so it is validated before it
    is used to build the upstream URL: absolute paths and ``..`` segments
    are rejected with HTTP 400, and the path is percent-quoted so characters
    such as ``?`` or ``#`` cannot alter the upstream request.

    Responses: 400 for a missing or invalid path, 404 when Moonraker reports
    the file as missing, 500 for any other failure.
    """
    from urllib.parse import quote

    filepath = request.match_info.get('filepath', '').replace('.html', '')
    if not filepath:
        return web.Response(text=error_html("Config File", "No file specified"),
                            content_type='text/html', status=400)
    # Keep the request inside the config root on the Moonraker side.
    if filepath.startswith('/') or '..' in filepath.split('/'):
        return web.Response(text=error_html("Config File", f"Invalid file path: {filepath}"),
                            content_type='text/html', status=400)
    try:
        client = get_client()
        # Use direct file download like the filesystem tool does
        session = await client._get_session()
        url = f"{client.base_url}/server/files/config/{quote(filepath, safe='/')}"
        async with session.get(url) as response:
            if response.status == 404:
                return web.Response(text=error_html("Config File", f"File not found: {filepath}"),
                                    content_type='text/html', status=404)
            response.raise_for_status()
            content = await response.text()
        body = f'''
<section>
<h2>{escape(filepath)}</h2>
<p><a href="/config.html">← Back to file list</a></p>
<pre style="white-space:pre-wrap; word-wrap:break-word;">{escape(content)}</pre>
</section>
'''
        return web.Response(text=html_page(f"Config: {filepath}", body), content_type='text/html')
    except Exception as e:
        return web.Response(text=error_html("Config File", str(e)),
                            content_type='text/html', status=500)
# ============================================================
# MCP Protocol Handler (JSON-RPC style)
# ============================================================
# Python annotation -> JSON Schema type name; anything else maps to "string".
_JSON_SCHEMA_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def _tool_input_schema(func):
    """Build a JSON Schema object describing ``func``'s named parameters.

    Parameters without a default are listed as required. ``*args`` and
    ``**kwargs`` are skipped because they are not named arguments.
    """
    try:
        hints = get_type_hints(func)
    except Exception:
        hints = {}
    properties = {}
    required = []
    for param_name, param in inspect.signature(func).parameters.items():
        if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            continue
        properties[param_name] = {
            "type": _JSON_SCHEMA_TYPES.get(hints.get(param_name), "string")
        }
        if param.default is inspect.Parameter.empty:
            required.append(param_name)
    schema = {"type": "object", "properties": properties}
    if required:
        schema["required"] = required
    return schema


async def handle_mcp(request: web.Request) -> web.Response:
    """
    Handle MCP protocol requests (JSON-RPC style).
    This allows VS Code MCP clients to communicate with the server.

    Supported methods: "initialize", "tools/list", "tools/call" and "ping".
    Unknown methods and unknown tools yield error -32601; exceptions raised
    while handling a request yield -32603. Unparseable JSON yields -32700
    and a body that is not a JSON object yields -32600, both with HTTP 400.
    Messages without an "id" whose method starts with "notifications/" are
    acknowledged with HTTP 202 and no body, since JSON-RPC notifications
    must not be answered.
    """
    try:
        data = await request.json()
    except json.JSONDecodeError:
        return web.json_response({
            "jsonrpc": "2.0",
            "error": {"code": -32700, "message": "Parse error"},
            "id": None
        }, status=400)
    if not isinstance(data, dict):
        return web.json_response({
            "jsonrpc": "2.0",
            "error": {"code": -32600, "message": "Invalid Request"},
            "id": None
        }, status=400)

    method = data.get("method", "")
    # An explicit ``"params": null`` is treated like an omitted member.
    params = data.get("params") or {}
    request_id = data.get("id")

    if "id" not in data and isinstance(method, str) and method.startswith("notifications/"):
        return web.Response(status=202)

    result = None
    error = None
    try:
        if method == "initialize":
            result = {
                "protocolVersion": "2024-11-05",
                "serverInfo": {
                    "name": "klipper-mcp",
                    "version": "1.0.0"
                },
                "capabilities": {
                    "tools": {"listChanged": False}
                }
            }
        elif method == "tools/list":
            result = {"tools": [
                {
                    "name": name,
                    "description": get_tool_description(tool_info),
                    "inputSchema": _tool_input_schema(tool_info["function"]),
                }
                for name, tool_info in TOOLS.items()
            ]}
        elif method == "tools/call":
            tool_name = params.get("name")
            arguments = params.get("arguments") or {}
            if tool_name not in TOOLS:
                error = {"code": -32601, "message": f"Unknown tool: {tool_name}"}
            else:
                func = TOOLS[tool_name]["function"]
                audit_log("tool_call", {"tool": tool_name, "arguments": arguments})
                if asyncio.iscoroutinefunction(func):
                    tool_result = await func(**arguments)
                else:
                    tool_result = func(**arguments)
                # Parse JSON string result if needed; plain text is kept as-is.
                if isinstance(tool_result, str):
                    try:
                        tool_result = json.loads(tool_result)
                    except ValueError:
                        pass
                if isinstance(tool_result, (dict, list)):
                    text = json.dumps(tool_result, indent=2)
                else:
                    text = str(tool_result)
                result = {"content": [{"type": "text", "text": text}]}
        elif method == "ping":
            result = {}
        else:
            error = {"code": -32601, "message": f"Method not found: {method}"}
    except Exception as e:
        traceback.print_exc()
        error = {"code": -32603, "message": str(e)}

    response = {"jsonrpc": "2.0", "id": request_id}
    if error is not None:
        response["error"] = error
    else:
        response["result"] = result
    return web.json_response(response)
# ============================================================
# Main Application
# ============================================================
async def on_startup(app):
    """Startup hook: create the Moonraker client, register tools, audit the start."""
    def _announce(message):
        print(message, file=sys.stderr)

    _announce("Initializing Moonraker client...")
    init_client()

    _announce("Registering tools...")
    register_all_tools()

    start_details = {
        "printer": config.PRINTER_NAME,
        "moonraker_url": config.MOONRAKER_URL,
    }
    audit_log("server_start", start_details)
async def on_cleanup(app):
    """Cleanup hook: close the Moonraker client, then record and announce the stop."""
    # The client is closed first; the audit entry and message follow.
    await close_client()
    audit_log("server_stop")
    print("Server stopped.", file=sys.stderr)
def create_app() -> web.Application:
    """Build the aiohttp application with all routes and lifecycle hooks."""
    app = web.Application()
    add_get = app.router.add_get
    add_post = app.router.add_post

    # (registration function, path, handler), in registration order.
    route_table = (
        # JSON API routes
        (add_get, "/", handle_server_info),
        (add_get, "/health", handle_health),
        (add_get, "/status", handle_printer_status),
        (add_get, "/tools", handle_list_tools),
        (add_post, "/tools/call", handle_call_tool),
        (add_post, "/mcp", handle_mcp),  # MCP protocol endpoint
        # HTML routes (for fetch_webpage/browser compatibility)
        (add_get, "/index.html", handle_server_info_html),
        (add_get, "/status.html", handle_printer_status_html),
        (add_get, "/tools.html", handle_list_tools_html),
        (add_get, "/health.html", handle_health_html),
        (add_get, "/shaper.html", handle_input_shaper_html),
        (add_get, "/config.html", handle_config_files_html),
        (add_get, "/config/{filepath:.+}.html", handle_config_file_html),
        (add_get, "/call/{tool_name}.html", handle_call_tool_html),
    )
    for register, path, handler in route_table:
        register(path, handler)

    # Lifecycle hooks
    app.on_startup.append(on_startup)
    app.on_cleanup.append(on_cleanup)
    return app
def main():
    """Print the startup banner to stderr and run the HTTP server."""
    def to_stderr(message):
        print(message, file=sys.stderr)

    rule = "=" * 50
    to_stderr(rule)
    to_stderr("Klipper MCP Server v1.0.0")
    to_stderr(f"Printer: {config.PRINTER_NAME}")
    to_stderr(f"Moonraker: {config.MOONRAKER_URL}")
    to_stderr(f"ARMED: {config.ARMED}")
    to_stderr(rule)

    app = create_app()

    address = f"{config.MCP_HOST}:{config.MCP_PORT}"
    to_stderr(f"Starting server on {address}")
    to_stderr(f"JSON API: http://{address}/")
    to_stderr(f"HTML UI: http://{address}/index.html")
    to_stderr(f"MCP: http://{address}/mcp")

    # aiohttp's own startup messages are routed to stderr as well.
    web.run_app(
        app,
        host=config.MCP_HOST,
        port=config.MCP_PORT,
        print=to_stderr,
    )
# Run the server only when this file is executed as a script.
if __name__ == "__main__":
    main()