Skip to main content
Glama

XDS110 MCP Server

server.py•12.2 kB
"""Main XDS110 MCP Server implementation.""" import asyncio import json import logging from pathlib import Path from typing import Any, Dict, List, Optional, Sequence from mcp.server import Server, NotificationOptions from mcp.server.models import InitializationOptions from mcp.server.stdio import stdio_server from mcp.types import ( Resource, Tool, TextContent, ListResourcesResult, ListToolsResult, CallToolResult, ReadResourceResult, ) from .gdb_interface.gdb_client import GDBClient from .gdb_interface.openocd_manager import OpenOCDManager from .knowledge.motor_control import MotorControlKnowledge from .tools.variable_monitor import VariableMonitorTool from .tools.memory_tools import MemoryTool from .tools.analysis_tools import AnalysisTool from .utils.config import Config from .utils.logging import setup_logging class XDS110MCPServer: """Main MCP server for XDS110 debugging.""" def __init__(self, config_path: Path, debug: bool = False): """Initialize the XDS110 MCP server. Args: config_path: Path to configuration file debug: Enable debug logging """ self.config = Config.load(config_path) self.debug = debug self.logger = setup_logging(debug) # Core components self.openocd_manager = OpenOCDManager(self.config.openocd) self.gdb_client = GDBClient(self.config.gdb) self.knowledge = MotorControlKnowledge() # MCP Tools self.variable_monitor = VariableMonitorTool(self.gdb_client, self.knowledge) self.memory_tool = MemoryTool(self.gdb_client, self.knowledge) self.analysis_tool = AnalysisTool(self.gdb_client, self.knowledge) # MCP Server self.server = Server("xds110-mcp-server") self._setup_handlers() self.logger.info("XDS110 MCP Server initialized") def _setup_handlers(self): """Set up MCP server handlers.""" @self.server.list_resources() async def handle_list_resources() -> ListResourcesResult: """List available resources.""" resources = [ Resource( uri="motor://variables", name="Motor Control Variables", description="Available motor control variables for monitoring" ), Resource( uri="memory://layout", name="Memory Layout", description="Target memory layout and structure definitions" ), Resource( uri="knowledge://motor_control", name="Motor Control Knowledge", description="Domain knowledge for motor control debugging" ), ] return ListResourcesResult(resources=resources) @self.server.read_resource() async def handle_read_resource(uri: str) -> ReadResourceResult: """Read a specific resource.""" if uri == "motor://variables": variables = await self.knowledge.get_available_variables() content = json.dumps(variables, indent=2) elif uri == "memory://layout": layout = await self.knowledge.get_memory_layout() content = json.dumps(layout, indent=2) elif uri == "knowledge://motor_control": knowledge = await self.knowledge.get_motor_control_info() content = json.dumps(knowledge, indent=2) else: raise ValueError(f"Unknown resource: {uri}") return ReadResourceResult( contents=[TextContent(type="text", text=content)] ) @self.server.list_tools() async def handle_list_tools() -> ListToolsResult: """List available MCP tools.""" tools = [ Tool( name="read_variables", description="Read current values of motor control variables", inputSchema={ "type": "object", "properties": { "variables": { "type": "array", "items": {"type": "string"}, "description": "List of variable names to read" }, "format": { "type": "string", "enum": ["json", "table"], "default": "json", "description": "Output format" } }, "required": ["variables"] } ), Tool( name="monitor_variables", description="Start continuous monitoring with change detection", inputSchema={ "type": "object", "properties": { "variables": { "type": "array", "items": {"type": "string"}, "description": "Variables to monitor" }, "duration": { "type": "number", "default": 10.0, "description": "Monitoring duration in seconds" }, "threshold": { "type": "number", "default": 0.01, "description": "Change detection threshold" } }, "required": ["variables"] } ), Tool( name="write_memory", description="Write values to memory addresses", inputSchema={ "type": "object", "properties": { "address": { "type": "string", "description": "Memory address (hex format)" }, "value": { "type": ["number", "array"], "description": "Value(s) to write" }, "size": { "type": "integer", "default": 1, "description": "Size in bytes" } }, "required": ["address", "value"] } ), Tool( name="analyze_motor_state", description="Analyze current motor control state", inputSchema={ "type": "object", "properties": { "focus_area": { "type": "string", "enum": ["alignment", "current_control", "position", "faults"], "description": "Area to focus analysis on" } } } ), ] return ListToolsResult(tools=tools) @self.server.call_tool() async def handle_call_tool(name: str, arguments: dict) -> CallToolResult: """Handle tool calls.""" try: if name == "read_variables": result = await self.variable_monitor.read_variables( variables=arguments["variables"], format=arguments.get("format", "json") ) elif name == "monitor_variables": result = await self.variable_monitor.monitor_variables( variables=arguments["variables"], duration=arguments.get("duration", 10.0), threshold=arguments.get("threshold", 0.01) ) elif name == "write_memory": result = await self.memory_tool.write_memory( address=arguments["address"], value=arguments["value"], size=arguments.get("size", 1) ) elif name == "analyze_motor_state": result = await self.analysis_tool.analyze_motor_state( focus_area=arguments.get("focus_area") ) else: raise ValueError(f"Unknown tool: {name}") return CallToolResult( content=[TextContent(type="text", text=str(result))] ) except Exception as e: self.logger.error(f"Tool call error ({name}): {e}") return CallToolResult( content=[TextContent(type="text", text=f"Error: {e}")], isError=True ) async def initialize_hardware(self) -> bool: """Initialize hardware connections.""" try: # Start OpenOCD self.logger.info("Starting OpenOCD...") if not await self.openocd_manager.start(): self.logger.error("Failed to start OpenOCD") return False # Connect GDB client self.logger.info("Connecting GDB client...") if not await self.gdb_client.connect(): self.logger.error("Failed to connect GDB client") return False # Validate target connection self.logger.info("Validating target connection...") if not await self.gdb_client.validate_connection(): self.logger.error("Target validation failed") return False self.logger.info("Hardware initialization complete") return True except Exception as e: self.logger.error(f"Hardware initialization error: {e}") return False async def cleanup(self): """Clean up resources.""" self.logger.info("Cleaning up XDS110 MCP Server...") try: if hasattr(self, 'gdb_client'): await self.gdb_client.disconnect() if hasattr(self, 'openocd_manager'): await self.openocd_manager.stop() except Exception as e: self.logger.error(f"Cleanup error: {e}") self.logger.info("Cleanup complete") async def run(self): """Run the MCP server.""" try: # Initialize hardware if not await self.initialize_hardware(): raise RuntimeError("Hardware initialization failed") # Run MCP server self.logger.info("Starting MCP server...") async with stdio_server() as (read_stream, write_stream): await self.server.run( read_stream, write_stream, InitializationOptions( server_name="xds110-mcp-server", server_version="0.1.0", capabilities=self.server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={} ) ) ) except Exception as e: self.logger.error(f"Server run error: {e}") raise finally: await self.cleanup()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shanemmattner/XDS110_MCP_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server