connect_g1_device
Establish a BLE connection to a G1 device using its Bluetooth address (a MAC address or, on macOS, a CoreBluetooth UUID), enabling communication over the Nordic UART service. Returns the connection status, the device name, and error details if applicable.
Instructions
Connect to a G1 device by address.

Args:
    address (str): The Bluetooth MAC address of the G1 device to connect to.
        Format should be XX:XX:XX:XX:XX:XX where X are hexadecimal characters.
        Example: "AA:BB:CC:DD:EE:FF"

Returns:
    Dict[str, Any]: JSON response with connection status including:
        - result: "success" or "error"
        - connected: Boolean indicating connection state
        - device_name: Name of connected device (if successful)
        - device_address: Address of connected device (if successful)
        - error: Error message if connection failed

Note:
    This establishes a BLE connection to the specified device and discovers
    the Nordic UART service and characteristics.
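
A caller will typically branch on the `result` and `connected` fields described above. The following is a minimal sketch of that, not code from the server: `describe_connection` is a hypothetical helper, and the two sample dictionaries are made up to match the documented fields.

```python
from typing import Any, Dict


def describe_connection(response: Dict[str, Any]) -> str:
    """Turn a connect_g1_device response into a one-line status message.

    Hypothetical helper for illustration; it relies only on the documented
    fields: result, connected, device_name, device_address, error.
    """
    if response.get("result") == "success" and response.get("connected"):
        name = response.get("device_name", "Unknown")
        address = response.get("device_address", "unknown address")
        return f"Connected to {name} ({address})"
    # Error responses carry a human-readable message in "error".
    return f"Connection failed: {response.get('error', 'no error details')}"


# Made-up sample responses shaped like the documented return value.
ok = {
    "result": "success",
    "connected": True,
    "device_name": "G1",
    "device_address": "AA:BB:CC:DD:EE:FF",
}
failed = {
    "result": "error",
    "connected": False,
    "error": "Failed to connect to device AA:BB:CC:DD:EE:FF",
}

print(describe_connection(ok))
print(describe_connection(failed))
```

Checking `result` first matters because an error response can still report `connected` as true, for example when a device is already connected.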
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
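| address | Yes | Bluetooth MAC address of the G1 device, in the form XX:XX:XX:XX:XX:XX (for example "AA:BB:CC:DD:EE:FF") | |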
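
The handler listed under Implementation Reference rejects any address that is neither a MAC address nor a CoreBluetooth UUID. A client could run the same check before calling the tool. The sketch below reuses the two regular expressions from that handler; the function name `is_valid_address` and the constant names are assumptions made for this example.

```python
import re

# Same patterns the handler uses: a colon-separated MAC address, or an
# 8-4-4-4-12 hexadecimal CoreBluetooth UUID.
MAC_PATTERN = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")
UUID_PATTERN = re.compile(
    r"^[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}$"
)


def is_valid_address(address: str) -> bool:
    """Return True if the address looks like a MAC address or a UUID.

    Hypothetical client-side pre-check; the server validates again anyway.
    """
    if not address:
        return False
    return bool(MAC_PATTERN.match(address) or UUID_PATTERN.match(address))


print(is_valid_address("AA:BB:CC:DD:EE:FF"))
print(is_valid_address("12345678-1234-1234-1234-123456789ABC"))
print(is_valid_address("AA-BB-CC-DD-EE-FF"))
```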
Input Schema (JSON Schema)
{
  "properties": {
    "address": {
      "title": "Address",
      "type": "string"
    }
  },
  "required": [
    "address"
  ],
  "title": "connect_g1_deviceArguments",
  "type": "object"
}
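
Under the Model Context Protocol, a tool is invoked with a JSON-RPC 2.0 `tools/call` request whose `arguments` object has to satisfy the schema above. The sketch below only builds that request body. `build_connect_request` and the request id are illustrative choices, and sending the request over a transport is not shown.

```python
import json
from typing import Any, Dict


def build_connect_request(address: str, request_id: int = 1) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 tools/call request for connect_g1_device.

    Hypothetical helper; the only tool argument is the required "address".
    """
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "connect_g1_device",
            "arguments": {"address": address},
        },
    }


request = build_connect_request("AA:BB:CC:DD:EE:FF")
print(json.dumps(request, indent=2))
```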
Implementation Reference
- mcp_server.py:117-185 (handler) Primary handler for the 'connect_g1_device' tool. Decorated with @server.tool() for automatic registration in FastMCP. It validates the address format (MAC or UUID), checks the connection state, handles errors, and delegates the BLE connection to the global NordicBLEUARTManager instance.

    @server.tool()
    async def connect_g1_device(address: str) -> Dict[str, Any]:
        """Connect to a G1 device by address.

        Args:
            address (str): The Bluetooth MAC address of the G1 device to connect to.
                Format should be XX:XX:XX:XX:XX:XX where X are hexadecimal characters.
                Example: "AA:BB:CC:DD:EE:FF"

        Returns:
            Dict[str, Any]: JSON response with connection status including:
                - result: "success" or "error"
                - connected: Boolean indicating connection state
                - device_name: Name of connected device (if successful)
                - device_address: Address of connected device (if successful)
                - error: Error message if connection failed

        Note:
            This establishes a BLE connection to the specified device and discovers
            the Nordic UART service and characteristics.
        """
        # Validate address format - accept both MAC addresses and CoreBluetooth UUIDs
        # Check if it's a MAC address (XX:XX:XX:XX:XX:XX) or CoreBluetooth UUID (XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX)
        mac_pattern = r'^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$'
        uuid_pattern = r'^[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}$'

        if not address or (not re.match(mac_pattern, address) and not re.match(uuid_pattern, address)):
            return {
                "result": "error",
                "connected": False,
                "error": "Invalid address format. Expected format:\n- MAC address: XX:XX:XX:XX:XX:XX\n- CoreBluetooth UUID: XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
            }

        # Check if already connected
        if ble_manager.is_connected:
            return {
                "result": "error",
                "connected": True,
                "device_name": ble_manager.target_device.name if ble_manager.target_device else "Unknown",
                "device_address": address,
                "error": "Already connected to a device. Disconnect first."
            }

        try:
            success = await ble_manager.connect_to_device(address)
        except Exception as e:
            logger.error(f"Connection failed: {e}")
            return {
                "result": "error",
                "connected": False,
                "error": f"Connection failed: {str(e)}"
            }

        if success:
            device_name = ble_manager.target_device.name if ble_manager.target_device else "Unknown"
            return {
                "result": "success",
                "connected": True,
                "device_name": device_name,
                "device_address": address
            }
        else:
            return {
                "result": "error",
                "connected": False,
                "error": f"Failed to connect to device {address}"
            }
- mcp_server.py:119-137 (schema) Docstring providing the input schema (address: str, MAC or UUID format) and the output schema (Dict[str, Any] with fields: result, connected, device_name, device_address, error). Used by MCP/FastMCP for tool schema generation.

    """Connect to a G1 device by address.

    Args:
        address (str): The Bluetooth MAC address of the G1 device to connect to.
            Format should be XX:XX:XX:XX:XX:XX where X are hexadecimal characters.
            Example: "AA:BB:CC:DD:EE:FF"

    Returns:
        Dict[str, Any]: JSON response with connection status including:
            - result: "success" or "error"
            - connected: Boolean indicating connection state
            - device_name: Name of connected device (if successful)
            - device_address: Address of connected device (if successful)
            - error: Error message if connection failed

    Note:
        This establishes a BLE connection to the specified device and discovers
        the Nordic UART service and characteristics.
    """
- mcp_server.py:117-117 (registration) The @server.tool() decorator registers the connect_g1_device function as an MCP tool with the FastMCP server instance.

    @server.tool()
- g1_uart_manager.py:101-164 (helper) Core helper method in the NordicBLEUARTManager class that implements the BLE connection logic: it scans if necessary, connects via BleakClient with a timeout, discovers the Nordic UART service and characteristics, and starts the notification handler and connection monitoring.

    async def connect_to_device(self, address: str) -> bool:
        """Connect to a BLE device by address and discover UART service"""
        logger.info(f"Connecting to device {address}...")

        target_device = self._find_device_by_address(address)
        if not target_device:
            try:
                await self.scan_for_devices()
                target_device = self._find_device_by_address(address)
            except Exception as e:
                logger.error(f"Scan failed: {e}")
                return False

        if not target_device:
            logger.error(f"Device {address} not found")
            return False

        logger.info(f"Found target device: {target_device.name} ({target_device.address})")

        # Connect to the device - this is where exceptions could occur
        try:
            self.client = BleakClient(target_device)
            logger.info("Attempting to connect...")
            logger.info("Note: On macOS, you may see a pairing prompt. Please accept it if it appears.")

            # Use a longer timeout to account for macOS pairing prompts
            await asyncio.wait_for(self.client.connect(), timeout=30.0)  # 30 second timeout
            logger.info(f"Connected to {target_device.name}")

            # Wait for services to be discovered
            logger.info("Waiting for services to be discovered...")
            await asyncio.sleep(1)

            # Discover the UART service and characteristics
            logger.info("Discovering UART service...")
            if not await self._discover_uart_service():
                logger.error("Failed to discover UART service")
                await self.disconnect()
                return False

            self.target_device = target_device
            self.is_connected = True
            self.connection_start_time = datetime.now()
            self.last_activity_time = datetime.now()
            self.last_heartbeat = datetime.now()
            self.reconnect_attempts = 0

            # Start connection monitoring
            await self._start_connection_monitoring()

            logger.info(f"Successfully connected to {target_device.name} with UART service")
            return True

        except Exception as e:
            logger.error(f"Failed to connect: {e}")
            logger.error(f"Connection error traceback: {traceback.format_exc()}")
            logger.error("This might be due to:")
            logger.error("1. macOS pairing prompt not being accepted")
            logger.error("2. Device being out of range")
            logger.error("3. Device being busy or in use by another app")
            self.is_connected = False
            return False
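
The helper bounds the connection attempt with `asyncio.wait_for(..., timeout=30.0)` and turns any failure, including a timeout, into a `False` return value. The sketch below isolates that pattern without any Bluetooth hardware. `slow_connect` is a stand-in for `BleakClient.connect()` that only sleeps, and `connect_with_timeout` is a hypothetical name; neither appears in the original code.

```python
import asyncio


async def slow_connect(delay: float) -> None:
    """Stand-in for BleakClient.connect(); it only sleeps for `delay` seconds."""
    await asyncio.sleep(delay)


async def connect_with_timeout(delay: float, timeout: float) -> bool:
    """Return True if the simulated connection finishes within `timeout`."""
    try:
        # wait_for cancels the inner coroutine and raises TimeoutError
        # when the timeout elapses first.
        await asyncio.wait_for(slow_connect(delay), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


# A fast connection succeeds; a slow one is cut off by the timeout.
fast = asyncio.run(connect_with_timeout(delay=0.01, timeout=1.0))
slow = asyncio.run(connect_with_timeout(delay=1.0, timeout=0.05))
print(fast, slow)
```

The helper itself catches `Exception` rather than `asyncio.TimeoutError` specifically, so a timeout is logged together with other connection errors and reported to the handler as a plain `False`.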