Skip to main content
Glama

get_pico_info

Retrieve hardware information from connected devices via serial communication to enable AI-driven control and monitoring.

Instructions

Execute get_pico_info command

Input Schema

Table | JSON Schema
Name | Required | Description | Default

No arguments

Implementation Reference

  • MCP tool call handler that dispatches execution for 'get_pico_info' by retrieving the command from config and sending it over serial via send_command.
    @server.call_tool()
    async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]:
        """Dispatch an MCP tool call to the configured serial command.

        Args:
            name: Tool identifier; must be a key of ``config.commands``.
            arguments: Values for the command template; ``None`` is treated as
                an empty dict.

        Returns:
            Whatever ``serial_connection.send_command`` returns on success.
            For an unknown tool name, or when any exception is raised while
            handling the call, a single text item describing the problem is
            returned instead of raising.
        """
        logger.info(f"Tool call received - Name: {name}, Arguments: {arguments}")

        def as_text(message: str) -> list[types.TextContent]:
            # Wrap a plain message in the single-item list the handler returns.
            return [types.TextContent(type="text", text=message)]

        try:
            if name in config.commands:
                # Send the command and return the MCP-formatted response.
                return serial_connection.send_command(
                    config.commands[name],
                    {} if arguments is None else arguments,
                )

            return as_text("\n".join([
                f"[MCP2Serial v{VERSION}] Error: Unknown tool '{name}'",
                "Please check:",
                "1. Tool name is correct",
                "2. Tool is configured in config.yaml",
            ]))

        except Exception as e:
            # Top-level boundary: report the failure to the client as text.
            detail = str(e)
            logger.error(f"Error handling tool call: {detail}")
            return as_text("\n".join([
                f"[MCP2Serial v{VERSION}] Error: {detail}",
                "Please check:",
                "1. Configuration is correct",
                "2. Device is functioning properly",
            ]))
  • Registers the 'get_pico_info' tool dynamically from config.commands, including schema generation based on command template parameters.
    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        """List available tools for the MCP service.

        One tool is produced per entry of ``config.commands``. The tool's input
        schema is derived from the ``{placeholder}`` names found in the entry's
        command template: every placeholder becomes a required string property.

        Returns:
            The list of tool descriptors, in ``config.commands`` iteration order.
        """
        # Imported once per call rather than once per loop iteration; kept local
        # because the module-level imports are not visible from this block.
        import re

        logger.info("Listing available tools")
        placeholder_pattern = re.compile(r'\{(\w+)\}')
        tools = []

        for cmd_id, command in config.commands.items():
            # Extract parameter names from the command string. A placeholder may
            # appear more than once in a template; de-duplicate (keeping first-seen
            # order) because JSON Schema requires "required" entries to be unique.
            param_names = list(dict.fromkeys(placeholder_pattern.findall(command.command)))
            properties = {param: {"type": "string"} for param in param_names}

            tools.append(types.Tool(
                name=cmd_id,
                description=f"Execute {cmd_id} command",
                inputSchema={
                    "type": "object",
                    "properties": properties,
                    "required": param_names
                },
                # NOTE(review): passed through unchanged from the command config;
                # confirm types.Tool accepts a ``prompts`` field.
                prompts=command.prompts
            ))

        return tools
  • Device-side handler function that gathers and returns Pico board information (board name, MicroPython version, frequency, memory, disk usage). Called in response to 'PICO_INFO' serial command.
    def get_pico_info():
        """Build a one-line, comma-separated summary of the board's state.

        The summary contains, in order: the board name and MicroPython version
        (fields 4 and 2 of ``uos.uname()``), the system frequency in MHz, the
        sum of free and allocated heap bytes, and the total/free size of the
        filesystem mounted at ``/``.

        Returns:
            The formatted summary string.
        """
        uname = uos.uname()
        board = uname[4]
        version = uname[2]

        # machine.freq() is divided down to MHz for display.
        freq_mhz = machine.freq() // 1000000

        # Free plus allocated heap gives the total heap size.
        heap_bytes = gc.mem_free() + gc.mem_alloc()

        # Sizes are computed as statvfs field 0 times the block counts in
        # fields 2 (total) and 3 (free).
        fs_stats = uos.statvfs('/')
        block_size = fs_stats[0]
        disk_total = block_size * fs_stats[2]
        disk_free = block_size * fs_stats[3]

        # Assemble the fields into the final info string.
        fields = [
            f"Board: {board}",
            f"MicroPython: {version}",
            f"Freq: {freq_mhz} MHz",
            f"Memory: {heap_bytes} bytes",
            f"Disk: Total {disk_total} bytes, Free {disk_free} bytes",
        ]
        return ", ".join(fields)
  • Firmware serial input handler that recognizes 'PICO_INFO' command and invokes get_pico_info() to produce the response sent back over serial.
    elif user_input.strip() == "PICO_INFO":
        info = get_pico_info()
        print(f"OK {info}")
  • Helper method in SerialConnection that formats and sends the serial command for 'get_pico_info' (e.g., 'PICO_INFO'), reads the 'OK info' response from device, and returns it as MCP TextContent.
    def send_command(self, command: Command, arguments: Dict[str, Any]) -> list[types.TextContent]:
        """Send a command to the serial port and return the result as MCP text content.

        The command template is formatted with ``arguments``, terminated with
        ``\\r\\n`` and written to the port (or, in loopback mode, answered locally
        with an echo line plus the configured response-start string). The reply is
        accepted when its second line starts with ``config.response_start_string``;
        the first line is treated as the echo of the command.

        Args:
            command: Command definition; ``command.command`` is the format
                template and ``command.need_parse`` selects whether the reply
                text is returned.
            arguments: Values substituted into the template.

        Returns:
            On success, a single text item holding the decoded second response
            line when ``command.need_parse`` is true, otherwise an empty list.
            On connection failure, no response, an unexpected response, or a
            pyserial error, a single text item with a diagnostic message.

        Raises:
            Exceptions other than ``serial.SerialException`` (for example a
            ``KeyError`` from a missing template argument) are not handled here
            and propagate to the caller.
        """
        def as_text(message: str) -> list[types.TextContent]:
            # Wrap a plain message in the single-item list this method returns.
            return [types.TextContent(type="text", text=message)]

        def decode_line(raw: bytes) -> str:
            # Device output can contain undecodable bytes (e.g. on a baud-rate
            # mismatch). Replace them instead of raising, so the diagnostics
            # below can still be produced.
            return raw.decode(errors="replace").strip()

        try:
            # Make sure a connection exists (loopback mode needs no port).
            if not self.is_loopback and (not self.serial_port or not self.serial_port.is_open):
                logger.info("No active connection, attempting to connect...")
                if not self.connect():
                    error_msg = f"[MCP2Serial v{VERSION}] Failed to establish serial connection.\n"
                    error_msg += "Please check:\n"
                    error_msg += "1. Serial port is correctly configured in config.yaml\n"
                    error_msg += "2. Device is properly connected\n"
                    error_msg += "3. No other program is using the port"
                    return as_text(error_msg)

            # Prepare the command: strip trailing whitespace and force a \r\n terminator.
            cmd_str = command.command.format(**arguments)
            cmd_str = cmd_str.rstrip() + '\r\n'

            cmd_bytes = cmd_str.encode()
            # Hex dump of the outgoing bytes, reused by the log line and error reports.
            hex_dump = ' '.join(f'0x{b:02X}' for b in cmd_bytes)
            logger.info(f"Sending command: {cmd_str.strip()}")
            logger.info(f"Command bytes ({len(cmd_bytes)} bytes): {hex_dump}")

            if self.is_loopback:
                # Loopback mode: answer with the command echo plus the OK response.
                responses = [
                    cmd_bytes,
                    f"{config.response_start_string}\r\n".encode()
                ]
            else:
                # Clear both buffers so stale data is not mistaken for the reply.
                self.serial_port.reset_input_buffer()
                self.serial_port.reset_output_buffer()

                # Send the command.
                bytes_written = self.serial_port.write(cmd_bytes)
                logger.info(f"Wrote {bytes_written} bytes")
                self.serial_port.flush()

                # Give the device a moment to process the command.
                # NOTE(review): only data that has arrived after this fixed delay
                # is read; confirm 0.1 s is sufficient for slow commands.
                time.sleep(0.1)

                # Read every line that is currently waiting.
                responses = []
                while self.serial_port.in_waiting:
                    response = self.serial_port.readline()
                    logger.info(f"Raw response: {response}")
                    if response:
                        responses.append(response)

            if not responses:
                logger.error("No response received within timeout")
                error_msg = f"[MCP2Serial v{VERSION}] Command timeout - no response within {self.read_timeout} second(s)\n"
                error_msg += f"Command sent: {cmd_str.strip()}\n"
                error_msg += f"Command bytes ({len(cmd_bytes)} bytes): {hex_dump}\n"
                error_msg += "Please check:\n"
                error_msg += "1. Device is powered and responding\n"
                error_msg += "2. Baud rate matches device settings\n"
                error_msg += "3. Serial connection is stable\n"
                return as_text(error_msg)

            # Decode the first response line (logged only).
            first_line = decode_line(responses[0])
            logger.info(f"Decoded first response: {first_line}")

            # The second line, if present, must start with the configured
            # response-start string to count as success.
            if len(responses) > 1:
                second_response = responses[1]
                if second_response.startswith(config.response_start_string.encode()):
                    if command.need_parse:
                        return as_text(decode_line(second_response))
                    return []

            # The response is not in the expected format: return a detailed report.
            error_msg = f"[MCP2Serial v{VERSION}] Command execution failed.\n"
            error_msg += f"Command sent: {cmd_str.strip()}\n"
            error_msg += f"Command bytes ({len(cmd_bytes)} bytes): {hex_dump}\n"
            error_msg += "Responses received:\n"
            for i, resp in enumerate(responses, 1):
                error_msg += f"{i}. Raw: {resp!r}\n   Decoded: {decode_line(resp)}\n"
            error_msg += "\nPossible reasons:\n"
            error_msg += f"- Device echoed the command but did not send {config.response_start_string} response\n"
            error_msg += "- Command format may be incorrect\n"
            error_msg += "- Device may be in wrong mode\n"
            return as_text(error_msg)

        except serial.SerialTimeoutException as e:
            logger.error(f"Serial timeout: {str(e)}")
            error_msg = f"[MCP2Serial v{VERSION}] Command timeout - {str(e)}\n"
            error_msg += "Please check:\n"
            error_msg += "1. Device is powered and responding\n"
            error_msg += "2. Baud rate matches device settings\n"
            error_msg += "3. Device is not busy with other operations"
            return as_text(error_msg)
        except serial.SerialException as e:
            logger.error(f"Serial error: {str(e)}")
            error_msg = f"[MCP2Serial v{VERSION}] Serial communication failed - {str(e)}\n"
            error_msg += "Please check:\n"
            error_msg += "1. Serial port is correctly configured in config.yaml\n"
            error_msg += "2. Device is properly connected\n"
            error_msg += "3. No other program is using the port"
            return as_text(error_msg)
    
    def close(self) -> None:
        """Close the serial port if it is currently open and drop the reference.

        Does nothing when there is no port object or the port is already
        closed. A failure while closing is logged, not raised; the reference
        is cleared either way.
        """
        port = self.serial_port
        if not port or not port.is_open:
            return

        try:
            port.close()
            logger.info(f"Closed serial port connection: {port.port}")
        except Exception as e:
            logger.error(f"Error closing port: {str(e)}")
        self.serial_port = None
Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mcp2everything/mcp2serial'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.