Skip to main content
Glama
danroblewis

G1 UART MCP Server

by danroblewis

send_g1_message

Send hexadecimal commands to G1 Bluetooth devices via the Nordic UART protocol, automatically removing whitespace and waiting up to 2 seconds for responses.

Instructions

Send a message to the connected G1 device.

Args:
    hex_data (str): Hexadecimal string representation of the message to send.
                   Can contain spaces, tabs, or other whitespace which will be automatically removed.
                   Should contain only valid hexadecimal characters (0-9, A-F, a-f).
                   Examples: "2506", "25 06", "25 06 00 01", "25 06 00 01 04 02"

Returns:
    Dict[str, Any]: JSON response with message status including:
        - result: "success" or "error"
        - message_sent: Boolean indicating if message was sent
        - response_received: Boolean indicating if response was received
        - response_data: Response data in hex format (if received)
        - timeout: Boolean indicating if message timed out
        - error: Error message if sending failed
    
Note:
    This sends the hex_data as bytes to the connected G1 device using the
    Nordic BLE UART protocol and waits for a response up to 2 seconds.
    All messages are treated as commands and will timeout after 2 seconds if no response is received.
    Spaces, tabs, and other whitespace in hex_data are automatically removed before processing.
    
Examples:
    - send_g1_message("2506") -> Sends command 0x25 with data 0x06
    - send_g1_message("25 06") -> Same as above (spaces removed)
    - send_g1_message("25 06 00 01") -> Sends 0x25060001
    - send_g1_message("ABCD 1234") -> Sends 0xABCD1234
    - send_g1_message("1234567890ABCDEF") -> Sends longer message

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
hex_dataYes

Implementation Reference

  • The main @server.tool() decorated handler function for send_g1_message. Performs input validation and cleaning, auto-connects if needed, delegates to BLE manager.send_message, processes response.
    @server.tool()
    async def send_g1_message(hex_data: str) -> Dict[str, Any]:
        """Send a message to the connected G1 device.
        
        Args:
            hex_data (str): Hexadecimal string representation of the message to send.
                           Can contain spaces, tabs, or other whitespace which will be automatically removed.
                           Should contain only valid hexadecimal characters (0-9, A-F, a-f).
                           Examples: "2506", "25 06", "25 06 00 01", "25 06 00 01 04 02"
        
        Returns:
            Dict[str, Any]: JSON response with message status including:
                - result: "success" or "error"
                - message_sent: Boolean indicating if message was sent
                - response_received: Boolean indicating if response was received
                - response_data: Response data in hex format (if received)
                - timeout: Boolean indicating if message timed out
                - error: Error message if sending failed
            
        Note:
            This sends the hex_data as bytes to the connected G1 device using the
            Nordic BLE UART protocol and waits for a response up to 2 seconds.
            All messages are treated as commands and will timeout after 2 seconds if no response is received.
            Spaces, tabs, and other whitespace in hex_data are automatically removed before processing.
            
        Examples:
            - send_g1_message("2506") -> Sends command 0x25 with data 0x06
            - send_g1_message("25 06") -> Same as above (spaces removed)
            - send_g1_message("25 06 00 01") -> Sends 0x25060001
            - send_g1_message("ABCD 1234") -> Sends 0xABCD1234
            - send_g1_message("1234567890ABCDEF") -> Sends longer message
        """
        # Check if connected and has services, if not try to connect
        if not ble_manager.is_connected or not ble_manager.uart_service:
            logger.info("Not connected or missing services, attempting to connect...")
            try:
                success = await auto_connect_to_right_device()
                if not success:
                    return {
                        "result": "error",
                        "message_sent": False,
                        "error": "Failed to connect to G1 device. Please ensure device is available and try again."
                    }
            except Exception as e:
                return {
                    "result": "error",
                    "message_sent": False,
                    "error": f"Connection failed: {str(e)}"
                }
        
        # Validate hex data format and remove spaces/whitespace
        if not hex_data:
            return {
                "result": "error",
                "message_sent": False,
                "error": "Hex data cannot be empty"
            }
        
        # Remove all spaces, tabs, newlines, and other whitespace
        cleaned_hex = ''.join(hex_data.split())
        
        # Validate that cleaned hex data contains only valid hexadecimal characters
        if not cleaned_hex or not all(c in '0123456789ABCDEFabcdef' for c in cleaned_hex):
            return {
                "result": "error",
                "message_sent": False,
                "error": "Invalid hex data format. Use only hexadecimal characters (0-9, A-F, a-f). Spaces and whitespace are automatically removed."
            }
        
        try:
            # Send message using cleaned hex data
            response_data = await ble_manager.send_message(cleaned_hex)
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Failed to send message: {error_msg}")
            
            # Check if this is a connection-related error
            if "connection lost" in error_msg.lower() or "disconnected" in error_msg.lower():
                return {
                    "result": "error",
                    "message_sent": False,
                    "error": f"Connection lost while sending message. Please reconnect using connect_g1_device. Error: {error_msg}"
                }
            elif "timeout" in error_msg.lower():
                return {
                    "result": "error",
                    "message_sent": False,
                    "error": f"Message timeout. The device may be unresponsive. Error: {error_msg}"
                }
            else:
                return {
                    "result": "error",
                    "message_sent": False,
                    "error": f"Failed to send message: {error_msg}"
                }
    
        # Format response based on what was returned
        if response_data:
            # Format response data as space-separated hex pairs
            hex_pairs = ' '.join([response_data[i:i+2] for i in range(0, len(response_data), 2)])
            return {
                "result": "success",
                "message_sent": True,
                "response_received": True,
                "response_data": hex_pairs,
                "timeout": False
            }
        else:
            return {
                "result": "success",
                "message_sent": True,
                "response_received": False,
                "response_data": None,
                "timeout": True
            }
  • Core BLE send_message method in NordicBLEUARTManager class used by the tool handler. Converts hex to bytes, writes to TX char, awaits matching response notification on RX char, returns hex response or "" on timeout.
    async def send_message(self, hex_data: str) -> str:
        """Send a UART message and return the hex response packet or empty string"""
        if not self.client or not self.is_connected:
            raise Exception("Not connected to any device")
        
        if not self.tx_characteristic:
            raise Exception("UART TX characteristic not available")
        
        # Check connection health before sending
        if not await self._check_connection_health():
            logger.warning("Connection health check failed, attempting reconnection...")
            await self._handle_disconnection()
            raise Exception("Connection lost, please reconnect")
        
        # Convert hex string to bytes
        data = bytes.fromhex(hex_data)
        command_code = hex_data[:2].upper() if len(hex_data) >= 2 else ""
        
        # Create sent message record
        sent_msg = {
            "id": f"sent_{self.message_id_counter}",
            "timestamp": datetime.now(),
            "direction": "sent",
            "data": data,
            "command_code": command_code
        }
        
        self.message_id_counter += 1
        self.communication_log.append(sent_msg)
        
        # Add to pending messages
        self.pending_messages[sent_msg["id"]] = sent_msg
        
        response_event = asyncio.Event()
        sent_msg["response_event"] = response_event
    
        try:
            # Send the data
            await self.client.write_gatt_char(self.tx_characteristic.uuid, data)
            logger.info(f"Sent UART message: {hex_data} (Command: {command_code})")
            self.last_activity_time = datetime.now()
            
            # Wait for response with 2-second timeout (increased from 1 second)
            await asyncio.wait_for(response_event.wait(), timeout=2.0)
            
            # Response received - return the hex data
            response_data = sent_msg.get("response_data", "")
            return response_data
            
        except asyncio.TimeoutError:
            # Timeout reached
            if sent_msg["id"] in self.pending_messages:
                del self.pending_messages[sent_msg["id"]]
            logger.warning(f"Message timeout for command {command_code}")
            return ""  # Return empty string on timeout
            
        except Exception as e:
            logger.error(f"Failed to send message: {e}")
            # Clean up pending message on error
            if sent_msg["id"] in self.pending_messages:
                del self.pending_messages[sent_msg["id"]]
            
            # Check if this is a connection error
            if "disconnected" in str(e).lower() or "not connected" in str(e).lower():
                logger.error("Connection error detected, attempting reconnection...")
                await self._handle_disconnection()
            
            raise
  • mcp_server.py:273-273 (registration)
    The @server.tool() decorator registers the send_g1_message function as an MCP tool.
    @server.tool()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danroblewis/g1_uart_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server