mcp_server.py•14.7 kB
#!/usr/bin/env python3
"""
MCP Server for G1 Device Operations using Nordic BLE UART Protocol
This server provides tools for:
- scan_g1_devices
- connect_g1_device
- disconnect_g1_device
- get_g1_connection_status
- send_g1_message
"""
import asyncio
import logging
from typing import Dict, List, Any
import re
from datetime import datetime, timedelta
from mcp.server.fastmcp import FastMCP
from mcp.types import TextContent
from g1_uart_manager import NordicBLEUARTManager
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create FastMCP server
server = FastMCP("g1-device-mcp")
# Global BLE UART manager instance
ble_manager = NordicBLEUARTManager()
async def auto_connect_to_right_device():
"""Automatically scan for and connect to the first right G1 device found"""
try:
logger.info("Auto-connecting to right G1 device...")
# Scan for devices
devices = await ble_manager.scan_for_devices(filter_pattern="G1_")
if not devices:
logger.warning("No G1 devices found during auto-connect")
return False
# Find the first device with "_R_" in the name (right device)
right_device = None
for device in devices:
if "_R_" in device['name']:
right_device = device
break
if not right_device:
logger.warning("No right G1 device found during auto-connect")
return False
logger.info(f"Found right G1 device: {right_device['name']} ({right_device['address']})")
# Attempt to connect
success = await ble_manager.connect_to_device(right_device['address'])
if success:
logger.info(f"Auto-connect successful to {right_device['name']}")
return True
else:
logger.warning(f"Auto-connect failed to {right_device['name']}")
return False
except Exception as e:
logger.error(f"Auto-connect failed: {e}")
return False
@server.tool()
async def scan_g1_devices() -> Dict[str, Any]:
"""Scan for available G1 devices.
Returns:
Dict[str, Any]: JSON response with scan results including:
- result: "success" or "error"
- devices: List of discovered devices with their properties
- count: Number of devices found
- error: Error message if scan failed
Note:
This performs an actual BLE scan for devices with names containing "G1_" pattern.
Returns a structured list of discovered devices with their addresses and signal strength.
"""
try:
devices = await ble_manager.scan_for_devices(filter_pattern="G1_")
except Exception as e:
logger.error(f"Scan failed: {e}")
return {
"result": "error",
"error": str(e),
"devices": [],
"count": 0
}
# Process devices to extract side information and format properly
processed_devices = []
for device in devices:
device_info = {
"name": device['name'],
"id": device['address'],
"side": "right" if "_R_" in device['name'] else "left" if "_L_" in device['name'] else "unknown",
"rssi": device.get('rssi') if device.get('rssi') != 'N/A' else None
}
processed_devices.append(device_info)
return {
"result": "success",
"devices": processed_devices,
"count": len(processed_devices)
}
@server.tool()
async def connect_g1_device(address: str) -> Dict[str, Any]:
"""Connect to a G1 device by address.
Args:
address (str): The Bluetooth MAC address of the G1 device to connect to.
Format should be XX:XX:XX:XX:XX:XX where X are hexadecimal characters.
Example: "AA:BB:CC:DD:EE:FF"
Returns:
Dict[str, Any]: JSON response with connection status including:
- result: "success" or "error"
- connected: Boolean indicating connection state
- device_name: Name of connected device (if successful)
- device_address: Address of connected device (if successful)
- error: Error message if connection failed
Note:
This establishes a BLE connection to the specified device and discovers
the Nordic UART service and characteristics.
"""
# Validate address format - accept both MAC addresses and CoreBluetooth UUIDs
# Check if it's a MAC address (XX:XX:XX:XX:XX:XX) or CoreBluetooth UUID (XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX)
mac_pattern = r'^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$'
uuid_pattern = r'^[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}$'
if not address or (not re.match(mac_pattern, address) and not re.match(uuid_pattern, address)):
return {
"result": "error",
"connected": False,
"error": "Invalid address format. Expected format:\n- MAC address: XX:XX:XX:XX:XX:XX\n- CoreBluetooth UUID: XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
}
# Check if already connected
if ble_manager.is_connected:
return {
"result": "error",
"connected": True,
"device_name": ble_manager.target_device.name if ble_manager.target_device else "Unknown",
"device_address": address,
"error": "Already connected to a device. Disconnect first."
}
try:
success = await ble_manager.connect_to_device(address)
except Exception as e:
logger.error(f"Connection failed: {e}")
return {
"result": "error",
"connected": False,
"error": f"Connection failed: {str(e)}"
}
if success:
device_name = ble_manager.target_device.name if ble_manager.target_device else "Unknown"
return {
"result": "success",
"connected": True,
"device_name": device_name,
"device_address": address
}
else:
return {
"result": "error",
"connected": False,
"error": f"Failed to connect to device {address}"
}
@server.tool()
async def disconnect_g1_device() -> Dict[str, Any]:
"""Disconnect from the current G1 device.
Returns:
Dict[str, Any]: JSON response with disconnection status including:
- result: "success" or "error"
- disconnected: Boolean indicating disconnection state
- device_name: Name of previously connected device (if successful)
- error: Error message if disconnection failed
Note:
This closes the BLE connection to the currently connected device,
cleans up resources, stops the heartbeat mechanism, and resets connection state.
"""
if not ble_manager.is_connected:
return {
"result": "error",
"disconnected": False,
"error": "Not connected to any device"
}
device_name = ble_manager.target_device.name if ble_manager.target_device else "Unknown"
try:
await ble_manager.disconnect()
return {
"result": "success",
"disconnected": True,
"device_name": device_name
}
except Exception as e:
logger.error(f"Disconnection failed: {e}")
return {
"result": "error",
"disconnected": False,
"error": f"Disconnection failed: {str(e)}"
}
@server.tool()
async def get_g1_connection_status() -> Dict[str, Any]:
"""Get current connection status and device info.
Returns:
Dict[str, Any]: JSON response with detailed connection status including:
- result: "success" or "error"
- connected: Boolean indicating connection state
- device_name: Name of connected device (if connected)
- device_address: Address of connected device (if connected)
- uart_service_available: Boolean indicating UART service availability
- tx_characteristic_available: Boolean indicating TX characteristic availability
- rx_characteristic_available: Boolean indicating RX characteristic availability
- pending_messages_count: Number of pending messages
- total_messages: Total message count
- error: Error message if status check failed
Note:
This returns detailed status information including:
- Connection state (connected/disconnected)
- Device name and address (if connected)
- UART service availability
- Number of pending messages
- Total message count
"""
try:
status = ble_manager.get_connection_status()
except Exception as e:
logger.error(f"Status check failed: {e}")
return {
"result": "error",
"connected": False,
"error": f"Status check failed: {str(e)}"
}
return {
"result": "success",
"connected": status['connected'],
"device_name": status.get('device_name'),
"device_address": status.get('device_address'),
"uart_service_available": status.get('uart_service_available', False),
"tx_characteristic_available": status.get('tx_characteristic_available', False),
"rx_characteristic_available": status.get('rx_characteristic_available', False),
"pending_messages_count": status.get('pending_messages_count', 0),
"total_messages": status.get('total_messages', 0)
}
@server.tool()
async def send_g1_message(hex_data: str) -> Dict[str, Any]:
"""Send a message to the connected G1 device.
Args:
hex_data (str): Hexadecimal string representation of the message to send.
Can contain spaces, tabs, or other whitespace which will be automatically removed.
Should contain only valid hexadecimal characters (0-9, A-F, a-f).
Examples: "2506", "25 06", "25 06 00 01", "25 06 00 01 04 02"
Returns:
Dict[str, Any]: JSON response with message status including:
- result: "success" or "error"
- message_sent: Boolean indicating if message was sent
- response_received: Boolean indicating if response was received
- response_data: Response data in hex format (if received)
- timeout: Boolean indicating if message timed out
- error: Error message if sending failed
Note:
This sends the hex_data as bytes to the connected G1 device using the
Nordic BLE UART protocol and waits for a response up to 2 seconds.
All messages are treated as commands and will timeout after 2 seconds if no response is received.
Spaces, tabs, and other whitespace in hex_data are automatically removed before processing.
Examples:
- send_g1_message("2506") -> Sends command 0x25 with data 0x06
- send_g1_message("25 06") -> Same as above (spaces removed)
- send_g1_message("25 06 00 01") -> Sends 0x25060001
- send_g1_message("ABCD 1234") -> Sends 0xABCD1234
- send_g1_message("1234567890ABCDEF") -> Sends longer message
"""
# Check if connected and has services, if not try to connect
if not ble_manager.is_connected or not ble_manager.uart_service:
logger.info("Not connected or missing services, attempting to connect...")
try:
success = await auto_connect_to_right_device()
if not success:
return {
"result": "error",
"message_sent": False,
"error": "Failed to connect to G1 device. Please ensure device is available and try again."
}
except Exception as e:
return {
"result": "error",
"message_sent": False,
"error": f"Connection failed: {str(e)}"
}
# Validate hex data format and remove spaces/whitespace
if not hex_data:
return {
"result": "error",
"message_sent": False,
"error": "Hex data cannot be empty"
}
# Remove all spaces, tabs, newlines, and other whitespace
cleaned_hex = ''.join(hex_data.split())
# Validate that cleaned hex data contains only valid hexadecimal characters
if not cleaned_hex or not all(c in '0123456789ABCDEFabcdef' for c in cleaned_hex):
return {
"result": "error",
"message_sent": False,
"error": "Invalid hex data format. Use only hexadecimal characters (0-9, A-F, a-f). Spaces and whitespace are automatically removed."
}
try:
# Send message using cleaned hex data
response_data = await ble_manager.send_message(cleaned_hex)
except Exception as e:
error_msg = str(e)
logger.error(f"Failed to send message: {error_msg}")
# Check if this is a connection-related error
if "connection lost" in error_msg.lower() or "disconnected" in error_msg.lower():
return {
"result": "error",
"message_sent": False,
"error": f"Connection lost while sending message. Please reconnect using connect_g1_device. Error: {error_msg}"
}
elif "timeout" in error_msg.lower():
return {
"result": "error",
"message_sent": False,
"error": f"Message timeout. The device may be unresponsive. Error: {error_msg}"
}
else:
return {
"result": "error",
"message_sent": False,
"error": f"Failed to send message: {error_msg}"
}
# Format response based on what was returned
if response_data:
# Format response data as space-separated hex pairs
hex_pairs = ' '.join([response_data[i:i+2] for i in range(0, len(response_data), 2)])
return {
"result": "success",
"message_sent": True,
"response_received": True,
"response_data": hex_pairs,
"timeout": False
}
else:
return {
"result": "success",
"message_sent": True,
"response_received": False,
"response_data": None,
"timeout": True
}
def main():
server.run()
if __name__ == "__main__":
server.run()