Skip to main content
Glama
data_ops.py3.62 kB
"""数据通信工具实现 提供串口数据收发功能,支持文本模式和二进制模式。 """ import base64 from typing import Any from ..errors import InvalidParamError from ..serial_manager import get_serial_manager def send_data( port: str, data: str, is_binary: bool = False, ) -> dict[str, Any]: """发送数据到串口 Args: port: 串口路径 data: 要发送的数据(文本模式为 UTF-8 字符串,二进制模式为 Base64 编码) is_binary: 是否为二进制模式 Returns: 发送结果,包含发送的字节数 """ manager = get_serial_manager() # 编码转换 if is_binary: try: raw_data = base64.b64decode(data) except Exception as e: raise InvalidParamError("data", data, f"Base64 解码失败:{e}") else: raw_data = data.encode("utf-8") bytes_written = manager.send_data(port, raw_data) return {"success": True, "bytes_written": bytes_written} def read_data( port: str, size: int | None = None, timeout_ms: int | None = None, is_binary: bool = False, ) -> dict[str, Any]: """从串口读取数据 Args: port: 串口路径 size: 读取字节数,None 表示读取所有可用数据 timeout_ms: 读取超时(毫秒),None 使用串口配置的超时 is_binary: 是否为二进制模式 Returns: 读取结果,包含数据和字节数 """ manager = get_serial_manager() raw_data = manager.read_data(port, size, timeout_ms) # 解码转换 if is_binary: result_data = base64.b64encode(raw_data).decode("ascii") else: result_data = raw_data.decode("utf-8", errors="replace") return {"data": result_data, "bytes_read": len(raw_data)} # 工具定义(用于 MCP 注册) SEND_DATA_TOOL: dict[str, Any] = { "name": "send_data", "description": "向已打开的串口发送数据,支持文本和二进制模式", "inputSchema": { "type": "object", "properties": { "port": { "type": "string", "description": "串口路径,如 /dev/ttyUSB0 或 COM1", }, "data": { "type": "string", "description": "要发送的数据(文本为UTF-8,二进制为Base64)", }, "is_binary": { "type": "boolean", "description": "是否为二进制模式,True 时 data 为 Base64 编码", "default": False, }, }, "required": ["port", "data"], }, } READ_DATA_TOOL: dict[str, Any] = { "name": "read_data", "description": "从已打开的串口读取数据,支持文本和二进制模式", "inputSchema": { "type": "object", "properties": { "port": { "type": "string", "description": "串口路径,如 /dev/ttyUSB0 或 COM1", }, "size": { "type": "integer", "description": "读取字节数,不指定则读取所有可用数据", }, "timeout_ms": { "type": "integer", "description": "读取超时(毫秒),不指定则使用串口配置的超时", }, "is_binary": { "type": "boolean", "description": "是否为二进制模式,True 时返回 Base64 编码", "default": False, }, }, "required": ["port"], }, }

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/donnel666/uart-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server