G1 UART MCP Server

by danroblewis

send_g1_message

Send hexadecimal commands to G1 Bluetooth devices via the Nordic UART protocol, automatically removing whitespace and waiting up to 2 seconds for responses.

Instructions

Send a message to the connected G1 device.

Args:
    hex_data (str): Hexadecimal string representation of the message to send.
                   Can contain spaces, tabs, or other whitespace which will be automatically removed.
                   Should contain only valid hexadecimal characters (0-9, A-F, a-f).
                   Examples: "2506", "25 06", "25 06 00 01", "25 06 00 01 04 02"

Returns:
    Dict[str, Any]: JSON response with message status including:
        - result: "success" or "error"
        - message_sent: Boolean indicating if message was sent
        - response_received: Boolean indicating if response was received
        - response_data: Response data in hex format (if received)
        - timeout: Boolean indicating if message timed out
        - error: Error message if sending failed
    
Note:
    This sends hex_data as bytes to the connected G1 device over the
    Nordic BLE UART protocol and waits up to 2 seconds for a response.
    All messages are treated as commands and time out after 2 seconds if no response is received.
    Whitespace in hex_data (spaces, tabs, newlines) is removed before processing.
    
Examples:
    - send_g1_message("2506") -> Sends command 0x25 with data 0x06
    - send_g1_message("25 06") -> Same as above (spaces removed)
    - send_g1_message("25 06 00 01") -> Sends 0x25060001
    - send_g1_message("ABCD 1234") -> Sends 0xABCD1234
    - send_g1_message("1234567890ABCDEF") -> Sends longer message
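The whitespace handling and hex-to-bytes conversion described above can be sketched as a small standalone helper. This is a minimal illustration, not the server's code: `normalize_hex` is a hypothetical name, and the even-length check is an assumption added here because each byte needs exactly two hex digits.

```python
def normalize_hex(hex_data: str) -> bytes:
    """Strip whitespace, validate, and convert a hex string to bytes.

    Hypothetical helper for illustration; it is not part of the server.
    """
    # str.split() with no arguments splits on any run of whitespace,
    # so joining the pieces removes spaces, tabs, and newlines.
    cleaned = "".join(hex_data.split())
    if not cleaned:
        raise ValueError("hex data cannot be empty")
    if any(c not in "0123456789abcdefABCDEF" for c in cleaned):
        raise ValueError("hex data contains non-hexadecimal characters")
    if len(cleaned) % 2 != 0:
        # Assumption: reject odd-length input early, since two digits make one byte.
        raise ValueError("hex data must contain an even number of digits")
    return bytes.fromhex(cleaned)
```

For instance, `normalize_hex("25 06 00 01").hex()` gives `"25060001"`, matching the third example in the list above.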

Input Schema

Name | Required | Description | Default
hex_data | Yes | |

Output Schema

Name | Required | Description | Default
result | Yes | |
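A caller might interpret the returned dictionary along these lines. This is a sketch under assumptions: `decode_response` is a hypothetical helper, the field names come from the Returns section above, and `response_data` is assumed to be a string of space-separated hex pairs.

```python
from typing import Any, Dict, Optional


def decode_response(reply: Dict[str, Any]) -> Optional[bytes]:
    """Return the device's reply as bytes, or None if the command timed out.

    Hypothetical helper for illustration; it is not part of the server.
    """
    if reply.get("result") != "success":
        # Error replies carry a human-readable "error" field.
        raise RuntimeError(reply.get("error", "unknown error"))
    if not reply.get("response_received"):
        # The message was sent, but no response arrived within the timeout.
        return None
    # bytes.fromhex() skips ASCII whitespace, so "25 06 C9" parses directly.
    return bytes.fromhex(reply["response_data"])
```

Note that a timeout is reported with `result` set to "success" and `timeout` set to true, so checking `result` alone does not tell the caller whether the device answered.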

Implementation Reference

  • The main handler function for send_g1_message, decorated with @server.tool(). It auto-connects if needed, validates and cleans the input, delegates to ble_manager.send_message, and formats the response.
    @server.tool()
    async def send_g1_message(hex_data: str) -> Dict[str, Any]:
        """Send a message to the connected G1 device.
        
        Args:
            hex_data (str): Hexadecimal string representation of the message to send.
                           Can contain spaces, tabs, or other whitespace which will be automatically removed.
                           Should contain only valid hexadecimal characters (0-9, A-F, a-f).
                           Examples: "2506", "25 06", "25 06 00 01", "25 06 00 01 04 02"
        
        Returns:
            Dict[str, Any]: JSON response with message status including:
                - result: "success" or "error"
                - message_sent: Boolean indicating if message was sent
                - response_received: Boolean indicating if response was received
                - response_data: Response data in hex format (if received)
                - timeout: Boolean indicating if message timed out
                - error: Error message if sending failed
            
        Note:
            This sends the hex_data as bytes to the connected G1 device using the
            Nordic BLE UART protocol and waits for a response up to 2 seconds.
            All messages are treated as commands and will time out after 2 seconds if no response is received.
            Spaces, tabs, and other whitespace in hex_data are automatically removed before processing.
            
        Examples:
            - send_g1_message("2506") -> Sends command 0x25 with data 0x06
            - send_g1_message("25 06") -> Same as above (spaces removed)
            - send_g1_message("25 06 00 01") -> Sends 0x25060001
            - send_g1_message("ABCD 1234") -> Sends 0xABCD1234
            - send_g1_message("1234567890ABCDEF") -> Sends longer message
        """
        # Check if connected and has services, if not try to connect
        if not ble_manager.is_connected or not ble_manager.uart_service:
            logger.info("Not connected or missing services, attempting to connect...")
            try:
                success = await auto_connect_to_right_device()
                if not success:
                    return {
                        "result": "error",
                        "message_sent": False,
                        "error": "Failed to connect to G1 device. Please ensure device is available and try again."
                    }
            except Exception as e:
                return {
                    "result": "error",
                    "message_sent": False,
                    "error": f"Connection failed: {str(e)}"
                }
        
        # Validate hex data format and remove spaces/whitespace
        if not hex_data:
            return {
                "result": "error",
                "message_sent": False,
                "error": "Hex data cannot be empty"
            }
        
        # Remove all spaces, tabs, newlines, and other whitespace
        cleaned_hex = ''.join(hex_data.split())
        
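        # Validate that cleaned hex data contains only valid hexadecimal characters
        if not cleaned_hex or not all(c in '0123456789ABCDEFabcdef' for c in cleaned_hex):
            return {
                "result": "error",
                "message_sent": False,
                "error": "Invalid hex data format. Use only hexadecimal characters (0-9, A-F, a-f). Spaces and whitespace are automatically removed."
            }
        
        # Reject odd-length input up front; bytes.fromhex() in send_message would otherwise raise ValueError
        if len(cleaned_hex) % 2 != 0:
            return {
                "result": "error",
                "message_sent": False,
                "error": "Invalid hex data length. Provide an even number of hexadecimal digits (two per byte)."
            }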
        
        try:
            # Send message using cleaned hex data
            response_data = await ble_manager.send_message(cleaned_hex)
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Failed to send message: {error_msg}")
            
            # Check if this is a connection-related error
            if "connection lost" in error_msg.lower() or "disconnected" in error_msg.lower():
                return {
                    "result": "error",
                    "message_sent": False,
                    "error": f"Connection lost while sending message. Please reconnect using connect_g1_device. Error: {error_msg}"
                }
            elif "timeout" in error_msg.lower():
                return {
                    "result": "error",
                    "message_sent": False,
                    "error": f"Message timeout. The device may be unresponsive. Error: {error_msg}"
                }
            else:
                return {
                    "result": "error",
                    "message_sent": False,
                    "error": f"Failed to send message: {error_msg}"
                }
    
        # Format response based on what was returned
        if response_data:
            # Format response data as space-separated hex pairs
            hex_pairs = ' '.join([response_data[i:i+2] for i in range(0, len(response_data), 2)])
            return {
                "result": "success",
                "message_sent": True,
                "response_received": True,
                "response_data": hex_pairs,
                "timeout": False
            }
        else:
            return {
                "result": "success",
                "message_sent": True,
                "response_received": False,
                "response_data": None,
                "timeout": True
            }
  • The core send_message method of the NordicBLEUARTManager class, used by the tool handler. It converts the hex string to bytes, writes them to the TX characteristic, waits for the matching response notification on the RX characteristic, and returns the hex response, or "" on timeout.
    async def send_message(self, hex_data: str) -> str:
        """Send a UART message and return the hex response packet or empty string"""
        if not self.client or not self.is_connected:
            raise Exception("Not connected to any device")
        
        if not self.tx_characteristic:
            raise Exception("UART TX characteristic not available")
        
        # Check connection health before sending
        if not await self._check_connection_health():
            logger.warning("Connection health check failed, attempting reconnection...")
            await self._handle_disconnection()
            raise Exception("Connection lost, please reconnect")
        
        # Convert hex string to bytes
        data = bytes.fromhex(hex_data)
        command_code = hex_data[:2].upper() if len(hex_data) >= 2 else ""
        
        # Create sent message record
        sent_msg = {
            "id": f"sent_{self.message_id_counter}",
            "timestamp": datetime.now(),
            "direction": "sent",
            "data": data,
            "command_code": command_code
        }
        
        self.message_id_counter += 1
        self.communication_log.append(sent_msg)
        
        # Add to pending messages
        self.pending_messages[sent_msg["id"]] = sent_msg
        
        response_event = asyncio.Event()
        sent_msg["response_event"] = response_event
    
        try:
            # Send the data
            await self.client.write_gatt_char(self.tx_characteristic.uuid, data)
            logger.info(f"Sent UART message: {hex_data} (Command: {command_code})")
            self.last_activity_time = datetime.now()
            
            # Wait for response with 2-second timeout (increased from 1 second)
            await asyncio.wait_for(response_event.wait(), timeout=2.0)
            
            # Response received - return the hex data
            response_data = sent_msg.get("response_data", "")
            return response_data
            
        except asyncio.TimeoutError:
            # Timeout reached
            if sent_msg["id"] in self.pending_messages:
                del self.pending_messages[sent_msg["id"]]
            logger.warning(f"Message timeout for command {command_code}")
            return ""  # Return empty string on timeout
            
        except Exception as e:
            logger.error(f"Failed to send message: {e}")
            # Clean up pending message on error
            if sent_msg["id"] in self.pending_messages:
                del self.pending_messages[sent_msg["id"]]
            
            # Check if this is a connection error
            if "disconnected" in str(e).lower() or "not connected" in str(e).lower():
                logger.error("Connection error detected, attempting reconnection...")
                await self._handle_disconnection()
            
            raise
  • mcp_server.py:273 (registration)
    The @server.tool() decorator registers the send_g1_message function as an MCP tool.
    @server.tool()
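The wait-with-timeout step in send_message pairs an asyncio.Event with asyncio.wait_for. The sketch below isolates that pattern with a simulated notification in place of a real BLE callback. The names `request` and `fake_notification`, and the reply value, are assumptions for illustration only.

```python
import asyncio
from typing import Any, Dict


async def request(reply_after: float, timeout: float = 2.0) -> str:
    """Wait for a simulated notification; return "" if it misses the timeout."""
    record: Dict[str, Any] = {"response_event": asyncio.Event()}

    async def fake_notification() -> None:
        # Stand-in for the BLE notification callback (hypothetical).
        await asyncio.sleep(reply_after)
        record["response_data"] = "2506C9"
        record["response_event"].set()

    task = asyncio.create_task(fake_notification())
    try:
        # Block until the event is set or the timeout elapses.
        await asyncio.wait_for(record["response_event"].wait(), timeout=timeout)
        return record.get("response_data", "")
    except asyncio.TimeoutError:
        # Same convention as send_message: empty string means no response.
        return ""
    finally:
        # Cancelling a finished task is a no-op; a pending one is stopped.
        task.cancel()
```

Running `asyncio.run(request(0.01))` returns the simulated reply, while `asyncio.run(request(0.5, timeout=0.05))` returns an empty string because the notification arrives too late.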
Behavior: 5/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries full burden and excels at disclosing behavioral traits. It explains the Nordic BLE UART protocol usage, 2-second timeout behavior, automatic whitespace removal, and that all messages are treated as commands. It also details the response structure and error conditions comprehensively.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

Well-structured with clear sections (Args, Returns, Note, Examples) and front-loaded core purpose. Some redundancy exists (whitespace removal mentioned twice), and the examples section could be slightly more concise while maintaining clarity. Overall efficient but with minor room for optimization.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 5/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given a single-parameter tool with no annotations but with output schema, the description provides complete context. It covers purpose, parameter semantics, behavioral details (protocol, timeout, command nature), return structure, and practical examples. The output schema handles return value documentation, allowing the description to focus on operational context.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 5/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

With 0% schema description coverage for the single parameter, the description fully compensates by providing rich semantic information. It explains hex_data format requirements, whitespace handling, valid character constraints, and provides multiple concrete examples showing different input patterns and their corresponding byte interpretations.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the specific action ('Send a message') and target resource ('connected G1 device'), distinguishing it from sibling tools like connect/disconnect/scan devices. It explicitly identifies the tool's function as sending data to a specific hardware device.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 4/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description gives some context about when to use this tool (for command communication with a connected device), but it doesn't explicitly contrast the tool with alternatives or state when not to use it. The examples show typical usage patterns, but there is no explicit guidance on prerequisites such as connecting to a device first.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danroblewis/g1_uart_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.