Skip to main content
Glama
websocket.ts (16.2 kB)
import { defineStore } from "pinia";
import { ref, computed } from "vue";
import { websocketService, type WebSocketEvents } from "@/services/websocket";
import { useAppStore } from "./app";
import { useServerStore } from "./server";
import { useMonitoringStore } from "./monitoring";

/** Callback shape accepted by the generic `subscribe()` API of the store. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- payload type depends on the event; kept as `any` for caller compatibility
type SubscriptionCallback = (data: any) => void;

/** Event types understood by the generic `subscribe()` / `unsubscribe()` API. */
const SUPPORTED_EVENT_TYPES: readonly string[] = [
  "server-status",
  "server-metrics",
  "system-metrics",
  "logs",
  "process:info",
  "process:logs",
];

/**
 * Pinia store that wraps `websocketService`.
 *
 * It tracks connection state (`connected`, `connecting`, `reconnectAttempts`,
 * `lastError`), keeps a set of subscription keys (`subscriptions`), forwards
 * service events to the server / monitoring stores, and raises UI
 * notifications through the app store.
 */
export const useWebSocketStore = defineStore("websocket", () => {
  const appStore = useAppStore();

  // ---------------------------------------------------------------------
  // State
  // ---------------------------------------------------------------------
  const connected = ref(false);
  const connecting = ref(false);
  const reconnectAttempts = ref(0);
  const lastError = ref<string | null>(null);
  // Keys of the currently tracked subscriptions, e.g. "metrics",
  // "server:<id>", "process:info:<id>", or a generic event type.
  const subscriptions = ref<Set<string>>(new Set());

  // Callbacks registered through subscribe(), keyed by event type and then by
  // subscription id, so that a single subscription can be removed precisely.
  const subscriptionCallbacks = new Map<
    string,
    Map<string, SubscriptionCallback>
  >();

  // Guards setupEventListeners() so the service listeners are attached once.
  let listenersRegistered = false;

  // ---------------------------------------------------------------------
  // Computed
  // ---------------------------------------------------------------------
  const connectionStatus = computed(() => {
    if (connecting.value) return "connecting";
    if (connected.value) return "connected";
    return "disconnected";
  });

  const connectionInfo = computed(() => ({
    status: connectionStatus.value,
    connected: connected.value,
    reconnectAttempts: reconnectAttempts.value,
    lastError: lastError.value,
    subscriptions: Array.from(subscriptions.value),
  }));

  // ---------------------------------------------------------------------
  // Internal helpers
  // ---------------------------------------------------------------------
  const setConnected = (value: boolean) => {
    connected.value = value;
  };

  const setConnecting = (value: boolean) => {
    connecting.value = value;
  };

  const setReconnectAttempts = (value: number) => {
    reconnectAttempts.value = value;
  };

  const setLastError = (error: string | null) => {
    lastError.value = error;
  };

  /** Logs to the console only when NODE_ENV is "development". */
  const debugLog = (...args: unknown[]) => {
    if (process.env.NODE_ENV === "development") {
      console.log(...args);
    }
  };

  /**
   * Attaches `callback` to the service event that corresponds to `eventType`.
   *
   * @returns `false` when `eventType` is not supported (nothing is attached).
   */
  const attachListener = (
    eventType: string,
    callback: SubscriptionCallback,
  ): boolean => {
    switch (eventType) {
      case "server-status":
        websocketService.on("server:status", callback);
        return true;
      case "server-metrics":
        websocketService.on("metrics:server", callback);
        return true;
      case "system-metrics":
        websocketService.on("metrics:system", callback);
        return true;
      case "logs":
        websocketService.on("logs:new", callback);
        return true;
      case "process:info":
        websocketService.on("process:info", callback);
        return true;
      case "process:logs":
        websocketService.on("process:logs", callback);
        return true;
      default:
        return false;
    }
  };

  /**
   * Detaches `callback` from the service event that corresponds to
   * `eventType`. Unsupported event types are ignored.
   */
  const detachListener = (
    eventType: string,
    callback: SubscriptionCallback,
  ): void => {
    switch (eventType) {
      case "server-status":
        websocketService.off("server:status", callback);
        break;
      case "server-metrics":
        websocketService.off("metrics:server", callback);
        break;
      case "system-metrics":
        websocketService.off("metrics:system", callback);
        break;
      case "logs":
        websocketService.off("logs:new", callback);
        break;
      case "process:info":
        websocketService.off("process:info", callback);
        break;
      case "process:logs":
        websocketService.off("process:logs", callback);
        break;
      default:
        break;
    }
  };

  // ---------------------------------------------------------------------
  // Connection management
  // ---------------------------------------------------------------------

  /**
   * Opens the WebSocket connection and sets up the default subscriptions.
   *
   * Does nothing when a connection is already open or in progress; in that
   * case the current value of `connected` is returned.
   *
   * @returns `true` when the connection was established, `false` otherwise.
   */
  const connect = async (): Promise<boolean> => {
    if (connected.value || connecting.value) {
      return connected.value;
    }

    setConnecting(true);
    setLastError(null);

    try {
      await websocketService.connect();
      setConnected(true);
      setConnecting(false);

      appStore.addNotification({
        type: "success",
        title: "WebSocket连接成功",
        message: "实时数据更新已启用",
        duration: 3000,
      });

      // Set up the default subscriptions.
      await setupDefaultSubscriptions();

      return true;
    } catch (error) {
      setConnecting(false);
      const errorMessage =
        error instanceof Error ? error.message : "WebSocket连接失败";
      setLastError(errorMessage);

      appStore.addNotification({
        type: "error",
        title: "WebSocket连接失败",
        message: errorMessage,
        duration: 5000,
      });

      return false;
    }
  };

  /**
   * Re-sends the per-server process subscriptions after a reconnect.
   *
   * NOTE(review): only "process:info:<id>" and "process:logs:<id>" keys are
   * restored here; "metrics", "logs" and "server:<id>" keys are not re-sent.
   * Confirm whether the service restores those on its own.
   */
  const restoreSubscriptions = () => {
    console.log("[WebSocketStore] Restoring subscriptions after reconnect");

    // Iterate over a snapshot so the set can change while restoring.
    const currentSubscriptions = Array.from(subscriptions.value);

    currentSubscriptions.forEach((subscription) => {
      if (subscription.startsWith("process:info:")) {
        const serverId = subscription.replace("process:info:", "");
        console.log(
          `[WebSocketStore] Restoring process info subscription for server: ${serverId}`,
        );
        websocketService.subscribeToProcessInfo(serverId);
      } else if (subscription.startsWith("process:logs:")) {
        const serverId = subscription.replace("process:logs:", "");
        console.log(
          `[WebSocketStore] Restoring process logs subscription for server: ${serverId}`,
        );
        websocketService.subscribeToProcessLogs(serverId);
      }
    });
  };

  /**
   * Closes the connection, clears every tracked subscription key and shows
   * an informational notification.
   */
  const disconnect = () => {
    websocketService.disconnect();
    setConnected(false);
    setConnecting(false);

    // Clear the subscription state.
    subscriptions.value.clear();
    console.log("[WebSocketStore] Cleared all subscriptions on disconnect");

    appStore.addNotification({
      type: "info",
      title: "WebSocket已断开",
      message: "实时数据更新已停用",
      duration: 3000,
    });
  };

  /**
   * Subscribes to system metrics and logs, plus the currently selected
   * server (if any).
   */
  const setupDefaultSubscriptions = async () => {
    // System metrics.
    subscribeToMetrics();

    // Logs.
    subscribeToLogs();

    // Updates of the selected server, when one is selected.
    const serverStore = useServerStore();
    if (serverStore.selectedServerId) {
      subscribeToServer(serverStore.selectedServerId);
    }
  };

  // ---------------------------------------------------------------------
  // Typed subscriptions
  //
  // The subscribe* helpers are no-ops while disconnected. The unsubscribe*
  // helpers always drop the local key, even while disconnected, so that a
  // subscription cancelled during an outage is not restored on reconnect;
  // only the request to the service is skipped when there is no connection.
  // ---------------------------------------------------------------------

  /** Subscribes to system metrics. */
  const subscribeToMetrics = () => {
    if (!connected.value) return;

    websocketService.subscribeToMetrics();
    subscriptions.value.add("metrics");
  };

  /** Unsubscribes from system metrics. */
  const unsubscribeFromMetrics = () => {
    subscriptions.value.delete("metrics");
    if (!connected.value) return;

    websocketService.unsubscribeFromMetrics();
  };

  /** Subscribes to updates of the server identified by `serverId`. */
  const subscribeToServer = (serverId: string) => {
    if (!connected.value) return;

    websocketService.subscribeToServer(serverId);
    subscriptions.value.add(`server:${serverId}`);
  };

  /** Unsubscribes from updates of the server identified by `serverId`. */
  const unsubscribeFromServer = (serverId: string) => {
    subscriptions.value.delete(`server:${serverId}`);
    if (!connected.value) return;

    websocketService.unsubscribeFromServer(serverId);
  };

  /**
   * Subscribes to log entries.
   *
   * @param filter Optional filter forwarded unchanged to the service.
   */
  const subscribeToLogs = (filter?: {
    level?: string[];
    serverId?: string;
  }) => {
    if (!connected.value) return;

    websocketService.subscribeToLogs(filter);
    subscriptions.value.add("logs");
  };

  /** Unsubscribes from log entries. */
  const unsubscribeFromLogs = () => {
    subscriptions.value.delete("logs");
    if (!connected.value) return;

    websocketService.unsubscribeFromLogs();
  };

  /** Subscribes to process information of the server `serverId`. */
  const subscribeToProcessInfo = (serverId: string) => {
    debugLog(
      `[WebSocketStore] subscribeToProcessInfo called for server: ${serverId}`,
    );

    if (!connected.value) {
      console.warn(
        "[WebSocketStore] Not connected, cannot subscribe to process info",
      );
      return;
    }

    const subscriptionKey = `process:info:${serverId}`;

    // The duplicate-subscription check is intentionally omitted for now so
    // that the subscription request is always sent.
    websocketService.subscribeToProcessInfo(serverId);
    subscriptions.value.add(subscriptionKey);
  };

  /** Unsubscribes from process information of the server `serverId`. */
  const unsubscribeFromProcessInfo = (serverId: string) => {
    subscriptions.value.delete(`process:info:${serverId}`);
    if (!connected.value) return;

    websocketService.unsubscribeFromProcessInfo(serverId);
  };

  /** Subscribes to process logs of the server `serverId`. */
  const subscribeToProcessLogs = (serverId: string) => {
    debugLog(
      `[WebSocketStore] subscribeToProcessLogs called for server: ${serverId}`,
    );

    if (!connected.value) {
      console.warn(
        "[WebSocketStore] Not connected, cannot subscribe to process logs",
      );
      return;
    }

    const subscriptionKey = `process:logs:${serverId}`;

    // The duplicate-subscription check is intentionally omitted for now so
    // that the subscription request is always sent.
    websocketService.subscribeToProcessLogs(serverId);
    subscriptions.value.add(subscriptionKey);
  };

  /** Unsubscribes from process logs of the server `serverId`. */
  const unsubscribeFromProcessLogs = (serverId: string) => {
    subscriptions.value.delete(`process:logs:${serverId}`);
    if (!connected.value) return;

    websocketService.unsubscribeFromProcessLogs(serverId);
  };

  // ---------------------------------------------------------------------
  // Service event wiring
  // ---------------------------------------------------------------------

  /**
   * Attaches the store's listeners to `websocketService`.
   *
   * Idempotent: a second call is ignored, so calling `initialize()` more
   * than once does not register every handler (and notification) twice.
   */
  const setupEventListeners = () => {
    if (listenersRegistered) return;
    listenersRegistered = true;

    const serverStore = useServerStore();
    const monitoringStore = useMonitoringStore();

    // Connection state events.
    websocketService.on("connect", () => {
      setConnected(true);
      setConnecting(false);
      setReconnectAttempts(0);
      setLastError(null);
      debugLog(
        "[WebSocketStore] WebSocket connected, current subscriptions:",
        Array.from(subscriptions.value),
      );

      // Do not clear the subscription records here: they are only cleared
      // by an explicit disconnect().
    });

    websocketService.on("disconnect", () => {
      setConnected(false);
      setConnecting(false);
      debugLog("[WebSocketStore] WebSocket disconnected");
    });

    websocketService.on("reconnect", () => {
      setConnected(true);
      setConnecting(false);
      setReconnectAttempts(0);

      appStore.addNotification({
        type: "success",
        title: "WebSocket重连成功",
        message: "实时数据更新已恢复",
        duration: 3000,
      });

      // Restore subscriptions after the reconnect.
      restoreSubscriptions();
    });

    websocketService.on("connect_error", (error) => {
      setConnecting(false);
      const errorMessage = error.message || "WebSocket连接错误";
      setLastError(errorMessage);

      const info = websocketService.getConnectionInfo();
      setReconnectAttempts(info.reconnectAttempts);
    });

    // Metrics events.
    websocketService.on("metrics:system", (metrics) => {
      monitoringStore.updateSystemMetrics(metrics);
    });

    websocketService.on("metrics:server", (data) => {
      monitoringStore.updateServerMetrics(data.serverId, data.metrics);
      serverStore.updateServerMetrics(data.serverId, {
        totalRequests: data.metrics.totalRequests,
        averageResponseTime: data.metrics.averageResponseTime,
      });
    });

    // Server status events.
    websocketService.on("server:status", (data) => {
      serverStore.updateServerStatus(data.serverId, data.status, data.error);

      if (data.status === "error" && data.error) {
        appStore.addNotification({
          type: "error",
          title: "服务器错误",
          message: `服务器 ${data.serverId} 发生错误: ${data.error}`,
          duration: 5000,
        });
      }
    });

    websocketService.on("server:created", (server) => {
      // Refresh the server list.
      serverStore.fetchServers();

      appStore.addNotification({
        type: "success",
        title: "服务器已创建",
        message: `服务器 "${server.name}" 已创建`,
        duration: 3000,
      });
    });

    websocketService.on("server:updated", (server) => {
      // Replace the local copy of the server, when it is known.
      const index = serverStore.servers.findIndex((s) => s.id === server.id);
      if (index > -1) {
        serverStore.servers[index] = server;
      }

      appStore.addNotification({
        type: "info",
        title: "服务器已更新",
        message: `服务器 "${server.name}" 配置已更新`,
        duration: 3000,
      });
    });

    websocketService.on("server:deleted", (serverId) => {
      // Remove the server from the local list; notify only when it was known.
      const index = serverStore.servers.findIndex((s) => s.id === serverId);
      if (index > -1) {
        const serverName = serverStore.servers[index].name;
        serverStore.servers.splice(index, 1);

        appStore.addNotification({
          type: "warning",
          title: "服务器已删除",
          message: `服务器 "${serverName}" 已被删除`,
          duration: 3000,
        });
      }

      // Drop the subscription for the deleted server.
      unsubscribeFromServer(serverId);
    });

    // Log events.
    websocketService.on("logs:new", (entry) => {
      monitoringStore.addLogEntry(entry);
    });

    websocketService.on("logs:batch", (entries) => {
      monitoringStore.addLogEntries(entries);
    });
  };

  /**
   * Disconnects, waits one second and connects again.
   *
   * Because it goes through disconnect(), all tracked subscription keys are
   * cleared; connect() then sets up the default subscriptions again.
   *
   * @returns `false` when a connection attempt is already in progress or the
   *   new connection fails, `true` otherwise.
   */
  const reconnect = async (): Promise<boolean> => {
    if (connecting.value) return false;

    disconnect();
    await new Promise((resolve) => setTimeout(resolve, 1000)); // wait 1 second
    return await connect();
  };

  /**
   * Returns the service's connection info extended with the tracked
   * subscription keys.
   */
  const getConnectionStats = () => {
    const info = websocketService.getConnectionInfo();
    return {
      ...info,
      subscriptions: Array.from(subscriptions.value),
    };
  };

  /**
   * Wires the service events and, when `globalSettings.autoRefresh` is
   * enabled in the app store, connects immediately.
   */
  const initialize = async () => {
    setupEventListeners();

    if (appStore.globalSettings.autoRefresh) {
      await connect();
    }
  };

  // ---------------------------------------------------------------------
  // Generic subscriptions
  // ---------------------------------------------------------------------

  /**
   * Registers `callback` for a generic event type.
   *
   * @param eventType One of "server-status", "server-metrics",
   *   "system-metrics", "logs", "process:info", "process:logs".
   * @param callback Invoked with the event payload.
   * @param subscriptionId Optional id; a unique one is generated when it is
   *   omitted or empty. Re-using an id for the same event type replaces the
   *   callback previously registered under it.
   * @returns The subscription id to pass to `unsubscribe()`, or `null` when
   *   `eventType` is not supported (nothing is registered in that case).
   */
  const subscribe = (
    eventType: string,
    callback: SubscriptionCallback,
    subscriptionId?: string,
  ) => {
    // Generate a unique subscription id when none was supplied.
    const id =
      subscriptionId ||
      `${eventType}_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;

    // A callback already stored under this id would become unreachable once
    // overwritten, so detach it before registering the new one.
    const existingCallbacks = subscriptionCallbacks.get(eventType);
    const previous = existingCallbacks
      ? existingCallbacks.get(id)
      : undefined;
    if (previous) {
      detachListener(eventType, previous);
    }

    // Attach first and record only on success, so unsupported event types
    // leave no orphan entries behind.
    if (!attachListener(eventType, callback)) {
      console.warn(`Unsupported event type: ${eventType}`);
      return null;
    }

    let callbacks = subscriptionCallbacks.get(eventType);
    if (!callbacks) {
      callbacks = new Map();
      subscriptionCallbacks.set(eventType, callbacks);
    }
    callbacks.set(id, callback);

    // NOTE(review): the key "logs" is shared with subscribeToLogs().
    subscriptions.value.add(eventType);
    return id; // used later to cancel exactly this subscription
  };

  /**
   * Removes callbacks registered through `subscribe()`.
   *
   * With a `subscriptionId`, only that subscription is removed; an unknown
   * id is a no-op. Without one, every callback registered through
   * `subscribe()` for `eventType` is removed. Callbacks are detached one by
   * one, so listeners attached elsewhere (for example by this store's own
   * event wiring) are left in place.
   */
  const unsubscribe = (eventType: string, subscriptionId?: string) => {
    const callbacks = subscriptionCallbacks.get(eventType);

    if (subscriptionId) {
      const callback = callbacks ? callbacks.get(subscriptionId) : undefined;
      if (!callbacks || !callback) return;

      detachListener(eventType, callback);
      callbacks.delete(subscriptionId);

      // Forget the event type once its last callback is gone.
      if (callbacks.size === 0) {
        subscriptions.value.delete(eventType);
        subscriptionCallbacks.delete(eventType);
      }
      return;
    }

    if (!SUPPORTED_EVENT_TYPES.includes(eventType)) {
      console.warn(`Unsupported event type: ${eventType}`);
    }

    if (callbacks) {
      callbacks.forEach((callback) => detachListener(eventType, callback));
    }
    subscriptions.value.delete(eventType);
    subscriptionCallbacks.delete(eventType);
  };

  return {
    // State
    connected,
    connecting,
    reconnectAttempts,
    lastError,
    subscriptions,

    // Computed
    connectionStatus,
    connectionInfo,

    // WebSocket service instance (for debugging)
    websocketService,

    // Actions
    connect,
    disconnect,
    reconnect,
    subscribe,
    unsubscribe,
    subscribeToMetrics,
    unsubscribeFromMetrics,
    subscribeToServer,
    unsubscribeFromServer,
    subscribeToLogs,
    unsubscribeFromLogs,
    subscribeToProcessInfo,
    unsubscribeFromProcessInfo,
    subscribeToProcessLogs,
    unsubscribeFromProcessLogs,
    getConnectionStats,
    initialize,
    restoreSubscriptions,
  };
});

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/zaizaizhao/mcp-swagger-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.