Skip to main content
Glama
port_ops.py7.73 kB
"""串口操作工具实现 提供打开、关闭、配置串口等功能。 """ from typing import Any from ..serial_manager import get_serial_manager from ..types import ( DEFAULT_BAUDRATE, DEFAULT_BYTESIZE, DEFAULT_FLOW_CONTROL, DEFAULT_PARITY, DEFAULT_STOPBITS, DEFAULT_TIMEOUT_MS, DEFAULT_WRITE_TIMEOUT_MS, SUPPORTED_BAUDRATES, SUPPORTED_BYTESIZES, FlowControl, Parity, StopBits, ) def open_port( port: str, baudrate: int = DEFAULT_BAUDRATE, bytesize: int = DEFAULT_BYTESIZE, parity: str = DEFAULT_PARITY.value, stopbits: float = float(DEFAULT_STOPBITS.value), flow_control: str = DEFAULT_FLOW_CONTROL.value, read_timeout_ms: int = DEFAULT_TIMEOUT_MS, write_timeout_ms: int = DEFAULT_WRITE_TIMEOUT_MS, auto_reconnect: bool = True, ) -> dict[str, Any]: """打开串口 Args: port: 串口路径(如 /dev/ttyUSB0 或 COM1) baudrate: 波特率,默认 9600 bytesize: 数据位,默认 8 parity: 校验位,默认 N(无校验) stopbits: 停止位,默认 1 flow_control: 流控制,默认 none read_timeout_ms: 读取超时(毫秒),默认 1000 write_timeout_ms: 写入超时(毫秒),默认 1000 auto_reconnect: 是否启用自动重连,默认 True Returns: 串口状态信息 """ manager = get_serial_manager() status = manager.open_port( port=port, baudrate=baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits, flow_control=flow_control, read_timeout_ms=read_timeout_ms, write_timeout_ms=write_timeout_ms, auto_reconnect=auto_reconnect, ) return status.to_dict() def close_port(port: str) -> dict[str, Any]: """关闭串口 Args: port: 串口路径 Returns: 操作结果 """ manager = get_serial_manager() return manager.close_port(port) def set_config( port: str, baudrate: int | None = None, bytesize: int | None = None, parity: str | None = None, stopbits: float | None = None, flow_control: str | None = None, read_timeout_ms: int | None = None, write_timeout_ms: int | None = None, ) -> dict[str, Any]: """修改串口配置(热更新) Args: port: 串口路径 baudrate: 波特率(可选) bytesize: 数据位(可选) parity: 校验位(可选) stopbits: 停止位(可选) flow_control: 流控制(可选) read_timeout_ms: 读取超时(可选) write_timeout_ms: 写入超时(可选) Returns: 更新后的串口状态 """ manager = get_serial_manager() status = manager.set_config( port=port, baudrate=baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits, flow_control=flow_control, read_timeout_ms=read_timeout_ms, write_timeout_ms=write_timeout_ms, ) return status.to_dict() def get_status(port: str) -> dict[str, Any]: """获取串口状态 Args: port: 串口路径 Returns: 串口状态信息 """ manager = get_serial_manager() status = manager.get_status(port) return status.to_dict() # 工具定义(用于 MCP 注册) OPEN_PORT_TOOL: dict[str, Any] = { "name": "open_port", "description": "打开指定串口,支持自定义配置参数", "inputSchema": { "type": "object", "properties": { "port": { "type": "string", "description": "串口路径,如 /dev/ttyUSB0 或 COM1", }, "baudrate": { "type": "integer", "description": f"波特率,支持的值:{list(SUPPORTED_BAUDRATES)}", "default": DEFAULT_BAUDRATE, }, "bytesize": { "type": "integer", "description": f"数据位,支持的值:{list(SUPPORTED_BYTESIZES)}", "default": DEFAULT_BYTESIZE, }, "parity": { "type": "string", "description": f"校验位,支持的值:{[p.value for p in Parity]}", "default": DEFAULT_PARITY.value, }, "stopbits": { "type": "number", "description": f"停止位,支持的值:{[s.value for s in StopBits]}", "default": float(DEFAULT_STOPBITS.value), }, "flow_control": { "type": "string", "description": f"流控制,支持的值:{[f.value for f in FlowControl]}", "default": DEFAULT_FLOW_CONTROL.value, }, "read_timeout_ms": { "type": "integer", "description": "读取超时(毫秒),范围 0-60000", "default": DEFAULT_TIMEOUT_MS, }, "write_timeout_ms": { "type": "integer", "description": "写入超时(毫秒),范围 0-60000", "default": DEFAULT_WRITE_TIMEOUT_MS, }, "auto_reconnect": { "type": "boolean", "description": "是否启用自动重连", "default": True, }, }, "required": ["port"], }, } CLOSE_PORT_TOOL: dict[str, Any] = { "name": "close_port", "description": "关闭指定串口连接", "inputSchema": { "type": "object", "properties": { "port": { "type": "string", "description": "串口路径", }, }, "required": ["port"], }, } SET_CONFIG_TOOL: dict[str, Any] = { "name": "set_config", "description": "修改已打开串口的配置(热更新,无需关闭重开)", "inputSchema": { "type": "object", "properties": { "port": { "type": "string", "description": "串口路径", }, "baudrate": { "type": "integer", "description": f"波特率,支持的值:{list(SUPPORTED_BAUDRATES)}", }, "bytesize": { "type": "integer", "description": f"数据位,支持的值:{list(SUPPORTED_BYTESIZES)}", }, "parity": { "type": "string", "description": f"校验位,支持的值:{[p.value for p in Parity]}", }, "stopbits": { "type": "number", "description": f"停止位,支持的值:{[s.value for s in StopBits]}", }, "flow_control": { "type": "string", "description": f"流控制,支持的值:{[f.value for f in FlowControl]}", }, "read_timeout_ms": { "type": "integer", "description": "读取超时(毫秒),范围 0-60000", }, "write_timeout_ms": { "type": "integer", "description": "写入超时(毫秒),范围 0-60000", }, }, "required": ["port"], }, } GET_STATUS_TOOL: dict[str, Any] = { "name": "get_status", "description": "获取已打开串口的当前状态和配置信息", "inputSchema": { "type": "object", "properties": { "port": { "type": "string", "description": "串口路径", }, }, "required": ["port"], }, }

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/donnel666/uart-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server