Skip to main content
Glama

G1 UART MCP Server

by danroblewis
g1_uart_manager.py19.8 kB
#!/usr/bin/env python3 """ BLE UART Manager - Enhanced with Connection Monitoring and Auto-Reconnection """ import asyncio import logging import traceback from datetime import datetime from typing import Dict, List, Any import bleak from bleak import BleakScanner, BleakClient from bleak.backends.device import BLEDevice logger = logging.getLogger(__name__) class NordicBLEUARTManager: """Enhanced BLE UART Manager with connection monitoring and auto-reconnection""" # Nordic BLE UART Service UUIDs UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" UART_TX_CHARACTERISTIC_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E" # Write UART_RX_CHARACTERISTIC_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E" # Notify def __init__(self): self.is_connected = False self.target_device = None self.scanned_devices: List[BLEDevice] = [] self.client = None self.uart_service = None self.tx_characteristic = None self.rx_characteristic = None self.communication_log = [] self.message_id_counter = 0 self.pending_messages = {} # Connection monitoring self.connection_monitor_task = None self.last_heartbeat = None self.heartbeat_interval = 5.0 # 5 seconds self.connection_timeout = 30.0 # 30 seconds self.auto_reconnect_enabled = True self.reconnect_attempts = 0 self.max_reconnect_attempts = 3 self.reconnect_delay = 2.0 # 2 seconds # Connection state self.connection_start_time = None self.last_activity_time = None async def scan_for_devices(self, filter_pattern: str = "G1_") -> List[Dict[str, Any]]: """Scan for BLE devices, optionally filtering by name pattern""" logger.info("Starting BLE device scan...") try: # Perform BLE scan - this is the only operation that could throw an exception devices = await BleakScanner.discover(timeout=10.0) except Exception as e: logger.error(f"Scan failed: {e}") return [] # Process scan results - this won't throw exceptions device_list = [] self.scanned_devices = [] # Clear previous scan results for device in devices: if not device.name: continue # Filter by pattern if specified if filter_pattern and filter_pattern not in device.name: continue # Store the actual BLEDevice object for reuse self.scanned_devices.append(device) device_info = { "name": device.name, "address": device.address, "rssi": getattr(device, 'rssi', 'N/A'), "metadata": getattr(device, 'metadata', {}) } device_list.append(device_info) logger.info(f"Found device: {device.name} ({device.address})") logger.info(f"Scan complete. Found {len(device_list)} devices") return device_list def _find_device_by_address(self, address: str) -> BLEDevice: """Find a device by address in the scanned devices list""" return next((d for d in self.scanned_devices if d.address == address), None) def _find_characteristic(self, service, uuid: str, required_property: str) -> Any: """Find a characteristic by UUID with required property""" for char in service.characteristics: if char.uuid.lower() == uuid.lower() and required_property in char.properties: return char return None async def connect_to_device(self, address: str) -> bool: """Connect to a BLE device by address and discover UART service""" logger.info(f"Connecting to device {address}...") target_device = self._find_device_by_address(address) if not target_device: try: await self.scan_for_devices() target_device = self._find_device_by_address(address) except Exception as e: logger.error(f"Scan failed: {e}") return False if not target_device: logger.error(f"Device {address} not found") return False logger.info(f"Found target device: {target_device.name} ({target_device.address})") # Connect to the device - this is where exceptions could occur try: self.client = BleakClient(target_device) logger.info("Attempting to connect...") logger.info("Note: On macOS, you may see a pairing prompt. Please accept it if it appears.") # Use a longer timeout to account for macOS pairing prompts await asyncio.wait_for(self.client.connect(), timeout=30.0) # 30 second timeout logger.info(f"Connected to {target_device.name}") # Wait for services to be discovered logger.info("Waiting for services to be discovered...") await asyncio.sleep(1) # Discover the UART service and characteristics logger.info("Discovering UART service...") if not await self._discover_uart_service(): logger.error("Failed to discover UART service") await self.disconnect() return False self.target_device = target_device self.is_connected = True self.connection_start_time = datetime.now() self.last_activity_time = datetime.now() self.last_heartbeat = datetime.now() self.reconnect_attempts = 0 # Start connection monitoring await self._start_connection_monitoring() logger.info(f"Successfully connected to {target_device.name} with UART service") return True except Exception as e: logger.error(f"Failed to connect: {e}") logger.error(f"Connection error traceback: {traceback.format_exc()}") logger.error("This might be due to:") logger.error("1. macOS pairing prompt not being accepted") logger.error("2. Device being out of range") logger.error("3. Device being busy or in use by another app") self.is_connected = False return False async def _start_connection_monitoring(self): """Start background task to monitor connection health""" if self.connection_monitor_task: self.connection_monitor_task.cancel() self.connection_monitor_task = asyncio.create_task(self._connection_monitor_loop()) logger.info("Started connection monitoring") async def _connection_monitor_loop(self): """Background loop to monitor connection health and auto-reconnect if needed""" try: while self.is_connected and self.client: try: # Check if client is still connected if not self.client.is_connected: logger.warning("BLE client disconnected, attempting reconnection...") await self._handle_disconnection() break # Send heartbeat if enough time has passed if (datetime.now() - self.last_heartbeat).total_seconds() >= self.heartbeat_interval: await self._send_heartbeat() self.last_heartbeat = datetime.now() # Check for connection timeout if self.connection_start_time and (datetime.now() - self.connection_start_time).total_seconds() > self.connection_timeout: logger.warning("Connection timeout reached, checking health...") if not await self._check_connection_health(): await self._handle_disconnection() break await asyncio.sleep(1.0) # Check every second except Exception as e: logger.error(f"Error in connection monitor: {e}") await asyncio.sleep(1.0) except asyncio.CancelledError: logger.info("Connection monitoring cancelled") except Exception as e: logger.error(f"Connection monitoring failed: {e}") async def _send_heartbeat(self): """Send a heartbeat message to keep connection alive""" try: if self.is_connected and self.tx_characteristic: # Send a simple ping command (0x00) to keep connection alive heartbeat_data = bytes([0x00]) await self.client.write_gatt_char(self.tx_characteristic.uuid, heartbeat_data) logger.debug("Heartbeat sent") self.last_activity_time = datetime.now() except Exception as e: logger.warning(f"Heartbeat failed: {e}") # Heartbeat failure might indicate connection issues async def _check_connection_health(self) -> bool: """Check if the connection is still healthy""" try: if not self.client or not self.client.is_connected: return False # Try to read a characteristic property to test connection if self.tx_characteristic: # Just check if characteristic is accessible _ = self.tx_characteristic.uuid return True return False except Exception as e: logger.warning(f"Connection health check failed: {e}") return False async def _handle_disconnection(self): """Handle unexpected disconnection""" logger.warning("Handling unexpected disconnection...") if self.auto_reconnect_enabled and self.reconnect_attempts < self.max_reconnect_attempts: self.reconnect_attempts += 1 logger.info(f"Attempting auto-reconnection {self.reconnect_attempts}/{self.max_reconnect_attempts}") try: # Wait before reconnecting await asyncio.sleep(self.reconnect_delay) # Attempt to reconnect if self.target_device: success = await self.connect_to_device(self.target_device.address) if success: logger.info("Auto-reconnection successful!") return except Exception as e: logger.error(f"Auto-reconnection attempt {self.reconnect_attempts} failed: {e}") # If we get here, reconnection failed or is disabled logger.error("Auto-reconnection failed or disabled, cleaning up connection") await self.disconnect() async def _discover_uart_service(self) -> bool: """Discover the Nordic UART service and characteristics""" if not self.client or not self.client.services: logger.error("No services available") return False # Look for the Nordic UART service uart_service = None for service in self.client.services: if service.uuid.lower() == self.UART_SERVICE_UUID.lower(): uart_service = service break if not uart_service: logger.error("Nordic UART service not found") return False self.uart_service = uart_service logger.info("Found Nordic UART service") # Find TX characteristic (write) self.tx_characteristic = self._find_characteristic(uart_service, self.UART_TX_CHARACTERISTIC_UUID, "write") if not self.tx_characteristic: logger.error("UART TX characteristic not found or not writable") return False logger.info("Found UART TX characteristic (write)") # Find RX characteristic (notify) self.rx_characteristic = self._find_characteristic(uart_service, self.UART_RX_CHARACTERISTIC_UUID, "notify") if not self.rx_characteristic: logger.error("UART RX characteristic not found or not notifiable") return False logger.info("Found UART RX characteristic (notify)") # Start listening for notifications await self.client.start_notify(self.rx_characteristic.uuid, self._notification_handler) logger.info("Started listening to UART RX notifications") return True async def _notification_handler(self, characteristic, data: bytearray): """Handle incoming notifications from the UART RX characteristic""" timestamp = datetime.now() hex_data = data.hex() received_command_code = hex_data[:2].upper() if len(hex_data) >= 2 else "" logger.info(f"Received UART notification: {hex_data}") self.last_activity_time = timestamp # Look for a matching sent message matching_sent = None for msg_id, sent_msg in self.pending_messages.items(): if sent_msg["command_code"] == received_command_code: matching_sent = sent_msg break # If no matching message found, log and return early if not matching_sent: logger.info(f"No matching sent message found for response {hex_data}") return # This is a response to a sent message response_time_ms = int((timestamp - matching_sent["timestamp"]).total_seconds() * 1000) logger.info(f"Found matching sent message: {matching_sent['id']}") # Update the sent message with response info matching_sent["response_data"] = hex_data matching_sent["response_time_ms"] = response_time_ms # Signal that response has been received if "response_event" in matching_sent: matching_sent["response_event"].set() # Remove from pending messages if matching_sent["id"] in self.pending_messages: del self.pending_messages[sent_msg["id"]] logger.info(f"Matched response {hex_data} with sent message {matching_sent['id']} in {response_time_ms}ms") async def disconnect(self): """Disconnect from the current device""" # Stop connection monitoring if self.connection_monitor_task: self.connection_monitor_task.cancel() self.connection_monitor_task = None if self.client and self.is_connected: try: await self.client.disconnect() logger.info("Disconnected from device") except Exception as e: logger.error(f"Error during disconnect: {e}") # Always clean up state self.is_connected = False self.client = None self.uart_service = None self.tx_characteristic = None self.rx_characteristic = None self.target_device = None self.connection_start_time = None self.last_activity_time = None self.last_heartbeat = None self.reconnect_attempts = 0 def get_connection_status(self) -> Dict[str, Any]: """Get current connection status and device info""" connection_duration = None if self.connection_start_time: connection_duration = (datetime.now() - self.connection_start_time).total_seconds() last_activity = None if self.last_activity_time: last_activity = (datetime.now() - self.last_activity_time).total_seconds() return { "connected": self.is_connected, "device_name": self.target_device.name if self.target_device else None, "device_address": self.target_device.address if self.target_device else None, "uart_service_available": self.uart_service is not None, "tx_characteristic_available": self.tx_characteristic is not None, "rx_characteristic_available": self.rx_characteristic is not None, "pending_messages_count": len(self.pending_messages), "total_messages": len(self.communication_log), "connection_duration_seconds": connection_duration, "last_activity_seconds": last_activity, "reconnect_attempts": self.reconnect_attempts, "auto_reconnect_enabled": self.auto_reconnect_enabled } async def send_message(self, hex_data: str) -> str: """Send a UART message and return the hex response packet or empty string""" if not self.client or not self.is_connected: raise Exception("Not connected to any device") if not self.tx_characteristic: raise Exception("UART TX characteristic not available") # Check connection health before sending if not await self._check_connection_health(): logger.warning("Connection health check failed, attempting reconnection...") await self._handle_disconnection() raise Exception("Connection lost, please reconnect") # Convert hex string to bytes data = bytes.fromhex(hex_data) command_code = hex_data[:2].upper() if len(hex_data) >= 2 else "" # Create sent message record sent_msg = { "id": f"sent_{self.message_id_counter}", "timestamp": datetime.now(), "direction": "sent", "data": data, "command_code": command_code } self.message_id_counter += 1 self.communication_log.append(sent_msg) # Add to pending messages self.pending_messages[sent_msg["id"]] = sent_msg response_event = asyncio.Event() sent_msg["response_event"] = response_event try: # Send the data await self.client.write_gatt_char(self.tx_characteristic.uuid, data) logger.info(f"Sent UART message: {hex_data} (Command: {command_code})") self.last_activity_time = datetime.now() # Wait for response with 2-second timeout (increased from 1 second) await asyncio.wait_for(response_event.wait(), timeout=2.0) # Response received - return the hex data response_data = sent_msg.get("response_data", "") return response_data except asyncio.TimeoutError: # Timeout reached if sent_msg["id"] in self.pending_messages: del self.pending_messages[sent_msg["id"]] logger.warning(f"Message timeout for command {command_code}") return "" # Return empty string on timeout except Exception as e: logger.error(f"Failed to send message: {e}") # Clean up pending message on error if sent_msg["id"] in self.pending_messages: del self.pending_messages[sent_msg["id"]] # Check if this is a connection error if "disconnected" in str(e).lower() or "not connected" in str(e).lower(): logger.error("Connection error detected, attempting reconnection...") await self._handle_disconnection() raise

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danroblewis/g1_uart_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server