Skip to main content
Glama
refactor-gui-to-web.md25.2 kB
# GUI 重构方案:pywebview → HTTP + SSE > **版本**: v2.0 (基于 Codex Review 更新) > **日期**: 2024-12-18 ## 背景 ### 问题 1. **WSL2 兼容性**:pywebview 在 WSL2 下需要 WSLg 或 X Server,配置复杂 2. **跨平台限制**:pywebview 依赖平台特定的 WebView 组件 3. **调试困难**:pywebview 窗口无法使用浏览器 DevTools 4. **单客户端限制**:当前架构只支持一个 GUI 窗口 ### 目标 1. 保持现有功能不变 2. 支持 WSL2 等无 GUI 环境 3. 支持多客户端并行访问(GUI + 浏览器) 4. pywebview 不可用时自动降级为纯 Web 模式 5. 最小化代码改动 ## 架构设计 ### 现有架构 ``` ┌─────────────────────────────────────────────────────────────┐ │ 现有架构 │ │ │ │ 主进程 (MCP Server) │ │ │ │ │ └── GUIManager │ │ │ │ │ └── multiprocessing.Process (子进程) │ │ │ │ │ └── LiveViewer │ │ ├── pywebview 窗口 │ │ ├── _poll_queue_loop 线程 │ │ ├── mp.Queue 事件队列 │ │ └── evaluate_js("addEvent()") │ │ │ └─────────────────────────────────────────────────────────────┘ ``` ### 新架构 ``` ┌─────────────────────────────────────────────────────────────┐ │ 新架构 │ │ │ │ 主进程 (MCP Server) │ │ │ │ │ └── GUIManager │ │ │ │ │ └── multiprocessing.Process (子进程) │ │ │ │ │ └── LiveViewer │ │ │ │ │ ├── GUIServer (HTTP + SSE) │ │ │ ├── GET / → HTML │ │ │ ├── GET /sse → 事件流 │ │ │ └── clients[] 连接池 │ │ │ │ │ ├── _poll_queue_loop 线程 │ │ │ └── server.broadcast() │ │ │ │ │ └── 客户端连接 │ │ ├── pywebview (可选) │ │ ├── Chrome 浏览器 │ │ └── 其他浏览器 │ │ │ └─────────────────────────────────────────────────────────────┘ ``` ## 核心改动 ### 1. 新增 GUIServer 类 **文件**: `src/cli_agent_mcp/shared/gui/server.py` (新建) ```python """内置 HTTP + SSE 服务器""" import http.server import json import logging import queue import socketserver import threading import time from dataclasses import dataclass from typing import Callable logger = logging.getLogger(__name__) @dataclass class ServerConfig: """服务器配置""" host: str = "127.0.0.1" port: int = 0 # 0 = 随机端口 grace_period: float = 2.0 # 宽限期(秒) max_clients: int = 10 # 最大客户端数 class GUIServer: """HTTP 服务器,提供静态 HTML 和 SSE 事件流""" def __init__(self, html: str, config: ServerConfig | None = None): self.html = html self.config = config or ServerConfig() self._clients: list[queue.Queue] = [] self._lock = threading.Lock() self._shutdown_callback: Callable[[], None] | None = None self._server: socketserver.TCPServer | None = None self._actual_port: int = 0 @property def port(self) -> int: """实际绑定的端口""" return self._actual_port @property def url(self) -> str: """服务器 URL""" return f"http://{self.config.host}:{self._actual_port}" def on_all_disconnected(self, callback: Callable[[], None]): """注册所有客户端断开时的回调""" self._shutdown_callback = callback def start(self) -> int: """启动服务器,返回实际端口""" handler = self._create_handler() # 直接绑定,让系统分配端口(避免竞态) self._server = socketserver.ThreadingTCPServer( (self.config.host, self.config.port), handler ) self._server.allow_reuse_address = True # 从 server 获取实际端口 self._actual_port = self._server.server_address[1] thread = threading.Thread( target=self._server.serve_forever, daemon=True, name="gui_http_server" ) thread.start() logger.info(f"GUI server started at {self.url}") return self._actual_port def stop(self): """停止服务器""" if self._server: self._server.shutdown() self._server.server_close() logger.debug("GUI server stopped") def broadcast(self, event: dict): """广播事件到所有 SSE 客户端""" with self._lock: for client_q in self._clients[:]: # 复制列表避免并发修改 try: client_q.put_nowait(event) except queue.Full: logger.debug("Client queue full, dropping event") @property def client_count(self) -> int: """当前连接的客户端数量""" with self._lock: return len(self._clients) def _client_connected(self, client_q: queue.Queue) -> bool: """客户端连接,返回是否允许""" with self._lock: if len(self._clients) >= self.config.max_clients: logger.warning(f"Max clients ({self.config.max_clients}) reached") return False self._clients.append(client_q) logger.debug(f"Client connected, total: {len(self._clients)}") return True def _client_disconnected(self, client_q: queue.Queue): """客户端断开""" with self._lock: if client_q in self._clients: self._clients.remove(client_q) remaining = len(self._clients) logger.debug(f"Client disconnected, remaining: {remaining}") if remaining == 0: # 启动宽限期检查 threading.Thread( target=self._check_shutdown_after_grace, daemon=True, name="gui_grace_check" ).start() def _check_shutdown_after_grace(self): """宽限期后检查是否需要退出""" time.sleep(self.config.grace_period) with self._lock: if not self._clients and self._shutdown_callback: logger.info("All clients disconnected after grace period") self._shutdown_callback() def _create_handler(self): server = self class Handler(http.server.BaseHTTPRequestHandler): protocol_version = 'HTTP/1.1' def do_GET(self): if self.path == '/': self._serve_html() elif self.path == '/sse': self._serve_sse() else: self.send_error(404) def _serve_html(self): content = server.html.encode('utf-8') self.send_response(200) self.send_header('Content-Type', 'text/html; charset=utf-8') self.send_header('Content-Length', len(content)) self.end_headers() self.wfile.write(content) def _serve_sse(self): client_q: queue.Queue = queue.Queue(maxsize=500) # 检查是否允许连接 if not server._client_connected(client_q): self.send_error(503, "Too many clients") return self.send_response(200) self.send_header('Content-Type', 'text/event-stream') self.send_header('Cache-Control', 'no-cache') self.send_header('Connection', 'keep-alive') self.send_header('X-Accel-Buffering', 'no') self.end_headers() try: while True: try: event = client_q.get(timeout=25) data = json.dumps(event, ensure_ascii=False) self.wfile.write(f"data: {data}\n\n".encode('utf-8')) self.wfile.flush() except queue.Empty: # 心跳保活 self.wfile.write(b": ping\n\n") self.wfile.flush() except (BrokenPipeError, ConnectionResetError, OSError, TimeoutError): pass # 客户端断开 finally: server._client_disconnected(client_q) def log_message(self, format, *args): pass # 静默日志 return Handler ``` ### 2. 修改 template.py **文件**: `src/cli_agent_mcp/shared/gui/template.py` 添加 SSE 客户端代码和 `updateStatus` 函数: ```javascript // 添加到 <script> 标签末尾 // ========== updateStatus 函数 ========== function updateStatus(status) { const statusBar = document.getElementById('status-bar'); if (!statusBar) return; let parts = []; if (status.model) parts.push(`Model: ${status.model}`); if (status.session) parts.push(`Session: ${status.session.slice(0, 8)}...`); if (status.tokens) parts.push(`Tokens: ${status.tokens}`); if (status.duration) parts.push(`Duration: ${status.duration.toFixed(1)}s`); if (status.tools) parts.push(`Tools: ${status.tools}`); if (status.streaming) parts.push('⏳ Streaming...'); statusBar.textContent = parts.join(' | ') || 'Ready'; } // ========== SSE 客户端 ========== (function() { let evtSource = null; let reconnectAttempts = 0; const maxReconnectAttempts = 10; function connect() { evtSource = new EventSource('/sse'); evtSource.onopen = function() { console.log('SSE connected'); reconnectAttempts = 0; }; evtSource.onmessage = function(e) { try { const data = JSON.parse(e.data); if (data.type === 'event') { addEvent(data.html, data.session, data.source, data.task_note); } else if (data.type === 'status') { updateStatus(data.status); } } catch (err) { console.error('SSE parse error:', err); } }; evtSource.onerror = function() { console.log('SSE connection lost'); evtSource.close(); if (reconnectAttempts < maxReconnectAttempts) { reconnectAttempts++; const delay = Math.min(1000 * reconnectAttempts, 10000); console.log(`Reconnecting in ${delay}ms (attempt ${reconnectAttempts})`); setTimeout(connect, delay); } }; } connect(); // 页面关闭时清理 window.addEventListener('beforeunload', function() { if (evtSource) evtSource.close(); }); })(); ``` ### 3. 修改 window.py **文件**: `src/cli_agent_mcp/shared/gui/window.py` ```python # 主要改动点 import os import sys class LiveViewer: def __init__(self, ...): # ... 现有代码 ... self._server: GUIServer | None = None self._url: str | None = None self._webview_available: bool = False @property def url(self) -> str | None: """获取 GUI URL""" return self._url @property def is_webview_mode(self) -> bool: """是否使用 pywebview 模式""" return self._webview_available and self._window is not None def _check_gui_available(self) -> bool: """检查 GUI 环境是否可用(Linux/WSL2 特殊处理)""" if sys.platform == "linux": # 检查 DISPLAY 或 WAYLAND_DISPLAY if not os.environ.get("DISPLAY") and not os.environ.get("WAYLAND_DISPLAY"): logger.info("No DISPLAY/WAYLAND_DISPLAY, GUI not available") return False return True def _run(self) -> None: """运行主循环""" html = generate_html( multi_source_mode=self.config.multi_source_mode, title=self.config.title, ) # 启动 HTTP 服务器 from .server import GUIServer, ServerConfig server_config = ServerConfig( host=os.environ.get("CAM_GUI_HOST", "127.0.0.1"), port=int(os.environ.get("CAM_GUI_PORT", "0")), ) self._server = GUIServer(html, server_config) self._server.start() self._url = self._server.url # 注册断开回调 self._server.on_all_disconnected(self._on_all_clients_disconnected) # 打印 URL logger.info(f"GUI available at: {self._url}") # 检查 GUI 环境 if not self._check_gui_available(): self._start_web_only_mode() return # 尝试启动 pywebview try: import webview self._window = webview.create_window( self.config.title, url=self._url, # 改为加载 HTTP URL width=self.config.width, height=self.config.height, min_size=(600, 400), ) def on_loaded(): self._started.set() self._webview_available = True self._poll_thread = threading.Thread( target=self._poll_queue_loop, daemon=True ) self._poll_thread.start() def on_closed(): self._closed.set() self._queue.put(None) self._window.events.loaded += on_loaded self._window.events.closed += on_closed webview.start() except ImportError: logger.warning(f"pywebview not installed, use browser: {self._url}") self._start_web_only_mode() except Exception as e: # 捕获所有运行时异常(如 GTK/Qt 不可用) logger.warning(f"pywebview failed ({e}), falling back to browser: {self._url}") self._start_web_only_mode() def _start_web_only_mode(self): """启动纯 Web 模式""" self._webview_available = False self._started.set() # 启动轮询线程 self._poll_thread = threading.Thread( target=self._poll_queue_loop, daemon=True ) self._poll_thread.start() # 等待关闭信号 self._closed.wait() def _poll_queue_loop(self) -> None: """后台轮询线程 - 不再依赖 self._window""" poll_interval = self.config.poll_interval_ms / 1000 # 修改条件:只依赖 _closed 和 _server while not self._closed.is_set() and self._server is not None: try: events_processed = 0 while events_processed < 100: try: event = self._queue.get_nowait() if event is None: return self._render_event(event) events_processed += 1 except queue.Empty: break except Exception as e: logger.debug(f"Poll loop error: {e}") time.sleep(poll_interval) def _render_event(self, event: dict) -> None: """渲染事件 - 通过 SSE 广播""" if self._server is None: return try: html = self._renderer.render(event) session_id = self._extract_session_id(event) source = event.get("source", "unknown") task_note = event.get("metadata", {}).get("task_note", "") or event.get("task_note", "") # 通过 SSE 广播 self._server.broadcast({ 'type': 'event', 'html': html, 'session': session_id, 'source': source, 'task_note': task_note, }) # 更新状态 self._update_stats(event) except Exception as e: logger.warning(f"Render error: {e}") def _update_stats(self, event: dict) -> None: """更新状态栏 - 通过 SSE 广播""" # 不再依赖 self._window updated = False # Model if event.get("model"): self._stats["model"] = event["model"] updated = True # Session session_id = self._extract_session_id(event) if session_id: self._stats["session"] = session_id updated = True # Stats from lifecycle events stats = event.get("stats", {}) if stats: if stats.get("total_tokens"): self._stats["tokens"] = stats["total_tokens"] updated = True elif stats.get("input_tokens") or stats.get("output_tokens"): self._stats["tokens"] = ( stats.get("input_tokens", 0) + stats.get("output_tokens", 0) ) updated = True if stats.get("duration_ms"): self._stats["duration"] = stats["duration_ms"] / 1000 updated = True if stats.get("tool_calls"): self._stats["tools"] = stats["tool_calls"] updated = True # Tool count from operations if event.get("category") == "operation" and event.get("status") == "running": self._stats["tools"] = self._stats.get("tools", 0) + 1 updated = True # Streaming indicator is_streaming = event.get("is_delta", False) or event.get("status") == "running" if updated and self._server: status_data = { "model": self._stats.get("model"), "session": self._stats.get("session"), "tokens": self._stats.get("tokens", 0), "duration": self._stats.get("duration", 0.0), "tools": self._stats.get("tools", 0), "streaming": is_streaming, } self._server.broadcast({ 'type': 'status', 'status': status_data, }) def _on_all_clients_disconnected(self): """所有客户端断开时的回调""" logger.info("All clients disconnected") self._closed.set() def close(self) -> None: """关闭查看器窗口""" self._closed.set() # 关闭 pywebview 窗口 if self._window: try: self._window.destroy() except Exception: pass # 停止 HTTP 服务器 if self._server: self._server.stop() ``` ## 配置变更 ### 现有环境变量(保持不变) | 变量 | 默认值 | 说明 | |------|--------|------| | `CAM_GUI` | `true` | 启用 GUI 仪表盘 | | `CAM_GUI_DETAIL` | `false` | GUI 详细模式 | | `CAM_KEEP_UI` | `false` | 退出时保留 GUI | > **注意**: 环境变量是 `CAM_KEEP_UI`,不是 `CAM_GUI_KEEP` ### 新增环境变量 | 变量 | 默认值 | 说明 | |------|--------|------| | `CAM_GUI_PORT` | `0` (随机) | 指定 GUI 服务端口 | | `CAM_GUI_HOST` | `127.0.0.1` | 绑定地址 | > **安全警告**: 设置 `CAM_GUI_HOST=0.0.0.0` 会允许远程访问,可能泄露敏感信息(prompt、文件路径等)。仅在受信任网络环境中使用,或通过 `ssh -L` 做端口转发。 ## 重要说明 ### SSE 断开检测 vs 心跳机制 这是两个**不同的机制**,解决不同的问题: | 机制 | 作用 | 位置 | |------|------|------| | **SSE 断开检测** | 检测 GUI 客户端(浏览器/pywebview)是否存在 | GUIServer | | **心跳机制** | 检测主进程是否还活着 | GUIManager | SSE 断开检测**不会替代**心跳机制。两者并存: - 心跳:主进程 → GUI 子进程(检测主进程是否被 SIGKILL) - SSE:GUI 子进程 → 浏览器(检测是否还有客户端连接) ### pywebview 降级逻辑 降级触发条件(任一满足即降级): 1. `import webview` 失败(ImportError) 2. Linux 环境下无 `DISPLAY` 或 `WAYLAND_DISPLAY` 3. `webview.create_window()` 或 `webview.start()` 抛出任何异常 降级后行为: - 打印 URL 到日志 - 启动纯 Web 模式 - 用户需手动在浏览器打开 URL ## 兼容性 ### 向后兼容 - 现有环境变量保持不变 - pywebview 可用时行为与之前一致(只是从 `html=` 改为 `url=`) - 事件格式和渲染逻辑不变 - KEEP_UI 语义不变 ### 新增能力 - pywebview 不可用时自动降级为纯 Web 模式 - 支持多客户端并行访问(最多 10 个) - 支持远程访问(配置 `CAM_GUI_HOST=0.0.0.0`) - 提供 `LiveViewer.url` 属性获取 GUI 地址 ## 测试计划 ### 单元测试 1. `GUIServer` 启动和停止 2. SSE 连接和断开检测 3. 多客户端广播 4. 宽限期逻辑 5. 客户端数量限制 6. **pywebview 不可用时的降级路径** ### 集成测试 1. pywebview 可用时的正常流程 2. pywebview 不可用时的降级流程 3. GUI + 浏览器并行访问 4. KEEP_UI 模式下的退出逻辑 5. **Linux 无 DISPLAY 时的行为** ### 手动测试 1. Windows 原生环境 2. WSL2 环境(无 pywebview) 3. macOS 环境 4. Linux 环境(GTK/QT) ## 风险评估 | 风险 | 影响 | 缓解措施 | |------|------|----------| | SSE 连接不稳定 | 事件丢失 | 心跳保活 + 自动重连(指数退避) | | 端口冲突 | 启动失败 | 使用随机端口(从 TCPServer 获取) | | 多客户端性能 | 广播延迟 | 限制最大客户端数(10) | | 浏览器兼容性 | SSE 不支持 | 现代浏览器都支持 | | 远程访问安全 | 信息泄露 | 默认只绑定 127.0.0.1 | ## 实施步骤 1. **Phase 1**: 新增 `GUIServer` 类(不影响现有功能) 2. **Phase 2**: 修改 `template.py` 添加 `updateStatus` 和 SSE 客户端代码 3. **Phase 3**: 修改 `LiveViewer` 使用 HTTP URL + 降级逻辑 4. **Phase 4**: 更新 `close()` 方法,添加清理逻辑 5. **Phase 5**: 测试和文档更新 ## Codex Review 结果 ### 确认的优点 - ✅ SSE 单向推送完全符合场景需求 - ✅ 多客户端广播设计合理(每客户端独立 Queue) - ✅ 宽限期逻辑无竞态问题 - ✅ 事件格式和渲染逻辑保持兼容 - ✅ KEEP_UI 语义不受影响 ### 已修复的问题 - ✅ 环境变量名:`CAM_GUI_KEEP` → `CAM_KEEP_UI` - ✅ `_poll_queue_loop` 条件不再依赖 `self._window` - ✅ pywebview 降级覆盖所有异常(不只是 ImportError) - ✅ 端口分配改为从 TCPServer 直接获取(避免竞态) - ✅ 添加 `updateStatus` 函数实现 - ✅ `close()` 方法补充 `_server.stop()` - ✅ 添加 host 参数支持 - ✅ 添加客户端数量限制 - ✅ 澄清 SSE 断开检测 vs 心跳机制的区别 ## 参考 - [Server-Sent Events (SSE) - MDN](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) - [pywebview 文档](https://pywebview.flowrl.com/) - [Python http.server](https://docs.python.org/3/library/http.server.html)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shiharuharu/cli-agent-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server