Skip to main content
Glama

open_port

Configure and establish a serial port connection with customizable baud rate, data bits, parity, stop bits, flow control, and timeout settings for UART communication.

Instructions

打开指定串口,支持自定义配置参数

Input Schema

Table | JSON Schema
| Name | Required | Description | Default |
| --- | --- | --- | --- |
| port | Yes | 串口路径,如 /dev/ttyUSB0 或 COM1 | |
| baudrate | No | 波特率,支持的值:[300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 38400, 57600, 115200, 230400, 460800, 921600] | |
| bytesize | No | 数据位,支持的值:[5, 6, 7, 8] | |
| parity | No | 校验位,支持的值:['N', 'E', 'O', 'M', 'S'] | N |
| stopbits | No | 停止位,支持的值:[1.0, 1.5, 2.0] | |
| flow_control | No | 流控制,支持的值:['none', 'hardware', 'software'] | none |
| read_timeout_ms | No | 读取超时(毫秒),范围 0-60000 | |
| write_timeout_ms | No | 写入超时(毫秒),范围 0-60000 | |
| auto_reconnect | No | 是否启用自动重连 | |

Implementation Reference

  • The main handler function for the 'open_port' MCP tool. It receives tool arguments, calls the SerialManager to open the port, and returns the status as a dictionary.
    def open_port(
        port: str,
        baudrate: int = DEFAULT_BAUDRATE,
        bytesize: int = DEFAULT_BYTESIZE,
        parity: str = DEFAULT_PARITY.value,
        stopbits: float = float(DEFAULT_STOPBITS.value),
        flow_control: str = DEFAULT_FLOW_CONTROL.value,
        read_timeout_ms: int = DEFAULT_TIMEOUT_MS,
        write_timeout_ms: int = DEFAULT_WRITE_TIMEOUT_MS,
        auto_reconnect: bool = True,
    ) -> dict[str, Any]:
        """Open a serial port and return its status as a plain dictionary.

        This is the handler behind the ``open_port`` MCP tool. It forwards
        every argument unchanged to the serial manager's ``open_port`` and
        converts the returned status object with ``to_dict()``.

        Args:
            port: Serial device path (e.g. /dev/ttyUSB0 or COM1).
            baudrate: Baud rate; defaults to ``DEFAULT_BAUDRATE``.
            bytesize: Number of data bits; defaults to ``DEFAULT_BYTESIZE``.
            parity: Parity setting; defaults to ``DEFAULT_PARITY.value``.
            stopbits: Number of stop bits; defaults to
                ``float(DEFAULT_STOPBITS.value)``.
            flow_control: Flow-control mode; defaults to
                ``DEFAULT_FLOW_CONTROL.value``.
            read_timeout_ms: Read timeout in milliseconds; defaults to
                ``DEFAULT_TIMEOUT_MS``.
            write_timeout_ms: Write timeout in milliseconds; defaults to
                ``DEFAULT_WRITE_TIMEOUT_MS``.
            auto_reconnect: Whether automatic reconnection is enabled;
                defaults to True.

        Returns:
            The port status information as a dictionary.
        """
        # NOTE(review): every value is passed explicitly (never None), so any
        # None-based fallback inside the manager is not triggered from this
        # entry point -- confirm that is the intended precedence.
        requested: dict[str, Any] = {
            "port": port,
            "baudrate": baudrate,
            "bytesize": bytesize,
            "parity": parity,
            "stopbits": stopbits,
            "flow_control": flow_control,
            "read_timeout_ms": read_timeout_ms,
            "write_timeout_ms": write_timeout_ms,
            "auto_reconnect": auto_reconnect,
        }
        port_status = get_serial_manager().open_port(**requested)
        return port_status.to_dict()
  • The input schema and metadata definition for the 'open_port' tool, used for registration and validation.
    # Per-argument JSON Schema fragments for the "open_port" tool. Insertion
    # order is kept as-is because it determines the order clients see.
    _OPEN_PORT_PROPERTIES: dict[str, Any] = {
        "port": {
            "type": "string",
            "description": "串口路径,如 /dev/ttyUSB0 或 COM1",
        },
        "baudrate": {
            "type": "integer",
            "description": f"波特率,支持的值:{list(SUPPORTED_BAUDRATES)}",
            "default": DEFAULT_BAUDRATE,
        },
        "bytesize": {
            "type": "integer",
            "description": f"数据位,支持的值:{list(SUPPORTED_BYTESIZES)}",
            "default": DEFAULT_BYTESIZE,
        },
        "parity": {
            "type": "string",
            "description": f"校验位,支持的值:{[member.value for member in Parity]}",
            "default": DEFAULT_PARITY.value,
        },
        "stopbits": {
            "type": "number",
            "description": f"停止位,支持的值:{[member.value for member in StopBits]}",
            "default": float(DEFAULT_STOPBITS.value),
        },
        "flow_control": {
            "type": "string",
            "description": f"流控制,支持的值:{[member.value for member in FlowControl]}",
            "default": DEFAULT_FLOW_CONTROL.value,
        },
        "read_timeout_ms": {
            "type": "integer",
            "description": "读取超时(毫秒),范围 0-60000",
            "default": DEFAULT_TIMEOUT_MS,
        },
        "write_timeout_ms": {
            "type": "integer",
            "description": "写入超时(毫秒),范围 0-60000",
            "default": DEFAULT_WRITE_TIMEOUT_MS,
        },
        "auto_reconnect": {
            "type": "boolean",
            "description": "是否启用自动重连",
            "default": True,
        },
    }

    # Name, description and input schema of the "open_port" tool. Only "port"
    # is required; every other argument carries a default in the schema.
    OPEN_PORT_TOOL: dict[str, Any] = {
        "name": "open_port",
        "description": "打开指定串口,支持自定义配置参数",
        "inputSchema": {
            "type": "object",
            "properties": _OPEN_PORT_PROPERTIES,
            "required": ["port"],
        },
    }
  • Tool registration in the MCP server's list_tools handler. The 'open_port' tool is included in the returned list of available tools.
    @server.list_tools()  # type: ignore[no-untyped-call, untyped-decorator]
    async def handle_list_tools() -> list[types.Tool]:
        """Return the list of tools this server exposes.

        Each tool definition is a dict with "name", "description" and
        "inputSchema" keys; one ``types.Tool`` is built per definition, in
        the order the definitions are listed below.

        Returns:
            The available tools, in listing order.
        """
        # Order matters: it is the order in which tools are returned.
        tool_definitions = (
            LIST_PORTS_TOOL,
            OPEN_PORT_TOOL,
            CLOSE_PORT_TOOL,
            SET_CONFIG_TOOL,
            GET_STATUS_TOOL,
            SEND_DATA_TOOL,
            READ_DATA_TOOL,
            # Terminal session tools
            CREATE_SESSION_TOOL,
            CLOSE_SESSION_TOOL,
            SEND_COMMAND_TOOL,
            READ_OUTPUT_TOOL,
            LIST_SESSIONS_TOOL,
            GET_SESSION_INFO_TOOL,
            CLEAR_BUFFER_TOOL,
        )
        return [
            types.Tool(
                name=definition["name"],
                description=definition["description"],
                inputSchema=definition["inputSchema"],
            )
            for definition in tool_definitions
        ]
  • The MCP server dispatcher for tool calls. Dispatches 'open_port' calls to the port_ops.open_port handler function.
    @server.call_tool()  # type: ignore[untyped-decorator]
    async def handle_call_tool(
        name: str, arguments: dict[str, Any]
    ) -> list[types.TextContent]:
        """处理工具调用"""
        try:
            result: Any
            if name == "list_ports":
                result = list_ports()
            elif name == "open_port":
                result = open_port(**arguments)
            elif name == "close_port":
  • Core implementation in SerialManager.open_port that actually opens the serial port using pyserial, handles errors, and manages the connection state.
    def open_port(
        self,
        port: str,
        baudrate: int | None = None,
        bytesize: int | None = None,
        parity: str | None = None,
        stopbits: float | None = None,
        flow_control: str | None = None,
        read_timeout_ms: int | None = None,
        write_timeout_ms: int | None = None,
        auto_reconnect: bool | None = None,
    ) -> PortStatus:
        """Open a serial port, or report the status of one already open.

        Any argument left as ``None`` is filled in from the global
        configuration. The resulting settings are validated before the port
        table is consulted, so invalid settings are rejected even when the
        port is already open. If the port is already open, its existing
        configuration is kept and the requested settings are not applied.

        Args:
            port: Serial device path.
            baudrate: Baud rate (``None`` uses the configured default).
            bytesize: Data bits (``None`` uses the configured default).
            parity: Parity (``None`` uses the configured default).
            stopbits: Stop bits (``None`` uses the configured default).
            flow_control: Flow-control mode. When ``None`` it is derived from
                the configuration: "software" if ``xonxoff`` is set, else
                "hardware" if ``rtscts`` is set, else "none".
            read_timeout_ms: Read timeout in milliseconds (``None`` uses the
                configured default).
            write_timeout_ms: Write timeout in milliseconds (``None`` uses
                the configured default).
            auto_reconnect: Whether to enable automatic reconnection
                (``None`` uses the configured default).

        Returns:
            The status of the port.

        Raises:
            PortBlacklistedError: The port is on the blacklist.
            PortNotFoundError: The port does not exist (presumably raised
                while creating the serial object -- confirm).
            PortBusyError: The port is in use (presumably raised while
                creating the serial object -- confirm).
            InvalidParamError: A parameter is invalid (presumably raised
                during validation -- confirm).
        """
        # Refuse blacklisted ports before touching configuration or hardware.
        if get_blacklist_manager().is_blacklisted(port):
            raise PortBlacklistedError(port)

        # Global configuration supplies the value for every omitted argument.
        defaults = get_config_manager().config

        effective_baudrate = defaults.baudrate if baudrate is None else baudrate
        effective_bytesize = defaults.bytesize if bytesize is None else bytesize
        effective_parity = defaults.parity if parity is None else parity
        effective_stopbits = defaults.stopbits if stopbits is None else stopbits

        # An explicitly passed flow-control mode wins; otherwise map the
        # configuration flags to a mode name, checking xonxoff before rtscts.
        if flow_control is not None:
            effective_flow_control = flow_control
        elif defaults.xonxoff:
            effective_flow_control = "software"
        elif defaults.rtscts:
            effective_flow_control = "hardware"
        else:
            effective_flow_control = "none"

        effective_read_timeout = (
            defaults.read_timeout if read_timeout_ms is None else read_timeout_ms
        )
        effective_write_timeout = (
            defaults.write_timeout if write_timeout_ms is None else write_timeout_ms
        )
        effective_auto_reconnect = (
            defaults.auto_reconnect if auto_reconnect is None else auto_reconnect
        )

        # Validate the resolved settings and build the port configuration.
        config = self._validate_and_create_config(
            effective_baudrate,
            effective_bytesize,
            effective_parity,
            effective_stopbits,
            effective_flow_control,
            effective_read_timeout,
            effective_write_timeout,
        )

        with self._lock:
            # Already open: return the current state instead of reopening.
            if port in self._ports:
                existing = self._ports[port]
                logger.info("串口已打开,返回当前状态:%s", port)
                return PortStatus(
                    port=port,
                    is_open=True,
                    config=existing.config,
                    connected=existing.is_connected,
                    reconnecting=existing.reconnecting,
                )

            # Not open yet: create the serial object and register the port.
            new_serial = self._create_serial(port, config)
            opened = ManagedPort(port, new_serial, config, effective_auto_reconnect)
            self._ports[port] = opened
            logger.info("串口打开成功:%s", port)

            return PortStatus(
                port=port,
                is_open=True,
                config=config,
                connected=opened.is_connected,
                reconnecting=False,
            )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/donnel666/uart-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.