Skip to main content
Glama

open_port

Configure and establish a serial port connection with customizable baud rate, data bits, parity, stop bits, flow control, and timeout settings for UART communication.

Instructions

打开指定串口,支持自定义配置参数

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
portYes串口路径,如 /dev/ttyUSB0 或 COM1
baudrateNo波特率,支持的值:[300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 38400, 57600, 115200, 230400, 460800, 921600]
bytesizeNo数据位,支持的值:[5, 6, 7, 8]
parityNo校验位,支持的值:['N', 'E', 'O', 'M', 'S']N
stopbitsNo停止位,支持的值:[1.0, 1.5, 2.0]
flow_controlNo流控制,支持的值:['none', 'hardware', 'software']none
read_timeout_msNo读取超时(毫秒),范围 0-60000
write_timeout_msNo写入超时(毫秒),范围 0-60000
auto_reconnectNo是否启用自动重连

Implementation Reference

  • The main handler function for the 'open_port' MCP tool. It receives tool arguments, calls the SerialManager to open the port, and returns the status as a dictionary.
    def open_port( port: str, baudrate: int = DEFAULT_BAUDRATE, bytesize: int = DEFAULT_BYTESIZE, parity: str = DEFAULT_PARITY.value, stopbits: float = float(DEFAULT_STOPBITS.value), flow_control: str = DEFAULT_FLOW_CONTROL.value, read_timeout_ms: int = DEFAULT_TIMEOUT_MS, write_timeout_ms: int = DEFAULT_WRITE_TIMEOUT_MS, auto_reconnect: bool = True, ) -> dict[str, Any]: """打开串口 Args: port: 串口路径(如 /dev/ttyUSB0 或 COM1) baudrate: 波特率,默认 9600 bytesize: 数据位,默认 8 parity: 校验位,默认 N(无校验) stopbits: 停止位,默认 1 flow_control: 流控制,默认 none read_timeout_ms: 读取超时(毫秒),默认 1000 write_timeout_ms: 写入超时(毫秒),默认 1000 auto_reconnect: 是否启用自动重连,默认 True Returns: 串口状态信息 """ manager = get_serial_manager() status = manager.open_port( port=port, baudrate=baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits, flow_control=flow_control, read_timeout_ms=read_timeout_ms, write_timeout_ms=write_timeout_ms, auto_reconnect=auto_reconnect, ) return status.to_dict()
  • The input schema and metadata definition for the 'open_port' tool, used for registration and validation.
    OPEN_PORT_TOOL: dict[str, Any] = { "name": "open_port", "description": "打开指定串口,支持自定义配置参数", "inputSchema": { "type": "object", "properties": { "port": { "type": "string", "description": "串口路径,如 /dev/ttyUSB0 或 COM1", }, "baudrate": { "type": "integer", "description": f"波特率,支持的值:{list(SUPPORTED_BAUDRATES)}", "default": DEFAULT_BAUDRATE, }, "bytesize": { "type": "integer", "description": f"数据位,支持的值:{list(SUPPORTED_BYTESIZES)}", "default": DEFAULT_BYTESIZE, }, "parity": { "type": "string", "description": f"校验位,支持的值:{[p.value for p in Parity]}", "default": DEFAULT_PARITY.value, }, "stopbits": { "type": "number", "description": f"停止位,支持的值:{[s.value for s in StopBits]}", "default": float(DEFAULT_STOPBITS.value), }, "flow_control": { "type": "string", "description": f"流控制,支持的值:{[f.value for f in FlowControl]}", "default": DEFAULT_FLOW_CONTROL.value, }, "read_timeout_ms": { "type": "integer", "description": "读取超时(毫秒),范围 0-60000", "default": DEFAULT_TIMEOUT_MS, }, "write_timeout_ms": { "type": "integer", "description": "写入超时(毫秒),范围 0-60000", "default": DEFAULT_WRITE_TIMEOUT_MS, }, "auto_reconnect": { "type": "boolean", "description": "是否启用自动重连", "default": True, }, }, "required": ["port"], }, }
  • Tool registration in the MCP server's list_tools handler. The 'open_port' tool is included in the returned list of available tools.
    @server.list_tools() # type: ignore[no-untyped-call, untyped-decorator] async def handle_list_tools() -> list[types.Tool]: """返回可用工具列表""" return [ types.Tool( name=LIST_PORTS_TOOL["name"], description=LIST_PORTS_TOOL["description"], inputSchema=LIST_PORTS_TOOL["inputSchema"], ), types.Tool( name=OPEN_PORT_TOOL["name"], description=OPEN_PORT_TOOL["description"], inputSchema=OPEN_PORT_TOOL["inputSchema"], ), types.Tool( name=CLOSE_PORT_TOOL["name"], description=CLOSE_PORT_TOOL["description"], inputSchema=CLOSE_PORT_TOOL["inputSchema"], ), types.Tool( name=SET_CONFIG_TOOL["name"], description=SET_CONFIG_TOOL["description"], inputSchema=SET_CONFIG_TOOL["inputSchema"], ), types.Tool( name=GET_STATUS_TOOL["name"], description=GET_STATUS_TOOL["description"], inputSchema=GET_STATUS_TOOL["inputSchema"], ), types.Tool( name=SEND_DATA_TOOL["name"], description=SEND_DATA_TOOL["description"], inputSchema=SEND_DATA_TOOL["inputSchema"], ), types.Tool( name=READ_DATA_TOOL["name"], description=READ_DATA_TOOL["description"], inputSchema=READ_DATA_TOOL["inputSchema"], ), # 终端会话工具 types.Tool( name=CREATE_SESSION_TOOL["name"], description=CREATE_SESSION_TOOL["description"], inputSchema=CREATE_SESSION_TOOL["inputSchema"], ), types.Tool( name=CLOSE_SESSION_TOOL["name"], description=CLOSE_SESSION_TOOL["description"], inputSchema=CLOSE_SESSION_TOOL["inputSchema"], ), types.Tool( name=SEND_COMMAND_TOOL["name"], description=SEND_COMMAND_TOOL["description"], inputSchema=SEND_COMMAND_TOOL["inputSchema"], ), types.Tool( name=READ_OUTPUT_TOOL["name"], description=READ_OUTPUT_TOOL["description"], inputSchema=READ_OUTPUT_TOOL["inputSchema"], ), types.Tool( name=LIST_SESSIONS_TOOL["name"], description=LIST_SESSIONS_TOOL["description"], inputSchema=LIST_SESSIONS_TOOL["inputSchema"], ), types.Tool( name=GET_SESSION_INFO_TOOL["name"], description=GET_SESSION_INFO_TOOL["description"], inputSchema=GET_SESSION_INFO_TOOL["inputSchema"], ), types.Tool( name=CLEAR_BUFFER_TOOL["name"], description=CLEAR_BUFFER_TOOL["description"], inputSchema=CLEAR_BUFFER_TOOL["inputSchema"], ), ]
  • The MCP server dispatcher for tool calls. Dispatches 'open_port' calls to the port_ops.open_port handler function.
    @server.call_tool() # type: ignore[untyped-decorator] async def handle_call_tool( name: str, arguments: dict[str, Any] ) -> list[types.TextContent]: """处理工具调用""" try: result: Any if name == "list_ports": result = list_ports() elif name == "open_port": result = open_port(**arguments) elif name == "close_port":
  • Core implementation in SerialManager.open_port that actually opens the serial port using pyserial, handles errors, and manages the connection state.
    def open_port( self, port: str, baudrate: int | None = None, bytesize: int | None = None, parity: str | None = None, stopbits: float | None = None, flow_control: str | None = None, read_timeout_ms: int | None = None, write_timeout_ms: int | None = None, auto_reconnect: bool | None = None, ) -> PortStatus: """打开串口 Args: port: 串口路径 baudrate: 波特率(None 时使用配置默认值) bytesize: 数据位(None 时使用配置默认值) parity: 校验位(None 时使用配置默认值) stopbits: 停止位(None 时使用配置默认值) flow_control: 流控制(None 时使用配置默认值) read_timeout_ms: 读取超时(毫秒,None 时使用配置默认值) write_timeout_ms: 写入超时(毫秒,None 时使用配置默认值) auto_reconnect: 是否启用自动重连(None 时使用配置默认值) Returns: 串口状态 Raises: PortBlacklistedError: 串口在黑名单中 PortNotFoundError: 串口不存在 PortBusyError: 串口被占用 InvalidParamError: 参数无效 """ # 检查黑名单 blacklist = get_blacklist_manager() if blacklist.is_blacklisted(port): raise PortBlacklistedError(port) # 获取全局配置作为默认值 global_config = get_config_manager().config # 使用用户参数或配置默认值 final_baudrate = baudrate if baudrate is not None else global_config.baudrate final_bytesize = bytesize if bytesize is not None else global_config.bytesize final_parity = parity if parity is not None else global_config.parity final_stopbits = stopbits if stopbits is not None else global_config.stopbits # 流控:显式传入时优先使用传入值 if flow_control is None: final_flow_control = ( "software" if global_config.xonxoff else "hardware" if global_config.rtscts else "none" ) else: final_flow_control = flow_control cc = global_config rt = read_timeout_ms wt = write_timeout_ms ar = auto_reconnect final_read_timeout = cc.read_timeout if rt is None else rt final_write_timeout = cc.write_timeout if wt is None else wt final_auto_reconnect = cc.auto_reconnect if ar is None else ar # 验证参数 config = self._validate_and_create_config( final_baudrate, final_bytesize, final_parity, final_stopbits, final_flow_control, final_read_timeout, final_write_timeout, ) with self._lock: # 检查是否已打开(幂等操作) if port in self._ports: managed = self._ports[port] logger.info("串口已打开,返回当前状态:%s", port) return PortStatus( port=port, is_open=True, config=managed.config, connected=managed.is_connected, reconnecting=managed.reconnecting, ) # 打开串口 serial_obj = self._create_serial(port, config) managed = ManagedPort(port, serial_obj, config, final_auto_reconnect) self._ports[port] = managed logger.info("串口打开成功:%s", port) return PortStatus( port=port, is_open=True, config=config, connected=managed.is_connected, reconnecting=False, )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/donnel666/uart-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server