Skip to main content
Glama
window.py (10.4 kB)
"""pywebview 窗口管理。 cli-agent-mcp shared/gui v0.1.0 同步日期: 2025-12-16 提供 pywebview 窗口和 Queue 通信机制。 Example: # 基本用法(单端模式) viewer = LiveViewer(title="GeminiMCP") viewer.start() viewer.push_event(unified_event_dict) # 多端模式 viewer = LiveViewer(title="CLI Agent", multi_source_mode=True) viewer.start() """ from __future__ import annotations import json import logging import queue import threading import time from dataclasses import dataclass from typing import Any, Callable from .renderer import EventRenderer, RenderConfig from .template import generate_html logger = logging.getLogger(__name__) __all__ = [ "LiveViewer", "ViewerConfig", ] @dataclass class ViewerConfig: """查看器配置。 Attributes: title: 窗口标题 width: 窗口宽度 height: 窗口高度 multi_source_mode: 是否为多端模式 queue_max_size: 事件队列最大大小 poll_interval_ms: 队列轮询间隔(毫秒) """ title: str = "CLI Agent Live Output" width: int = 1000 height: int = 650 multi_source_mode: bool = False queue_max_size: int = 5000 poll_interval_ms: int = 50 class LiveViewer: """实时事件查看器。 使用 pywebview 显示统一事件流,支持: - 单端模式(不显示来源标签) - 多端模式(显示来源标签,侧边栏按来源分组) - Queue 通信(线程安全的事件推送) - 状态栏统计(尽力而为) Example: viewer = LiveViewer(multi_source_mode=True) viewer.start() # 从其他线程推送事件 viewer.push_event(event_dict) # 或使用便捷方法 viewer.push_raw(raw_cli_output, source="gemini") """ def __init__( self, config: ViewerConfig | None = None, *, title: str | None = None, multi_source_mode: bool | None = None, ) -> None: """初始化查看器。 Args: config: 完整配置对象 title: 窗口标题(覆盖 config) multi_source_mode: 是否多端模式(覆盖 config) """ self.config = config or ViewerConfig() if title is not None: self.config.title = title if multi_source_mode is not None: self.config.multi_source_mode = multi_source_mode # 渲染器 self._renderer = EventRenderer( RenderConfig(multi_source_mode=self.config.multi_source_mode) ) # 事件队列 self._queue: queue.Queue[dict[str, Any] | None] = queue.Queue( maxsize=self.config.queue_max_size ) # 窗口引用 self._window = None self._started = threading.Event() self._closed = threading.Event() self._poll_thread: 
threading.Thread | None = None # 统计 self._stats = { "model": None, "session": None, "tokens": 0, "duration": 0.0, "tools": 0, } def start(self, blocking: bool = True) -> None: """启动查看器。 Args: blocking: 是否阻塞当前线程 """ if blocking: self._run() else: thread = threading.Thread(target=self._run, daemon=True) thread.start() # 等待窗口准备好 self._started.wait(timeout=10) def _run(self) -> None: """运行 pywebview 主循环。""" try: import webview except ImportError as e: raise ImportError( "pywebview is required for GUI. Install with: pip install pywebview" ) from e html = generate_html( multi_source_mode=self.config.multi_source_mode, title=self.config.title, ) self._window = webview.create_window( self.config.title, html=html, width=self.config.width, height=self.config.height, min_size=(600, 400), ) def on_loaded(): self._started.set() # 启动单一后台轮询线程 self._poll_thread = threading.Thread( target=self._poll_queue_loop, daemon=True ) self._poll_thread.start() def on_closed(): self._closed.set() self._queue.put(None) # Signal to stop polling self._window.events.loaded += on_loaded self._window.events.closed += on_closed webview.start() def _poll_queue_loop(self) -> None: """后台轮询线程主循环。使用单一线程避免 Timer 线程爆炸。""" poll_interval = self.config.poll_interval_ms / 1000 # 转换为秒 while not self._closed.is_set() and self._window is not None: try: # 批量处理队列中的所有事件 events_processed = 0 while events_processed < 100: # 每次最多处理 100 个事件 try: event = self._queue.get_nowait() if event is None: return # 收到停止信号 self._render_event(event) events_processed += 1 except queue.Empty: break except Exception as e: logger.debug(f"Poll loop error: {e}") # 等待下一次轮询 time.sleep(poll_interval) def _render_event(self, event: dict[str, Any]) -> None: """渲染单个事件到窗口。""" if self._window is None: return try: html = self._renderer.render(event) session_id = self._extract_session_id(event) source = event.get("source", "unknown") task_note = event.get("metadata", {}).get("task_note", "") or event.get("task_note", "") # Escape for JS 
html_escaped = json.dumps(html) session_escaped = json.dumps(session_id) source_escaped = json.dumps(source) task_note_escaped = json.dumps(task_note) self._window.evaluate_js( f"addEvent({html_escaped}, {session_escaped}, {source_escaped}, {task_note_escaped})" ) # Update status self._update_stats(event) except Exception as e: # Log error but don't crash logger.warning(f"Render error: {e}") def _extract_session_id(self, event: dict[str, Any]) -> str: """提取 session ID。""" if event.get("session_id"): return event["session_id"] metadata = event.get("metadata", {}) return metadata.get("session_id", "") or metadata.get("thread_id", "") def _update_stats(self, event: dict[str, Any]) -> None: """更新状态栏统计。""" if self._window is None: return updated = False # Model if event.get("model"): self._stats["model"] = event["model"] updated = True # Session session_id = self._extract_session_id(event) if session_id: self._stats["session"] = session_id updated = True # Stats from lifecycle events stats = event.get("stats", {}) if stats: if stats.get("total_tokens"): self._stats["tokens"] = stats["total_tokens"] updated = True elif stats.get("input_tokens") or stats.get("output_tokens"): self._stats["tokens"] = ( stats.get("input_tokens", 0) + stats.get("output_tokens", 0) ) updated = True if stats.get("duration_ms"): self._stats["duration"] = stats["duration_ms"] / 1000 updated = True if stats.get("tool_calls"): self._stats["tools"] = stats["tool_calls"] updated = True # Tool count from operations if event.get("category") == "operation" and event.get("status") == "running": self._stats["tools"] = self._stats.get("tools", 0) + 1 updated = True # Streaming indicator is_streaming = event.get("is_delta", False) or event.get("status") == "running" if updated: status_data = { "model": self._stats.get("model"), "session": self._stats.get("session"), "tokens": self._stats.get("tokens", 0), "duration": self._stats.get("duration", 0.0), "tools": self._stats.get("tools", 0), "streaming": 
is_streaming, } try: self._window.evaluate_js(f"updateStatus({json.dumps(status_data)})") except Exception: pass def push_event(self, event: dict[str, Any]) -> bool: """推送统一事件到显示队列。 Args: event: 统一事件字典(UnifiedEvent.model_dump()) Returns: 是否成功入队(队列满时返回 False) """ try: self._queue.put_nowait(event) return True except queue.Full: return False def push_events(self, events: list[dict[str, Any]]) -> int: """批量推送事件。 Args: events: 事件列表 Returns: 成功入队的数量 """ count = 0 for event in events: if self.push_event(event): count += 1 return count def close(self) -> None: """关闭查看器窗口。""" if self._window: try: self._window.destroy() except Exception: pass @property def is_running(self) -> bool: """查看器是否正在运行。""" return self._started.is_set() and not self._closed.is_set() def wait_closed(self, timeout: float | None = None) -> bool: """等待窗口关闭。 Args: timeout: 超时时间(秒) Returns: 是否已关闭 """ return self._closed.wait(timeout=timeout)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shiharuharu/cli-agent-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.