Skip to main content
Glama

read_data

Read data from an open serial port in text or binary mode, specifying port, byte count, and timeout for UART device communication.

Instructions

从已打开的串口读取数据,支持文本和二进制模式

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
portYes串口路径,如 /dev/ttyUSB0 或 COM1
sizeNo读取字节数,不指定则读取所有可用数据
timeout_msNo读取超时(毫秒),不指定则使用串口配置的超时
is_binaryNo是否为二进制模式,True 时返回 Base64 编码

Implementation Reference

  • The primary handler function for the 'read_data' tool. It calls the serial manager to read raw bytes, converts to text or base64 based on is_binary flag, and returns a JSON-compatible dict.
    def read_data( port: str, size: int | None = None, timeout_ms: int | None = None, is_binary: bool = False, ) -> dict[str, Any]: """从串口读取数据 Args: port: 串口路径 size: 读取字节数,None 表示读取所有可用数据 timeout_ms: 读取超时(毫秒),None 使用串口配置的超时 is_binary: 是否为二进制模式 Returns: 读取结果,包含数据和字节数 """ manager = get_serial_manager() raw_data = manager.read_data(port, size, timeout_ms) # 解码转换 if is_binary: result_data = base64.b64encode(raw_data).decode("ascii") else: result_data = raw_data.decode("utf-8", errors="replace") return {"data": result_data, "bytes_read": len(raw_data)}
  • The input schema definition for the 'read_data' tool, defining parameters like port, size, timeout_ms, and is_binary.
    READ_DATA_TOOL: dict[str, Any] = { "name": "read_data", "description": "从已打开的串口读取数据,支持文本和二进制模式", "inputSchema": { "type": "object", "properties": { "port": { "type": "string", "description": "串口路径,如 /dev/ttyUSB0 或 COM1", }, "size": { "type": "integer", "description": "读取字节数,不指定则读取所有可用数据", }, "timeout_ms": { "type": "integer", "description": "读取超时(毫秒),不指定则使用串口配置的超时", }, "is_binary": { "type": "boolean", "description": "是否为二进制模式,True 时返回 Base64 编码", "default": False, }, }, "required": ["port"], }, }
  • Registration of the 'read_data' tool in the MCP server's list_tools handler, using the schema from data_ops.
    types.Tool( name=READ_DATA_TOOL["name"], description=READ_DATA_TOOL["description"], inputSchema=READ_DATA_TOOL["inputSchema"], ),
  • Dispatch logic in the MCP call_tool handler that invokes the read_data function when the tool 'read_data' is called.
    elif name == "read_data": result = read_data(**arguments)
  • Low-level helper function in SerialManager that performs the actual serial port read operation using pyserial.
    def read_data( self, port: str, size: int | None = None, timeout_ms: int | None = None ) -> bytes: """读取原始字节数据 Args: port: 串口路径 size: 读取字节数,None 表示读取所有可用数据 timeout_ms: 读取超时(毫秒),None 使用串口配置的超时 Returns: 读取的字节数据 Raises: PortClosedError: 串口未打开 """ with self._lock: if port not in self._ports: raise PortClosedError(port) managed = self._ports[port] ser = managed.serial # 保存原始超时设置 original_timeout = ser.timeout try: # 如果指定了超时,临时修改 if timeout_ms is not None: ser.timeout = timeout_ms / 1000.0 data: bytes if size is not None: # 读取指定字节数 data = ser.read(size) else: # 读取所有可用数据 available: int = ser.in_waiting if available > 0: data = ser.read(available) else: # 没有可用数据,尝试读取一次(会等待超时) data = ser.read(1) if data: # 如果读到数据,继续读取剩余的 remaining: int = ser.in_waiting if remaining > 0: data += ser.read(remaining) logger.debug("从串口 %s 读取数据:%d 字节", port, len(data)) return data except SerialException as e: logger.error("串口读取失败:%s - %s", port, e) raise PortClosedError(port) from e finally: # 恢复原始超时设置 if timeout_ms is not None: ser.timeout = original_timeout

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/donnel666/uart-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server