read_data
Read data from a serial port in text or binary mode. Specify the port, optional byte size, timeout, and binary mode for Base64 output.
Instructions
从已打开的串口读取数据,支持文本和二进制模式
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| port | Yes | 串口路径,如 /dev/ttyUSB0 或 COM1 | |
| size | No | 读取字节数,不指定则读取所有可用数据 | |
| timeout_ms | No | 读取超时(毫秒),不指定则使用串口配置的超时 | |
| is_binary | No | 是否为二进制模式,True 时返回 Base64 编码 |
Implementation Reference
- src/uart_mcp/tools/data_ops.py:43-69 (handler)The read_data handler function: the MCP tool handler that reads data from a serial port. It gets the serial manager, calls manager.read_data(), then decodes the raw bytes (Base64 for binary mode, UTF-8 for text mode) and returns the result.
def read_data( port: str, size: int | None = None, timeout_ms: int | None = None, is_binary: bool = False, ) -> dict[str, Any]: """从串口读取数据 Args: port: 串口路径 size: 读取字节数,None 表示读取所有可用数据 timeout_ms: 读取超时(毫秒),None 使用串口配置的超时 is_binary: 是否为二进制模式 Returns: 读取结果,包含数据和字节数 """ manager = get_serial_manager() raw_data = manager.read_data(port, size, timeout_ms) # 解码转换 if is_binary: result_data = base64.b64encode(raw_data).decode("ascii") else: result_data = raw_data.decode("utf-8", errors="replace") return {"data": result_data, "bytes_read": len(raw_data)} - The READ_DATA_TOOL schema definition: defines the MCP tool metadata including name='read_data', description, and inputSchema with properties for port (required), size, timeout_ms, and is_binary.
READ_DATA_TOOL: dict[str, Any] = { "name": "read_data", "description": "从已打开的串口读取数据,支持文本和二进制模式", "inputSchema": { "type": "object", "properties": { "port": { "type": "string", "description": "串口路径,如 /dev/ttyUSB0 或 COM1", }, "size": { "type": "integer", "description": "读取字节数,不指定则读取所有可用数据", }, "timeout_ms": { "type": "integer", "description": "读取超时(毫秒),不指定则使用串口配置的超时", }, "is_binary": { "type": "boolean", "description": "是否为二进制模式,True 时返回 Base64 编码", "default": False, }, }, "required": ["port"], }, } - src/uart_mcp/server.py:97-138 (registration)Tool registration via handle_list_tools: registers read_data in the MCP tool list using the READ_DATA_TOOL constant's name, description, and inputSchema.
types.Tool( name=READ_DATA_TOOL["name"], description=READ_DATA_TOOL["description"], inputSchema=READ_DATA_TOOL["inputSchema"], ), # 终端会话工具 types.Tool( name=CREATE_SESSION_TOOL["name"], description=CREATE_SESSION_TOOL["description"], inputSchema=CREATE_SESSION_TOOL["inputSchema"], ), types.Tool( name=CLOSE_SESSION_TOOL["name"], description=CLOSE_SESSION_TOOL["description"], inputSchema=CLOSE_SESSION_TOOL["inputSchema"], ), types.Tool( name=SEND_COMMAND_TOOL["name"], description=SEND_COMMAND_TOOL["description"], inputSchema=SEND_COMMAND_TOOL["inputSchema"], ), types.Tool( name=READ_OUTPUT_TOOL["name"], description=READ_OUTPUT_TOOL["description"], inputSchema=READ_OUTPUT_TOOL["inputSchema"], ), types.Tool( name=LIST_SESSIONS_TOOL["name"], description=LIST_SESSIONS_TOOL["description"], inputSchema=LIST_SESSIONS_TOOL["inputSchema"], ), types.Tool( name=GET_SESSION_INFO_TOOL["name"], description=GET_SESSION_INFO_TOOL["description"], inputSchema=GET_SESSION_INFO_TOOL["inputSchema"], ), types.Tool( name=CLEAR_BUFFER_TOOL["name"], description=CLEAR_BUFFER_TOOL["description"], inputSchema=CLEAR_BUFFER_TOOL["inputSchema"], ), ] - src/uart_mcp/server.py:160-162 (registration)Tool dispatch in handle_call_tool: routes the 'read_data' name to call the read_data() handler function with the provided arguments.
elif name == "read_data": result = read_data(**arguments) # 终端会话工具 - The SerialManager.read_data helper: performs the low-level serial read. Reads a specified number of bytes or all available data from the serial port, with optional timeout.
def read_data( self, port: str, size: int | None = None, timeout_ms: int | None = None ) -> bytes: """读取原始字节数据 Args: port: 串口路径 size: 读取字节数,None 表示读取所有可用数据 timeout_ms: 读取超时(毫秒),None 使用串口配置的超时 Returns: 读取的字节数据 Raises: PortClosedError: 串口未打开 """ with self._lock: if port not in self._ports: raise PortClosedError(port) managed = self._ports[port] ser = managed.serial # 保存原始超时设置 original_timeout = ser.timeout try: # 如果指定了超时,临时修改 if timeout_ms is not None: ser.timeout = timeout_ms / 1000.0 data: bytes if size is not None: # 读取指定字节数 data = ser.read(size) else: # 读取所有可用数据 available: int = ser.in_waiting if available > 0: data = ser.read(available) else: # 没有可用数据,尝试读取一次(会等待超时) data = ser.read(1) if data: # 如果读到数据,继续读取剩余的 remaining: int = ser.in_waiting if remaining > 0: data += ser.read(remaining) logger.debug("从串口 %s 读取数据:%d 字节", port, len(data)) return data except SerialException as e: logger.error("串口读取失败:%s - %s", port, e) raise PortClosedError(port) from e finally: # 恢复原始超时设置 if timeout_ms is not None: ser.timeout = original_timeout