Skip to main content
Glama

G1 UART MCP Server

by danroblewis
example_mcp_client.py (11.8 kB)
#!/usr/bin/env python3
"""
Test the MCP server by creating a client and running actual commands.

The script starts ``mcp_server.py`` over stdio with the current Python
interpreter and then calls the server's tools in order: list tools, scan,
connect, status, three ``send_g1_message`` commands, and disconnect.
"""

import asyncio
import os
import sys
import re
import json
import traceback
from typing import Optional

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


def _first_text(result, tool_name: str) -> str:
    """Return the ``text`` of the first content item of a tool result.

    Raises:
        RuntimeError: if the result has no content items, or the first item
            carries no ``text`` attribute. This replaces the bare
            ``IndexError``/``AttributeError`` that ``result.content[0].text``
            would raise with a message naming the tool.
    """
    content = getattr(result, "content", None)
    if not content:
        raise RuntimeError(f"{tool_name} returned no content")
    text = getattr(content[0], "text", None)
    if text is None:
        raise RuntimeError(f"{tool_name} returned non-text content")
    return text


def _load_json_object(text: str) -> dict:
    """Decode *text* as JSON and require the top-level value to be an object.

    Raises:
        ValueError: if *text* is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError`` subclass) or decodes to something other than a
            dict, so callers can safely use ``.get`` on the result.
    """
    data = json.loads(text)
    if not isinstance(data, dict):
        raise ValueError(f"expected a JSON object, got {type(data).__name__}")
    return data


async def _call_tool_text(session, tool_name: str, arguments: dict,
                          label: Optional[str] = None) -> str:
    """Call *tool_name*, print its raw text result, and return that text.

    *label* overrides the name shown in the ``result:`` line (used to tell the
    three ``send_g1_message`` calls apart); it defaults to *tool_name*.
    """
    result = await session.call_tool(tool_name, arguments)
    text = _first_text(result, tool_name)
    print(f"✓ {label or tool_name} result: {text}")
    return text


def _report_send_outcome(data: dict, *, received: str, timed_out: str,
                         failed: str, default_error=None) -> bool:
    """Print the outcome of one ``send_g1_message`` call.

    Returns True when the response reports ``result == "success"`` (with or
    without a device response), False otherwise.
    """
    if data.get("result") == "success":
        if data.get("response_received"):
            print(f"✓ {received}: {data.get('response_data')}")
        else:
            print(f"✓ {timed_out}")
        return True
    print(f"✗ {failed}: {data.get('error', default_error)}")
    return False


async def _scan_for_right_device(session) -> Optional[str]:
    """Run Test 2 and return the address of the right-side device.

    The right device is the first scanned device whose name contains
    ``"_R_"``. Returns None (after printing the reason) when the scan fails,
    the response cannot be parsed, or no such device with an id is found.
    """
    print("\n2. Testing scan_g1_devices...")
    scan_text = await _call_tool_text(session, "scan_g1_devices", {})

    try:
        scan_data = _load_json_object(scan_text)
    except ValueError as e:
        print(f"✗ Error: Failed to parse JSON response: {e}")
        print(f"Raw response: {scan_text}")
        return None
    print(f"✓ Parsed JSON response: {json.dumps(scan_data, indent=2)}")

    devices = scan_data.get("devices")
    if scan_data.get("result") != "success" or not devices:
        print(f"✗ Error: Scan failed - {scan_data.get('error', 'Unknown error')}")
        return None
    print(f"✓ Found {len(devices)} devices")

    # Look specifically for the right device (with "_R_" in name).
    for device in devices:
        if "_R_" in (device.get("name") or ""):
            device_address = device.get("id")
            print(
                f"✓ Found right device: {device.get('name')} "
                f"({device.get('side')}) - {device_address}"
            )
            if device_address:
                return device_address
            break

    print("✗ Error: No right device (with '_R_' in name) found in scan results")
    print("Available devices:")
    for device in devices:
        # .get keeps this diagnostic listing from raising KeyError on a
        # device entry that lacks a name or id.
        print(
            f" - {device.get('name', 'unknown')} "
            f"({device.get('side', 'unknown')}) - {device.get('id', 'unknown')}"
        )
    return None


async def _connect_device(session, device_address: str) -> bool:
    """Run Test 3; return True only when the server reports a successful connect."""
    print(f"\n3. Testing connect_g1_device with {device_address}...")
    connect_text = await _call_tool_text(
        session, "connect_g1_device", {"address": device_address}
    )

    try:
        connect_data = _load_json_object(connect_text)
    except ValueError as e:
        print(f"✗ Error: Failed to parse connection JSON response: {e}")
        print(f"Raw response: {connect_text}")
        return False
    print(f"✓ Parsed connection response: {json.dumps(connect_data, indent=2)}")

    if connect_data.get("result") != "success":
        print(f"✗ Connection failed: {connect_data.get('error', 'Unknown error')}")
        print("Test cannot continue without a successful connection")
        return False
    return True


async def _check_connection_status(session) -> bool:
    """Run Test 4; return True when the server reports the device as connected."""
    print("\n4. Testing get_g1_connection_status...")
    status_text = await _call_tool_text(session, "get_g1_connection_status", {})

    try:
        status_data = _load_json_object(status_text)
    except ValueError as e:
        print(f"✗ Error: Failed to parse status JSON response: {e}")
        print(f"Raw response: {status_text}")
        return False
    print(f"✓ Parsed status response: {json.dumps(status_data, indent=2)}")

    if status_data.get("result") == "success" and status_data.get("connected"):
        print("✓ Device is connected and ready for communication")
        return True
    print("✗ Device is not connected")
    return False


async def _run_send_tests(session) -> int:
    """Run Tests 5-7 (``send_g1_message``) and return how many of them failed.

    A step counts as failed when its response cannot be parsed or does not
    report ``result == "success"``; a timeout with a success result is not a
    failure.
    """
    failures = 0

    # Test 5: heartbeat command
    print("\n5. Testing send_g1_message with heartbeat command...")
    send_text = await _call_tool_text(
        session, "send_g1_message", {"hex_data": "250600010402"}
    )
    try:
        send_data = _load_json_object(send_text)
    except ValueError as e:
        print(f"✗ Error: Failed to parse send JSON response: {e}")
        print(f"Raw response: {send_text}")
        failures += 1
    else:
        print(f"✓ Parsed send response: {json.dumps(send_data, indent=2)}")
        if not _report_send_outcome(
            send_data,
            received="Message sent and response received",
            timed_out="Message sent but timed out (no response)",
            failed="Message failed",
            default_error="Unknown error",
        ):
            failures += 1

    # Test 6: get display mode command
    print("\n6. Testing send_g1_message with get display mode command...")
    send_text2 = await _call_tool_text(
        session,
        "send_g1_message",
        {"hex_data": "2b000101"},
        label="send_g1_message (get display mode)",
    )
    try:
        send_data2 = _load_json_object(send_text2)
    except ValueError:
        print("✗ Failed to parse display mode response")
        failures += 1
    else:
        if not _report_send_outcome(
            send_data2,
            received="Display mode response",
            timed_out="Display mode command sent but timed out",
            failed="Display mode command failed",
        ):
            failures += 1

    # Test 7: clear screen command
    print("\n7. Testing send_g1_message with clear screen command...")
    send_text3 = await _call_tool_text(
        session,
        "send_g1_message",
        {"hex_data": "18000100"},
        label="send_g1_message (clear screen)",
    )
    try:
        send_data3 = _load_json_object(send_text3)
    except ValueError:
        print("✗ Failed to parse clear screen response")
        failures += 1
    else:
        if not _report_send_outcome(
            send_data3,
            received="Clear screen response",
            timed_out="Clear screen command sent but timed out",
            failed="Clear screen command failed",
        ):
            failures += 1

    return failures


async def _disconnect_device(session) -> bool:
    """Run Test 8; return True when the server reports a successful disconnect."""
    print("\n8. Testing disconnect_g1_device...")
    disconnect_text = await _call_tool_text(session, "disconnect_g1_device", {})

    try:
        disconnect_data = _load_json_object(disconnect_text)
    except ValueError:
        print("✗ Failed to parse disconnect response")
        return False

    if disconnect_data.get("result") == "success":
        print(f"✓ Successfully disconnected from: {disconnect_data.get('device_name')}")
        return True
    print(f"✗ Disconnect failed: {disconnect_data.get('error', 'Unknown error')}")
    return False


async def _run_tool_tests(session) -> None:
    """Run Tests 1-8 against an initialized session and print a summary.

    Stops early (without a summary) when scanning, connecting, or the status
    check fails. Once a connect has succeeded, the disconnect step always
    runs, even on an early return or an exception.
    """
    # Test 1: List available tools
    print("\n1. Testing list_tools...")
    tools = await session.list_tools()
    print(f"✓ Found {len(tools.tools)} tools:")
    for tool in tools.tools:
        print(f" - {tool.name}: {tool.description}")

    device_address = await _scan_for_right_device(session)
    if device_address is None:
        return

    if not await _connect_device(session, device_address):
        return

    failures = 0
    try:
        if not await _check_connection_status(session):
            return
        failures += await _run_send_tests(session)
    finally:
        # Always release the device after a successful connect so a failed
        # step does not leave it connected.
        disconnected = await _disconnect_device(session)

    if not disconnected:
        failures += 1

    if failures:
        print(f"\n✗ MCP tool tests completed with {failures} failed step(s)")
    else:
        print("\n✓ All MCP tool tests completed successfully!")


async def test_mcp_server():
    """Test the MCP server by creating a client and running commands.

    Starts ``mcp_server.py`` from the current working directory as a stdio
    subprocess, opens and initializes a client session, and runs the tool
    tests. Any exception is reported with a traceback rather than propagated.
    """
    print("Starting MCP server test...")

    # Create server parameters for stdio connection
    server_params = StdioServerParameters(
        command=sys.executable,  # Use the current Python interpreter
        args=["mcp_server.py"],  # Our MCP server script
        cwd=os.getcwd(),  # Current working directory
    )

    try:
        print("✓ Creating MCP client...")
        # Create MCP client and connect to the server
        async with stdio_client(server_params) as (read_stream, write_stream):
            # Create client session
            async with ClientSession(read_stream, write_stream) as session:
                print("✓ Connected to MCP server")

                # Initialize the connection
                await session.initialize()
                print("✓ Session initialized")

                await _run_tool_tests(session)
    except Exception as e:
        # Top-level boundary of a test script: report and show the traceback.
        print(f"✗ Test failed: {e}")
        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(test_mcp_server())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danroblewis/g1_uart_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.