send_g1_message
Send hexadecimal messages to G1 Bluetooth devices via the Nordic UART protocol, await responses, and receive JSON status updates including sent status, response data, and errors.
Instructions
Send a message to the connected G1 device.
Args:
hex_data (str): Hexadecimal string representation of the message to send.
Can contain spaces, tabs, or other whitespace which will be automatically removed.
Should contain only valid hexadecimal characters (0-9, A-F, a-f).
Examples: "2506", "25 06", "25 06 00 01", "25 06 00 01 04 02"
Returns:
Dict[str, Any]: JSON response with message status including:
- result: "success" or "error"
- message_sent: Boolean indicating if message was sent
- response_received: Boolean indicating if response was received
- response_data: Response data in hex format (if received)
- timeout: Boolean indicating if message timed out
- error: Error message if sending failed
Note:
This sends the hex_data as bytes to the connected G1 device using the
Nordic BLE UART protocol and waits for a response up to 2 seconds.
All messages are treated as commands and will time out after 2 seconds if no response is received.
Spaces, tabs, and other whitespace in hex_data are automatically removed before processing.
Examples:
- send_g1_message("2506") -> Sends command 0x25 with data 0x06
- send_g1_message("25 06") -> Same as above (spaces removed)
- send_g1_message("25 06 00 01") -> Sends 0x25060001
- send_g1_message("ABCD 1234") -> Sends 0xABCD1234
- send_g1_message("1234567890ABCDEF") -> Sends longer message
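The whitespace stripping and hex validation described above can be sketched in a few lines of Python. This is a minimal illustration, not the server's code: the names `normalize_hex` and `to_bytes` are hypothetical, and the even-length check is an added assumption (the docstring does not mention it, but `bytes.fromhex` needs whole bytes).

```python
import string

# All characters accepted as hex digits: 0-9, a-f, A-F.
HEX_DIGITS = set(string.hexdigits)


def normalize_hex(hex_data: str) -> str:
    """Strip all whitespace and check that only hex digits remain."""
    # str.split() with no arguments splits on any run of whitespace,
    # so joining the pieces removes spaces, tabs, and newlines alike.
    cleaned = "".join(hex_data.split())
    if not cleaned or not all(c in HEX_DIGITS for c in cleaned):
        raise ValueError("hex_data must contain only 0-9, A-F, a-f")
    # Assumption (not in the docstring): reject odd-length input early,
    # because bytes.fromhex() raises ValueError on a dangling half byte.
    if len(cleaned) % 2:
        raise ValueError("hex_data must contain an even number of hex digits")
    return cleaned


def to_bytes(hex_data: str) -> bytes:
    """Convert a loosely formatted hex string into the bytes to transmit."""
    return bytes.fromhex(normalize_hex(hex_data))


if __name__ == "__main__":
    print(to_bytes("25 06").hex())
    print(to_bytes("ABCD 1234").hex())
```

Checking the length before conversion keeps a half-typed byte such as "256" from surfacing later as a generic send failure.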
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| hex_data | Yes | Hexadecimal string of the message to send. Whitespace is removed automatically. | |
Input Schema (JSON Schema)
{
  "properties": {
    "hex_data": {
      "title": "Hex Data",
      "type": "string"
    }
  },
  "required": [
    "hex_data"
  ],
  "title": "send_g1_messageArguments",
  "type": "object"
}
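To show what the schema above requires of a caller, here is a small hand-rolled check in Python. It is a sketch under stated assumptions, not a full JSON Schema validator: `check_arguments` is a hypothetical helper that only understands the `required` list and the `"type": "string"` constraint used by this tool.

```python
from typing import Any, Dict, List

# The tool's input schema, copied from the listing above.
SCHEMA: Dict[str, Any] = {
    "properties": {"hex_data": {"title": "Hex Data", "type": "string"}},
    "required": ["hex_data"],
    "title": "send_g1_messageArguments",
    "type": "object",
}


def check_arguments(arguments: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the arguments fit."""
    errors: List[str] = []
    # Every name listed under "required" must be present.
    for name in SCHEMA["required"]:
        if name not in arguments:
            errors.append(f"missing required argument: {name}")
    # Only the "string" type is handled, which is all this schema uses.
    for name, value in arguments.items():
        spec = SCHEMA["properties"].get(name)
        if spec and spec.get("type") == "string" and not isinstance(value, str):
            errors.append(f"argument {name} must be a string")
    return errors


if __name__ == "__main__":
    print(check_arguments({"hex_data": "25 06"}))
    print(check_arguments({}))
```

Note that the schema only constrains the type. Whether the string is valid hexadecimal is checked later, inside the handler.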
Implementation Reference
- mcp_server.py:273-388 (handler) MCP tool handler implementation for 'send_g1_message'. Handles input validation, auto-connection if needed, delegates to BLE manager for sending, processes response or timeout, returns structured status dictionary.

    @server.tool()
    async def send_g1_message(hex_data: str) -> Dict[str, Any]:
        """Send a message to the connected G1 device.

        Args:
            hex_data (str): Hexadecimal string representation of the message to send.
                Can contain spaces, tabs, or other whitespace which will be automatically removed.
                Should contain only valid hexadecimal characters (0-9, A-F, a-f).
                Examples: "2506", "25 06", "25 06 00 01", "25 06 00 01 04 02"

        Returns:
            Dict[str, Any]: JSON response with message status including:
                - result: "success" or "error"
                - message_sent: Boolean indicating if message was sent
                - response_received: Boolean indicating if response was received
                - response_data: Response data in hex format (if received)
                - timeout: Boolean indicating if message timed out
                - error: Error message if sending failed

        Note:
            This sends the hex_data as bytes to the connected G1 device using the
            Nordic BLE UART protocol and waits for a response up to 2 seconds.
            All messages are treated as commands and will timeout after 2 seconds if no response is received.
            Spaces, tabs, and other whitespace in hex_data are automatically removed before processing.

        Examples:
            - send_g1_message("2506") -> Sends command 0x25 with data 0x06
            - send_g1_message("25 06") -> Same as above (spaces removed)
            - send_g1_message("25 06 00 01") -> Sends 0x25060001
            - send_g1_message("ABCD 1234") -> Sends 0xABCD1234
            - send_g1_message("1234567890ABCDEF") -> Sends longer message
        """
        # Check if connected and has services, if not try to connect
        if not ble_manager.is_connected or not ble_manager.uart_service:
            logger.info("Not connected or missing services, attempting to connect...")
            try:
                success = await auto_connect_to_right_device()
                if not success:
                    return {
                        "result": "error",
                        "message_sent": False,
                        "error": "Failed to connect to G1 device. Please ensure device is available and try again."
                    }
            except Exception as e:
                return {
                    "result": "error",
                    "message_sent": False,
                    "error": f"Connection failed: {str(e)}"
                }

        # Validate hex data format and remove spaces/whitespace
        if not hex_data:
            return {
                "result": "error",
                "message_sent": False,
                "error": "Hex data cannot be empty"
            }

        # Remove all spaces, tabs, newlines, and other whitespace
        cleaned_hex = ''.join(hex_data.split())

        # Validate that cleaned hex data contains only valid hexadecimal characters
        if not cleaned_hex or not all(c in '0123456789ABCDEFabcdef' for c in cleaned_hex):
            return {
                "result": "error",
                "message_sent": False,
                "error": "Invalid hex data format. Use only hexadecimal characters (0-9, A-F, a-f). Spaces and whitespace are automatically removed."
            }

        try:
            # Send message using cleaned hex data
            response_data = await ble_manager.send_message(cleaned_hex)
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Failed to send message: {error_msg}")

            # Check if this is a connection-related error
            if "connection lost" in error_msg.lower() or "disconnected" in error_msg.lower():
                return {
                    "result": "error",
                    "message_sent": False,
                    "error": f"Connection lost while sending message. Please reconnect using connect_g1_device. Error: {error_msg}"
                }
            elif "timeout" in error_msg.lower():
                return {
                    "result": "error",
                    "message_sent": False,
                    "error": f"Message timeout. The device may be unresponsive. Error: {error_msg}"
                }
            else:
                return {
                    "result": "error",
                    "message_sent": False,
                    "error": f"Failed to send message: {error_msg}"
                }

        # Format response based on what was returned
        if response_data:
            # Format response data as space-separated hex pairs
            hex_pairs = ' '.join([response_data[i:i+2] for i in range(0, len(response_data), 2)])
            return {
                "result": "success",
                "message_sent": True,
                "response_received": True,
                "response_data": hex_pairs,
                "timeout": False
            }
        else:
            return {
                "result": "success",
                "message_sent": True,
                "response_received": False,
                "response_data": None,
                "timeout": True
            }
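A caller receives one of three dictionary shapes from the handler: an error, a success with a response, or a success that timed out. The following Python sketch shows one way a client might interpret them. `summarize_status` is a hypothetical helper, and the sample payloads are made up for illustration; only the dictionary keys come from the handler.

```python
from typing import Any, Dict


def summarize_status(status: Dict[str, Any]) -> str:
    """Turn the handler's status dictionary into a one-line summary."""
    if status.get("result") == "error":
        return f"error: {status.get('error', 'unknown error')}"
    if status.get("response_received"):
        # response_data is space-separated hex pairs; bytes.fromhex()
        # skips the spaces between bytes, so it parses the string directly.
        payload = bytes.fromhex(status["response_data"])
        return f"response ({len(payload)} bytes): {status['response_data']}"
    if status.get("timeout"):
        # A timeout is still reported with result == "success",
        # because the message itself was written to the device.
        return "sent, but no response within the timeout"
    return "sent"


if __name__ == "__main__":
    # Sample payloads are illustrative, not captured from a device.
    print(summarize_status({
        "result": "success", "message_sent": True,
        "response_received": True, "response_data": "25 06 C9",
        "timeout": False,
    }))
```

Because a timeout comes back with `"result": "success"`, clients that need a reply should check `response_received` or `timeout` rather than `result` alone.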
- g1_uart_manager.py:396-464 (helper) Core implementation of message sending over BLE UART. Converts hex to bytes, writes to TX characteristic, awaits matching response via notification handler using command code prefix, handles timeout and connection errors.

    async def send_message(self, hex_data: str) -> str:
        """Send a UART message and return the hex response packet or empty string"""
        if not self.client or not self.is_connected:
            raise Exception("Not connected to any device")

        if not self.tx_characteristic:
            raise Exception("UART TX characteristic not available")

        # Check connection health before sending
        if not await self._check_connection_health():
            logger.warning("Connection health check failed, attempting reconnection...")
            await self._handle_disconnection()
            raise Exception("Connection lost, please reconnect")

        # Convert hex string to bytes
        data = bytes.fromhex(hex_data)
        command_code = hex_data[:2].upper() if len(hex_data) >= 2 else ""

        # Create sent message record
        sent_msg = {
            "id": f"sent_{self.message_id_counter}",
            "timestamp": datetime.now(),
            "direction": "sent",
            "data": data,
            "command_code": command_code
        }
        self.message_id_counter += 1
        self.communication_log.append(sent_msg)

        # Add to pending messages
        self.pending_messages[sent_msg["id"]] = sent_msg
        response_event = asyncio.Event()
        sent_msg["response_event"] = response_event

        try:
            # Send the data
            await self.client.write_gatt_char(self.tx_characteristic.uuid, data)
            logger.info(f"Sent UART message: {hex_data} (Command: {command_code})")
            self.last_activity_time = datetime.now()

            # Wait for response with 2-second timeout (increased from 1 second)
            await asyncio.wait_for(response_event.wait(), timeout=2.0)

            # Response received - return the hex data
            response_data = sent_msg.get("response_data", "")
            return response_data

        except asyncio.TimeoutError:
            # Timeout reached
            if sent_msg["id"] in self.pending_messages:
                del self.pending_messages[sent_msg["id"]]
            logger.warning(f"Message timeout for command {command_code}")
            return ""  # Return empty string on timeout

        except Exception as e:
            logger.error(f"Failed to send message: {e}")
            # Clean up pending message on error
            if sent_msg["id"] in self.pending_messages:
                del self.pending_messages[sent_msg["id"]]
            # Check if this is a connection error
            if "disconnected" in str(e).lower() or "not connected" in str(e).lower():
                logger.error("Connection error detected, attempting reconnection...")
                await self._handle_disconnection()
            raise
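The wait-for-response pattern used by `send_message` (an `asyncio.Event` that a notification callback sets, guarded by `asyncio.wait_for`) can be tried without any Bluetooth hardware. The sketch below is a simplified stand-in, not the manager's code: `PendingRequests` and `on_notification` are hypothetical names, pending entries are keyed by command code rather than by message id, and the notification is simulated with a timer.

```python
import asyncio
from typing import Dict, Tuple


class PendingRequests:
    """Hypothetical stand-in for the manager's pending-message bookkeeping."""

    def __init__(self) -> None:
        self._pending: Dict[str, dict] = {}

    async def wait_for_response(self, command_code: str, timeout: float = 2.0) -> str:
        """Wait until a matching notification arrives, or return "" on timeout."""
        entry = {"event": asyncio.Event(), "response": ""}
        self._pending[command_code] = entry
        try:
            await asyncio.wait_for(entry["event"].wait(), timeout=timeout)
            return entry["response"]
        except asyncio.TimeoutError:
            return ""  # Same convention as send_message: empty string on timeout
        finally:
            # Always drop the entry so stale requests cannot match later packets.
            self._pending.pop(command_code, None)

    def on_notification(self, data: bytes) -> None:
        """Match an incoming packet to a pending request by its first byte."""
        hex_response = data.hex().upper()
        entry = self._pending.get(hex_response[:2])
        if entry is not None:
            entry["response"] = hex_response
            entry["event"].set()


async def demo() -> Tuple[str, str]:
    pending = PendingRequests()
    loop = asyncio.get_running_loop()
    # Simulate the device answering command 0x25 after 10 ms.
    loop.call_later(0.01, pending.on_notification, bytes.fromhex("2506C9"))
    answered = await pending.wait_for_response("25", timeout=0.5)
    # Nothing ever answers command 0x4E, so this request times out.
    unanswered = await pending.wait_for_response("4E", timeout=0.05)
    return answered, unanswered


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Cleaning up in a `finally` block is a design choice of this sketch. The original removes the pending entry only in its timeout and error branches.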
- mcp_server.py:34-72 (helper) Helper function called by send_g1_message handler to automatically scan and connect to the first 'right' G1 device if not already connected.

    async def auto_connect_to_right_device():
        """Automatically scan for and connect to the first right G1 device found"""
        try:
            logger.info("Auto-connecting to right G1 device...")

            # Scan for devices
            devices = await ble_manager.scan_for_devices(filter_pattern="G1_")
            if not devices:
                logger.warning("No G1 devices found during auto-connect")
                return False

            # Find the first device with "_R_" in the name (right device)
            right_device = None
            for device in devices:
                if "_R_" in device['name']:
                    right_device = device
                    break

            if not right_device:
                logger.warning("No right G1 device found during auto-connect")
                return False

            logger.info(f"Found right G1 device: {right_device['name']} ({right_device['address']})")

            # Attempt to connect
            success = await ble_manager.connect_to_device(right_device['address'])
            if success:
                logger.info(f"Auto-connect successful to {right_device['name']}")
                return True
            else:
                logger.warning(f"Auto-connect failed to {right_device['name']}")
                return False

        except Exception as e:
            logger.error(f"Auto-connect failed: {e}")
            return False
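The device-selection step of the helper above can be isolated and tested on its own. The sketch below assumes scan results are dictionaries with `name` and `address` keys, as the helper's code suggests. `pick_right_device` is a hypothetical function, and the device names and addresses are invented for illustration.

```python
from typing import Dict, List, Optional


def pick_right_device(devices: List[Dict[str, str]]) -> Optional[Dict[str, str]]:
    """Return the first scanned device whose name contains "_R_", else None."""
    return next((d for d in devices if "_R_" in d.get("name", "")), None)


if __name__ == "__main__":
    # Invented scan results; real names and addresses will differ.
    scanned = [
        {"name": "G1_45_L_ABC123", "address": "AA:BB:CC:DD:EE:01"},
        {"name": "G1_45_R_DEF456", "address": "AA:BB:CC:DD:EE:02"},
    ]
    print(pick_right_device(scanned))
```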
- mcp_server.py:273-273 (registration) FastMCP decorator that registers the send_g1_message function as a tool.

    @server.tool()