read_data
Read data from an open serial port in text or binary mode, specifying port, byte count, and timeout for UART device communication.
Instructions
从已打开的串口读取数据,支持文本和二进制模式
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| port | Yes | 串口路径,如 /dev/ttyUSB0 或 COM1 | |
| size | No | 读取字节数,不指定则读取所有可用数据 | |
| timeout_ms | No | 读取超时(毫秒),不指定则使用串口配置的超时 | |
| is_binary | No | 是否为二进制模式,True 时返回 Base64 编码 |
Implementation Reference
- src/uart_mcp/tools/data_ops.py:43-70 (handler)The primary handler function for the 'read_data' tool. It calls the serial manager to read raw bytes, converts to text or base64 based on is_binary flag, and returns a JSON-compatible dict.def read_data( port: str, size: int | None = None, timeout_ms: int | None = None, is_binary: bool = False, ) -> dict[str, Any]: """从串口读取数据 Args: port: 串口路径 size: 读取字节数,None 表示读取所有可用数据 timeout_ms: 读取超时(毫秒),None 使用串口配置的超时 is_binary: 是否为二进制模式 Returns: 读取结果,包含数据和字节数 """ manager = get_serial_manager() raw_data = manager.read_data(port, size, timeout_ms) # 解码转换 if is_binary: result_data = base64.b64encode(raw_data).decode("ascii") else: result_data = raw_data.decode("utf-8", errors="replace") return {"data": result_data, "bytes_read": len(raw_data)}
- The input schema definition for the 'read_data' tool, defining parameters like port, size, timeout_ms, and is_binary.READ_DATA_TOOL: dict[str, Any] = { "name": "read_data", "description": "从已打开的串口读取数据,支持文本和二进制模式", "inputSchema": { "type": "object", "properties": { "port": { "type": "string", "description": "串口路径,如 /dev/ttyUSB0 或 COM1", }, "size": { "type": "integer", "description": "读取字节数,不指定则读取所有可用数据", }, "timeout_ms": { "type": "integer", "description": "读取超时(毫秒),不指定则使用串口配置的超时", }, "is_binary": { "type": "boolean", "description": "是否为二进制模式,True 时返回 Base64 编码", "default": False, }, }, "required": ["port"], }, }
- src/uart_mcp/server.py:97-101 (registration)Registration of the 'read_data' tool in the MCP server's list_tools handler, using the schema from data_ops.types.Tool( name=READ_DATA_TOOL["name"], description=READ_DATA_TOOL["description"], inputSchema=READ_DATA_TOOL["inputSchema"], ),
- src/uart_mcp/server.py:160-161 (registration)Dispatch logic in the MCP call_tool handler that invokes the read_data function when the tool 'read_data' is called.elif name == "read_data": result = read_data(**arguments)
- Low-level helper function in SerialManager that performs the actual serial port read operation using pyserial.def read_data( self, port: str, size: int | None = None, timeout_ms: int | None = None ) -> bytes: """读取原始字节数据 Args: port: 串口路径 size: 读取字节数,None 表示读取所有可用数据 timeout_ms: 读取超时(毫秒),None 使用串口配置的超时 Returns: 读取的字节数据 Raises: PortClosedError: 串口未打开 """ with self._lock: if port not in self._ports: raise PortClosedError(port) managed = self._ports[port] ser = managed.serial # 保存原始超时设置 original_timeout = ser.timeout try: # 如果指定了超时,临时修改 if timeout_ms is not None: ser.timeout = timeout_ms / 1000.0 data: bytes if size is not None: # 读取指定字节数 data = ser.read(size) else: # 读取所有可用数据 available: int = ser.in_waiting if available > 0: data = ser.read(available) else: # 没有可用数据,尝试读取一次(会等待超时) data = ser.read(1) if data: # 如果读到数据,继续读取剩余的 remaining: int = ser.in_waiting if remaining > 0: data += ser.read(remaining) logger.debug("从串口 %s 读取数据:%d 字节", port, len(data)) return data except SerialException as e: logger.error("串口读取失败:%s - %s", port, e) raise PortClosedError(port) from e finally: # 恢复原始超时设置 if timeout_ms is not None: ser.timeout = original_timeout