Skip to main content
Glama

get_pico_info

Retrieve hardware information from connected devices via serial communication to enable AI-driven control and monitoring.

Instructions

Execute get_pico_info command

Input Schema

Table | JSON Schema
Name | Required | Description | Default

No arguments

Implementation Reference

  • MCP tool call handler that dispatches execution for 'get_pico_info' by retrieving the command from config and sending it over serial via send_command.
    @server.call_tool()
    async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]:
        """Run the configured serial command that backs an MCP tool call.

        Args:
            name: Tool name; looked up as a key in ``config.commands``.
            arguments: Values for the command template, or ``None`` when the
                caller supplied none (treated as an empty dict).

        Returns:
            Whatever ``serial_connection.send_command`` returns on the normal
            path, or a single ``TextContent`` describing the problem when the
            tool is unknown or any exception is raised.
        """
        logger.info(f"Tool call received - Name: {name}, Arguments: {arguments}")

        try:
            if name in config.commands:
                # Forward the command and hand back the MCP-formatted response.
                return serial_connection.send_command(
                    config.commands[name],
                    arguments if arguments is not None else {},
                )

            unknown_tool_msg = (
                f"[MCP2Serial v{VERSION}] Error: Unknown tool '{name}'\n"
                "Please check:\n"
                "1. Tool name is correct\n"
                "2. Tool is configured in config.yaml"
            )
            return [types.TextContent(type="text", text=unknown_tool_msg)]
        except Exception as exc:
            # Top-level boundary: report the failure to the client instead of raising.
            logger.error(f"Error handling tool call: {exc}")
            failure_msg = (
                f"[MCP2Serial v{VERSION}] Error: {exc}\n"
                "Please check:\n"
                "1. Configuration is correct\n"
                "2. Device is functioning properly"
            )
            return [types.TextContent(type="text", text=failure_msg)]
  • Registers the 'get_pico_info' tool dynamically from config.commands, including schema generation based on command template parameters.
    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        """List available tools for the MCP service.

        One tool is produced per entry in ``config.commands``. The input schema
        is derived from the ``{name}`` placeholders found in the command
        template: every placeholder becomes a required string property.

        Returns:
            The list of ``types.Tool`` objects, in ``config.commands`` order.
        """
        import re  # local import: keeps this block independent of file-level imports

        logger.info("Listing available tools")

        # Compiled once instead of re-importing/re-parsing for every command.
        placeholder_pattern = re.compile(r'\{(\w+)\}')

        tools = []
        for cmd_id, command in config.commands.items():
            # Extract parameter names from the command string. A placeholder may
            # appear more than once in a template; JSON Schema requires the
            # entries of "required" to be unique, so de-duplicate while keeping
            # first-seen order.
            param_names = list(dict.fromkeys(placeholder_pattern.findall(command.command)))
            properties = {name: {"type": "string"} for name in param_names}

            tools.append(types.Tool(
                name=cmd_id,
                description=f"Execute {cmd_id} command",
                inputSchema={
                    "type": "object",
                    "properties": properties,
                    "required": param_names,
                },
                prompts=command.prompts,
            ))

        return tools
  • Device-side handler function that gathers and returns Pico board information (board name, MicroPython version, frequency, memory, disk usage). Called in response to 'PICO_INFO' serial command.
    def get_pico_info():
        """Build a one-line, comma-separated summary of the board's state.

        The summary contains, in order: element 4 of ``uos.uname()`` (reported
        as the board), element 2 (reported as the MicroPython version), the
        value of ``machine.freq()`` divided down to MHz, the sum of free and
        allocated GC memory, and total/free disk sizes computed from
        ``uos.statvfs('/')`` as element 0 multiplied by elements 2 and 3.

        Returns:
            The formatted information string.
        """
        uname = uos.uname()
        freq_mhz = machine.freq() // 1000000  # system frequency in MHz
        total_memory = gc.mem_free() + gc.mem_alloc()  # free + allocated heap

        fs_stats = uos.statvfs('/')
        block_size = fs_stats[0]

        # Assemble the report; the disk entry carries its own internal comma.
        parts = [
            f"Board: {uname[4]}",
            f"MicroPython: {uname[2]}",
            f"Freq: {freq_mhz} MHz",
            f"Memory: {total_memory} bytes",
            f"Disk: Total {block_size * fs_stats[2]} bytes, Free {block_size * fs_stats[3]} bytes",
        ]
        return ", ".join(parts)
  • Firmware serial input handler that recognizes 'PICO_INFO' command and invokes get_pico_info() to produce the response sent back over serial.
    elif user_input.strip() == "PICO_INFO": info = get_pico_info() print(f"OK {info}")
  • Helper method in SerialConnection that formats and sends the serial command for 'get_pico_info' (e.g., 'PICO_INFO'), reads the 'OK info' response from device, and returns it as MCP TextContent.
    def send_command(self, command: Command, arguments: Dict[str, Any]) -> list[types.TextContent]:
        """Send a command to the serial port and return result according to MCP protocol.

        The command template is filled in with ``arguments``, terminated with
        ``\\r\\n`` and written to the port (or echoed back in loopback mode).
        The first response line is treated as the command echo; the second
        line is accepted when it starts with ``config.response_start_string``.

        Args:
            command: Command definition; ``command.command`` is the format
                template and ``command.need_parse`` selects whether the
                device's reply is returned to the caller.
            arguments: Values substituted into the template.

        Returns:
            ``[TextContent]`` holding the second response line when it has the
            expected prefix and ``command.need_parse`` is true; ``[]`` when it
            has the prefix and ``need_parse`` is false; otherwise a single
            ``TextContent`` with a diagnostic message (connection failure, no
            response, unexpected response, or serial error).
        """
        try:
            # Make sure a connection exists (loopback mode needs no port).
            if not self.is_loopback and (not self.serial_port or not self.serial_port.is_open):
                logger.info("No active connection, attempting to connect...")
                if not self.connect():
                    error_msg = f"[MCP2Serial v{VERSION}] Failed to establish serial connection.\n"
                    error_msg += "Please check:\n"
                    error_msg += "1. Serial port is correctly configured in config.yaml\n"
                    error_msg += "2. Device is properly connected\n"
                    error_msg += "3. No other program is using the port"
                    return [types.TextContent(
                        type="text",
                        text=error_msg
                    )]

            # Prepare the command.
            cmd_str = command.command.format(**arguments)
            # Strip trailing whitespace and force a \r\n terminator.
            cmd_str = cmd_str.rstrip() + '\r\n'
            cmd_bytes = cmd_str.encode()
            # Hex dump is reused by the log line and by both diagnostics below.
            cmd_hex = ' '.join(f'0x{b:02X}' for b in cmd_bytes)

            logger.info(f"Sending command: {cmd_str.strip()}")
            logger.info(f"Command bytes ({len(cmd_bytes)} bytes): {cmd_hex}")

            if self.is_loopback:
                # Loopback mode: fabricate the echo plus the expected reply.
                responses = [
                    cmd_str.encode(),  # command echo
                    f"{config.response_start_string}\r\n".encode()  # OK response
                ]
            else:
                # Drop any stale data before talking to the device.
                self.serial_port.reset_input_buffer()
                self.serial_port.reset_output_buffer()

                # Send the command.
                bytes_written = self.serial_port.write(cmd_bytes)
                logger.info(f"Wrote {bytes_written} bytes")
                self.serial_port.flush()

                # Give the device a moment to process the command.
                # NOTE(review): only data already buffered after this fixed
                # delay is read; a slower device is reported as a timeout even
                # though read_timeout has not elapsed. Consider polling.
                time.sleep(0.1)

                # Read every line that is currently available.
                responses = []
                while self.serial_port.in_waiting:
                    response = self.serial_port.readline()
                    logger.info(f"Raw response: {response}")
                    if response:
                        responses.append(response)

            if not responses:
                logger.error("No response received within timeout")
                error_msg = f"[MCP2Serial v{VERSION}] Command timeout - no response within {self.read_timeout} second(s)\n"
                error_msg += f"Command sent: {cmd_str.strip()}\n"
                error_msg += f"Command bytes ({len(cmd_bytes)} bytes): {cmd_hex}\n"
                error_msg += "Please check:\n"
                error_msg += "1. Device is powered and responding\n"
                error_msg += "2. Baud rate matches device settings\n"
                error_msg += "3. Serial connection is stable\n"
                return [types.TextContent(
                    type="text",
                    text=error_msg
                )]

            # Decode the first response line (logged only).
            # errors="replace": line noise from e.g. a baud-rate mismatch is
            # usually not valid UTF-8; a strict decode would raise
            # UnicodeDecodeError, which the handlers below do not catch, and
            # the detailed diagnostic would never be produced.
            first_response = responses[0]
            first_line = first_response.decode(errors="replace").strip()
            logger.info(f"Decoded first response: {first_line}")

            # Check whether a second response line was received.
            if len(responses) > 1:
                second_response = responses[1]
                # Accept it when it begins with the configured reply prefix.
                if second_response.startswith(config.response_start_string.encode()):
                    if command.need_parse:
                        return [types.TextContent(
                            type="text",
                            text=second_response.decode(errors="replace").strip()
                        )]
                    return []

            # The response is not in the expected format: return full details.
            error_msg = f"[MCP2Serial v{VERSION}] Command execution failed.\n"
            error_msg += f"Command sent: {cmd_str.strip()}\n"
            error_msg += f"Command bytes ({len(cmd_bytes)} bytes): {cmd_hex}\n"
            error_msg += "Responses received:\n"
            for i, resp in enumerate(responses, 1):
                error_msg += f"{i}. Raw: {resp!r}\n   Decoded: {resp.decode(errors='replace').strip()}\n"
            error_msg += "\nPossible reasons:\n"
            error_msg += f"- Device echoed the command but did not send {config.response_start_string} response\n"
            error_msg += "- Command format may be incorrect\n"
            error_msg += "- Device may be in wrong mode\n"
            return [types.TextContent(
                type="text",
                text=error_msg
            )]

        except serial.SerialTimeoutException as e:
            logger.error(f"Serial timeout: {str(e)}")
            error_msg = f"[MCP2Serial v{VERSION}] Command timeout - {str(e)}\n"
            error_msg += "Please check:\n"
            error_msg += "1. Device is powered and responding\n"
            error_msg += "2. Baud rate matches device settings\n"
            error_msg += "3. Device is not busy with other operations"
            return [types.TextContent(
                type="text",
                text=error_msg
            )]
        except serial.SerialException as e:
            logger.error(f"Serial error: {str(e)}")
            error_msg = f"[MCP2Serial v{VERSION}] Serial communication failed - {str(e)}\n"
            error_msg += "Please check:\n"
            error_msg += "1. Serial port is correctly configured in config.yaml\n"
            error_msg += "2. Device is properly connected\n"
            error_msg += "3. No other program is using the port"
            return [types.TextContent(
                type="text",
                text=error_msg
            )]

    def close(self) -> None:
        """Close the serial port connection if open.

        Errors raised while closing are logged rather than propagated; the
        stored port reference is cleared afterwards.
        """
        if self.serial_port and self.serial_port.is_open:
            try:
                self.serial_port.close()
                logger.info(f"Closed serial port connection: {self.serial_port.port}")
            except Exception as e:
                logger.error(f"Error closing port: {str(e)}")
            self.serial_port = None

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mcp2everything/mcp2serial'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.