Skip to main content
Glama

led

Control LED devices through natural language commands via the mcp2tcp server, enabling AI-powered hardware interaction over TCP connections.

Instructions

打开LED (state=on)

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
stateYesParameter state for the led command

Implementation Reference

  • Dynamically registers the 'led' tool if config.yaml defines a command starting with 'CMD_LED'. Generates tool name 'led' and inputSchema from command parameters and placeholders.
    @server.list_tools()
    async def handle_list_tools() -> List[types.Tool]:
        """List available tools."""
        if config is None:
            return []
            
        tools = []
        for cmd_id, command in config.commands.items():
            # 从命令字符串中提取命令名(去掉CMD_前缀)
            cmd_name = command.command.split()[0].replace("CMD_", "").lower()
            
            # 构建参数描述
            properties = {}
            required = []
            
            # 使用配置文件中的参数定义
            if hasattr(command, 'parameters'):
                for param in command.parameters:
                    properties[param['name']] = {
                        "type": param['type'],
                        "description": param['description'],
                        **({"enum": param['enum']} if 'enum' in param else {})
                    }
                    if param.get('required', False):
                        required.append(param['name'])
            else:
                # 如果没有参数定义,从命令字符串中提取
                import re
                param_names = re.findall(r'\{(\w+)\}', command.command)
                for param_name in param_names:
                    properties[param_name] = {
                        "type": "string",
                        "description": f"Parameter {param_name} for the {cmd_name} command",
                        "examples": [p.format(**{param_name: "value"}) for p in command.prompts if "{" + param_name + "}" in p]
                    }
                    required.append(param_name)
    
            tool = types.Tool(
                name=cmd_name,  # 使用命令名作为工具名
                description=command.prompts[0] if command.prompts else f"Execute {cmd_name} command",
                inputSchema={
                    "type": "object",
                    "properties": properties,
                    "required": required,
                    "additionalProperties": False
                }
            )
            tools.append(tool)
            logger.debug(f"Registered tool: {cmd_name} with parameters: {properties}")
        
        return tools
  • Generic handler for all tools including 'led': matches tool name to config command, validates arguments, formats the command string (e.g., 'CMD_LED {state}'), sends via TCP, returns response.
    @server.call_tool()
    async def handle_call_tool(name: str, arguments: Dict[str, Any] | None) -> List[types.TextContent]:
        """Handle tool execution requests."""
        if config is None:
            return [types.TextContent(
                type="text",
                text="Error: Configuration not loaded"
            )]
            
        try:
            logger.info(f"Tool call received - Name: {name}, Arguments: {arguments}")
            
            # 查找对应的命令
            cmd_found = None
            for cmd_id, command in config.commands.items():
                cmd_name = command.command.split()[0].replace("CMD_", "").lower()
                if cmd_name == name:
                    cmd_found = command
                    break
            
            if cmd_found is None:
                error_msg = f"Error: Unknown tool '{name}'\n"
                error_msg += "Please check:\n"
                error_msg += "1. Tool name is correct\n"
                error_msg += "2. Tool is configured in config.yaml"
                logger.error(error_msg)
                return [types.TextContent(
                    type="text",
                    text=error_msg
                )]
    
            if arguments is None:
                arguments = {}
            
            # 验证必需的参数
            import re
            param_names = re.findall(r'\{(\w+)\}', cmd_found.command)
            missing_params = [param for param in param_names if param not in arguments]
            if missing_params:
                error_msg = f"Error: Missing required parameters: {', '.join(missing_params)}\n"
                error_msg += "Please provide all required parameters."
                logger.error(error_msg)
                return [types.TextContent(
                    type="text",
                    text=error_msg
                )]
            
            # 发送命令并等待响应
            try:
                response = tcp_connection.send_command(cmd_found, arguments)
                logger.debug(f"Command response: {response}")
                return response
            except ConnectionError as e:
                error_msg = f"Error: Connection failed - {str(e)}\n"
                error_msg += "Please check:\n"
                error_msg += "1. TCP server is running\n"
                error_msg += "2. Connection settings are correct"
                logger.error(error_msg)
                return [types.TextContent(
                    type="text",
                    text=error_msg
                )]
            except TimeoutError as e:
                error_msg = f"Error: Command timeout - {str(e)}\n"
                error_msg += "Please check:\n"
                error_msg += "1. Device is responding\n"
                error_msg += "2. Timeout settings are appropriate"
                logger.error(error_msg)
                return [types.TextContent(
                    type="text",
                    text=error_msg
                )]
            
        except Exception as e:
            error_msg = f"Error: {str(e)}\n"
            error_msg += "Please check:\n"
            error_msg += "1. Configuration is correct\n"
            error_msg += "2. Device is functioning properly"
            logger.error(f"Unexpected error: {str(e)}", exc_info=True)
            return [types.TextContent(
                type="text",
                text=error_msg
            )]
  • Core helper that executes the formatted 'CMD_LED {state}' command over TCP socket, handles connection, sending with \r\n, receiving until timeout or end marker, formats as TextContent.
    def send_command(self, command: Command, arguments: dict[str, Any] | None) -> list[types.TextContent]:
        """Send a command to the TCP server and return result according to MCP protocol."""
        try:
            if not self.socket:
                if not self.connect():
                    return [types.TextContent(
                        type="text",
                        text=f"Failed to connect to TCP server at {self.remote_ip}:{self.port}"
                    )]
            if command.data_type == "ascii":
                # 准备命令
                cmd_str = command.command.format(**arguments)
                # 确保命令以\r\n结尾
                cmd_str = cmd_str.rstrip() + '\r\n'  # 移除可能的空白字符,强制添加\r\n
                command_bytes = cmd_str.encode()
                logger.info(f"Sending command: {cmd_str.strip()}")
                logger.info(f"Sent command: {command_bytes.strip().decode('ascii')}")
                self.socket.sendall(command_bytes)
    
            elif command.data_type == "hex":
                command_bytes = bytes.fromhex(command.command.replace(" ", ""))
                logger.info(f"Sent command: {command.command}")
                self.socket.sendall(command_bytes)
            self.socket.settimeout(self.receive_timeout)
            responses = []
            while True:
                try:
                    response = self.socket.recv(4096)
                    if response:
                        logger.debug(f"Received data: {response}")
                        responses.append(response)
                        if command.data_type == "ascii" and response.endswith(b'\r\n'):
                            break
                        elif command.data_type == "hex":
                            break
                    else:
                        break
                except socket.timeout as e:
                    logger.error(f"TCP receive timeout: {str(e)}")
                    return [types.TextContent(
                        type="text",
                        text=f"TCP receive timeout: {str(e)}"
                    )]
    
            if not responses:
                return [types.TextContent(
                    type="text",
                    text=f"No response received from TCP server within {self.receive_timeout} seconds"
                )]
    
            first_response = responses[0].decode().strip()
            logger.info(f"Received response: {first_response}")
    
            if self.response_start_string in first_response:
                return [types.TextContent(
                    type="text",
                    text=first_response
                )]
            else:
                return [types.TextContent(
                    type="text",
                    text=f"Invalid response: {first_response}"
                )]
    
        except socket.timeout as e:
            logger.error(f"TCP send timeout: {str(e)}")
            return [types.TextContent(
                type="text",
                text=f"TCP send timeout: {str(e)}"
            )]
        except Exception as e:
            logger.error(f"TCP error: {str(e)}")
            return [types.TextContent(
                type="text",
                text=f"TCP error: {str(e)}"
            )]
  • Mock device-side LED handler used in tests, simulates the TCP server response for CMD_LED command.
    def set_led(self, state):
        if state.lower() in ["on", "off"]:
            self.led_state = state.lower()
            return f"CMD LED state set to {state}\r\n"
        return f"CMD Error: Invalid LED state. Use 'on' or 'off'\r\n"
  • Test TCP server handler that processes 'CMD_LED {state}' and calls set_led mock implementation.
    # 处理LED控制命令
    elif command.startswith("CMD_LED"):
        try:
            state = command.split()[1]
            return device_state.set_led(state)
        except IndexError:
            return "CMD Error: Missing LED state parameter\r\n"
Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mcp2everything/mcp2tcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server