Skip to main content
Glama

XDS110 MCP Server

test_mcp_server.py8.69 kB
#!/usr/bin/env python3 """Test MCP server functionality without hardware dependency.""" import asyncio import json import sys from pathlib import Path from unittest.mock import AsyncMock, Mock # Add project to path sys.path.insert(0, str(Path(__file__).parent)) from xds110_mcp_server.knowledge.motor_control import MotorControlKnowledge from xds110_mcp_server.tools.variable_monitor import VariableMonitorTool from xds110_mcp_server.tools.memory_tools import MemoryTool from xds110_mcp_server.tools.analysis_tools import AnalysisTool class MockGDBClient: """Mock GDB client for testing without hardware.""" def __init__(self): self.connected = True async def connect(self): return True async def disconnect(self): pass async def validate_connection(self): return True async def read_multiple_variables(self, variables): """Return mock motor control data.""" mock_data = { "motorVars_M1.motorState": 2, # CTRL_RUN state "motorVars_M1.absPosition_rad": 1.234, "motorVars_M1.angleFOC_rad": 0.785, "motorVars_M1.angleENC_rad": 1.230, "motorVars_M1.Idq_out_A.value[0]": 0.8, # D-axis current "motorVars_M1.Idq_out_A.value[1]": 1.2, # Q-axis current "motorVars_M1.IsRef_A": 1.5, "motorVars_M1.fluxCurrent_A": 0.8, "motorVars_M1.alignCurrent_A": 0.1, "motorVars_M1.enableSpeedCtrl": True, "motorVars_M1.reversePhases": False, "motorVars_M1.faultMtrNow.bit.needsCalibration": 0, "motorVars_M1.faultMtrNow.bit.obakeNeedsInit": 0, "motorVars_M1.faultMtrNow.all": 0, "debug_bypass.debug_enabled": True, "debug_bypass.bypass_alignment_called": False, "debug_bypass.bypass_electrical_angle": 0.0, "debug_bypass.cs_gpio_pin": 20 } # Return only requested variables return {var: mock_data.get(var) for var in variables if var in mock_data} async def read_memory(self, address, count, format): """Return mock memory data.""" if address == "0xd3c0" or address == hex(0x0000d3c0): # Mock debug_bypass structure data return [1, 0, 71, 15, 39, 0, 200, 0, 100, 0, 10, 0, 5, 0, 0, 0] return [0] * count async def write_memory(self, address, values, size): """Mock memory write.""" print(f"Mock write: {len(values)} values to {address} (size={size})") return True async def test_variable_reading(): """Test variable reading functionality.""" print("=== Testing Variable Reading ===") mock_gdb = MockGDBClient() knowledge = MotorControlKnowledge() monitor = VariableMonitorTool(mock_gdb, knowledge) # Test reading critical variables critical_vars = knowledge.get_critical_variables() print(f"Testing {len(critical_vars)} critical variables...") result = await monitor.read_variables(critical_vars[:5], format="json") data = json.loads(result) print(f"✓ Successfully read {data['successful_reads']}/{data['variables_read']} variables") for var_name, reading in data["readings"].items(): if "error" not in reading: print(f" {var_name}: {reading['value']} {reading['units']} - {reading['description']}") return True async def test_motor_analysis(): """Test motor state analysis.""" print("\n=== Testing Motor State Analysis ===") mock_gdb = MockGDBClient() knowledge = MotorControlKnowledge() analysis_tool = AnalysisTool(mock_gdb, knowledge) # Test general analysis result = await analysis_tool.analyze_motor_state() data = json.loads(result) print(f"✓ Analysis completed: {data['assessment']}") if "fault_patterns" in data and data["fault_patterns"]: print(f"✓ Fault patterns detected: {len(data['fault_patterns'])}") for pattern in data["fault_patterns"]: print(f" - {pattern['name']}: {pattern['severity']}") else: print("✓ No fault patterns detected - system appears healthy") if "recommendations" in data and data["recommendations"]: print(f"✓ Generated {len(data['recommendations'])} recommendations:") for rec in data["recommendations"]: print(f" • {rec}") return True async def test_memory_operations(): """Test memory read/write operations.""" print("\n=== Testing Memory Operations ===") mock_gdb = MockGDBClient() knowledge = MotorControlKnowledge() memory_tool = MemoryTool(mock_gdb, knowledge) # Test reading debug_bypass structure result = await memory_tool.read_memory("0xd3c0", count=16, format="hex") data = json.loads(result) if data["success"]: print(f"✓ Successfully read {data['count']} bytes from {data['address']}") if "interpretation" in data: print(f" Structure: {data['interpretation']['structure']}") print(f" Field: {data['interpretation'].get('field', 'Unknown')}") # Test safe memory write result = await memory_tool.write_debug_bypass_field("debug_enabled", 1) data = json.loads(result) if data["success"]: print(f"✓ Successfully wrote {data['field']} = {data['value']}") if data["verification"]["verified"]: print(" ✓ Write verified by readback") return True async def test_monitoring(): """Test variable monitoring with change detection.""" print("\n=== Testing Variable Monitoring ===") mock_gdb = MockGDBClient() knowledge = MotorControlKnowledge() monitor = VariableMonitorTool(mock_gdb, knowledge) # Test short monitoring session test_vars = [ "motorVars_M1.motorState", "motorVars_M1.Idq_out_A.value[0]", "motorVars_M1.absPosition_rad" ] print(f"Starting 3-second monitoring of {len(test_vars)} variables...") result = await monitor.monitor_variables(test_vars, duration=3.0, threshold=0.01) data = json.loads(result) summary = data["monitoring_summary"] print(f"✓ Monitoring completed:") print(f" Duration: {summary['duration_seconds']:.1f}s") print(f" Cycles: {summary['cycles_completed']}") print(f" Frequency: {summary['frequency_hz']:.1f}Hz") print(f" Variables with changes: {summary['variables_with_changes']}") return True async def test_knowledge_integration(): """Test knowledge database integration.""" print("\n=== Testing Knowledge Integration ===") knowledge = MotorControlKnowledge() # Test fault pattern analysis with specific scenario test_scenario = { "motorVars_M1.faultMtrNow.bit.needsCalibration": 1, "motorVars_M1.faultMtrNow.bit.obakeNeedsInit": 1, "motorVars_M1.Idq_out_A.value[0]": 0.0, "motorVars_M1.Idq_out_A.value[1]": 0.0, "motorVars_M1.motorState": 2 } patterns = knowledge.analyze_fault_patterns(test_scenario) print(f"✓ Detected {len(patterns)} fault patterns in test scenario:") for pattern in patterns: print(f" - {pattern.name} ({pattern.severity})") print(f" {pattern.description}") for rec in pattern.recommendations[:2]: # Show first 2 recommendations print(f" → {rec}") return True async def main(): """Run all MCP server tests.""" print("XDS110 MCP Server - Software Component Tests") print("=" * 60) print("Testing without hardware dependency using mock GDB client") print("=" * 60) try: # Run all tests await test_variable_reading() await test_motor_analysis() await test_memory_operations() await test_monitoring() await test_knowledge_integration() print("\n" + "=" * 60) print("🎉 All MCP server software tests passed!") print("✅ Variable reading and monitoring") print("✅ Motor state analysis with fault detection") print("✅ Memory operations with safety validation") print("✅ Knowledge database integration") print("✅ Real-time monitoring with change detection") print("\n🚀 MCP server software stack is fully functional!") print("📋 Next: Debug hardware connection for live testing") except Exception as e: print(f"\n❌ Test failed: {e}") import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shanemmattner/XDS110_MCP_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server