Skip to main content
Glama
fastmcp_server.py7.7 kB
import time import types from typing import Any, Dict, List, Optional from mcp.server.fastmcp import Context, FastMCP from starlette.middleware.cors import CORSMiddleware from starlette.requests import Request from starlette.responses import JSONResponse from ..bus import make_bus, shutdown_bus from ..config import get_settings from ..dbc import decode_frame, load_dbc def create_app() -> FastMCP: """Create a FastMCP server exposing CAN tools and DBC metadata.""" settings = get_settings() mcp = FastMCP("Vehicle CAN MCP") db = load_dbc(settings.dbc_path) @mcp.tool() async def read_can_frames( duration_s: float = 1.0, ctx: Context | None = None, ) -> List[Dict[str, Any]]: bus = make_bus(settings.can_interface, settings.can_channel) try: frames: List[Dict[str, Any]] = [] end = time.time() + duration_s count = 0 while time.time() < end: msg = bus.recv(timeout=0.1) if msg: frames.append( { "timestamp": msg.timestamp, "arbitration_id": hex(msg.arbitration_id), "data": list(msg.data), } ) count += 1 if ctx: await ctx.report_progress(count) return frames finally: shutdown_bus(bus) @mcp.tool() def decode_can_frame(arbitration_id: int, data: List[int]) -> Dict[str, Any]: try: decoded = decode_frame(db, arbitration_id, bytes(data)) return {"status": "success", "signals": decoded} except Exception as e: return {"status": "error", "message": str(e)} @mcp.tool() async def filter_frames( arbitration_id: Optional[int] = None, signal_name: Optional[str] = None, duration_s: float = 1.0, ctx: Context | None = None, ) -> List[Dict[str, Any]]: bus = make_bus(settings.can_interface, settings.can_channel) try: end = time.time() + duration_s results: List[Dict[str, Any]] = [] count = 0 while time.time() < end: msg = bus.recv(timeout=0.1) if msg: if arbitration_id is not None and msg.arbitration_id != arbitration_id: continue frame_info: Dict[str, Any] = { "timestamp": msg.timestamp, "arbitration_id": hex(msg.arbitration_id), "data": list(msg.data), } if signal_name: try: decoded = decode_frame(db, msg.arbitration_id, msg.data) if signal_name in decoded: frame_info["signal_value"] = decoded[signal_name] results.append(frame_info) except Exception: pass else: results.append(frame_info) count += 1 if ctx: await ctx.report_progress(count) return results finally: shutdown_bus(bus) @mcp.tool() async def monitor_signal( signal_name: str, duration_s: float = 2.0, ctx: Context | None = None, ) -> List[Dict[str, Any]]: bus = make_bus(settings.can_interface, settings.can_channel) try: end = time.time() + duration_s results: List[Dict[str, Any]] = [] count = 0 while time.time() < end: msg = bus.recv(timeout=0.1) if msg: try: decoded = decode_frame(db, msg.arbitration_id, msg.data) if signal_name in decoded: results.append( { "timestamp": msg.timestamp, "value": decoded[signal_name], } ) count += 1 if ctx: await ctx.report_progress(count) except Exception: pass return results finally: shutdown_bus(bus) @mcp.resource("file://vehicle.dbc") def dbc_info() -> Dict[str, Any]: info: Dict[str, Any] = {} try: info['status'] = 'success' info['version'] = db.version if db.version else 'N/A' info['nodes'] = [node.name for node in db.nodes] messages_info = [] for msg in db.messages: message_details = { 'name': msg.name, 'id': msg.frame_id, 'id_hex': hex(msg.frame_id), 'length': msg.length, 'cycle_time_ms': msg.cycle_time, 'senders': msg.senders, 'signals': [] } for sig in msg.signals: signal_details = { 'name': sig.name, 'start_bit': sig.start, 'length_bits': sig.length, 'scale': sig.scale, 'offset': sig.offset, 'minimum': sig.minimum, 'maximum': sig.maximum, 'unit': sig.unit, 'choices': sig.choices, 'is_signed': sig.is_signed, 'is_float': sig.is_float, 'byte_order': sig.byte_order, 'receivers': sig.receivers, } message_details['signals'].append(signal_details) messages_info.append(message_details) info['messages'] = messages_info except FileNotFoundError: info['status'] = 'error' info['message'] = "DBC file not found" except Exception as e: info['status'] = 'error' info['message'] = f"An unexpected error occurred: {e}" return info # Health/compat endpoints for clients that probe OAuth discovery. @mcp.custom_route("/.well-known/oauth-authorization-server/sse", methods=["GET", "OPTIONS"]) async def _auth_discovery(_: Request) -> JSONResponse: return JSONResponse({"status": "ok", "auth": False}) @mcp.custom_route("/.well-known/oauth-protected-resource", methods=["GET", "OPTIONS"]) async def _protected_discovery(_: Request) -> JSONResponse: return JSONResponse({"status": "ok", "auth": False}) # Enable permissive CORS so browser-based MCP hosts (Inspector) can reach SSE. original_sse_app = mcp.sse_app def _cors_sse_app(self: FastMCP): app = original_sse_app() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], allow_credentials=True, ) return app mcp.sse_app = types.MethodType(_cors_sse_app, mcp) return mcp def main() -> None: settings = get_settings() mcp = create_app() mcp.settings.port = settings.mcp_port # Ensure accessible outside container/host mcp.settings.host = "0.0.0.0" mcp.run(transport="sse")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/farzadnadiri/mcp-ecu'

If you have feedback or need assistance with the MCP directory API, please join our Discord server