Skip to main content
Glama
websocket.ts19.3 kB
import { io, Socket } from "socket.io-client"; import type { SystemMetrics, LogEntry, MCPServer } from "@/types"; // 临时移除调试器导入,避免模块问题 // import { wsDebugger } from "@/utils/websocket-debug"; export interface WebSocketEvents { // 系统指标更新 "metrics:system": (metrics: SystemMetrics) => void; "metrics:server": (data: { serverId: string; metrics: SystemMetrics; }) => void; // 服务器状态更新 "server:status": (data: { serverId: string; status: MCPServer["status"]; error?: string; }) => void; "server:created": (server: MCPServer) => void; "server:updated": (server: MCPServer) => void; "server:deleted": (serverId: string) => void; // 进程信息更新 "process:info": (data: { serverId: string; processInfo: { process: { pid: number; name: string; status: string; startTime: Date; uptime: number; }; resources: { cpu: number; memory: number; handles?: number; threads?: number; }; system: { platform: string; arch: string; nodeVersion: string; }; details: any; }; }) => void; "process:logs": (data: { serverId: string; logs: Array<{ serverId: string; pid: number; timestamp: Date; level: string; source: string; message: string; metadata?: Record<string, any>; }>; }) => void; // 日志更新 "logs:new": (entry: LogEntry) => void; "logs:batch": (entries: LogEntry[]) => void; // 连接状态事件 connect: () => void; disconnect: () => void; reconnect: () => void; connect_error: (error: Error) => void; } export class WebSocketService { private socket: Socket | null = null; private reconnectAttempts = 0; private maxReconnectAttempts = 5; private reconnectDelay = 1000; private isConnecting = false; private eventHandlers = new Map<keyof WebSocketEvents, Function[]>(); private heartbeatInterval: NodeJS.Timeout | null = null; private connectionCheckInterval: NodeJS.Timeout | null = null; // 调试开关(与后端 WS_DEBUG 对齐) private readonly DEBUG = (import.meta as any).env?.VITE_WS_DEBUG === "true"; private d(...args: any[]) { if (this.DEBUG) console.log("[WebSocketService]", ...args); } constructor(private url: string = "/monitoring") { this.setupEventHandlers(); } // 连接WebSocket connect(): Promise<void> { return new Promise((resolve, reject) => { this.d("Attempting to connect to:", this.url); if (this.socket?.connected) { console.log("[WebSocketService] Already connected"); resolve(); return; } if (this.isConnecting) { console.log("[WebSocketService] Connection already in progress"); reject(new Error("Connection already in progress")); return; } this.isConnecting = true; try { console.log("[WebSocketService] Creating socket.io connection..."); this.socket = io(this.url, { transports: ["websocket", "polling"], timeout: 20000, reconnection: true, reconnectionAttempts: this.maxReconnectAttempts, reconnectionDelay: this.reconnectDelay, autoConnect: false, forceNew: true, // 添加更多配置以提高连接稳定性 upgrade: true, rememberUpgrade: true, // 添加详细的连接信息 query: { clientType: "ui", timestamp: Date.now().toString(), }, }); this.setupSocketEventHandlers(); console.log("[WebSocketService] Initiating connection..."); this.socket.connect(); this.socket.on("connect", () => { console.log("[WebSocketService] WebSocket connected successfully!"); console.log("[WebSocketService] Socket ID:", this.socket?.id); console.log( "[WebSocketService] Transport:", this.socket?.io?.engine?.transport?.name, ); console.log("[WebSocketService] Connection details:", { connected: this.socket?.connected, disconnected: this.socket?.disconnected, transport: this.socket?.io?.engine?.transport?.name, }); this.isConnecting = false; this.reconnectAttempts = 0; this.startHeartbeat(); this.startConnectionCheck(); this.emitEvent("connect"); resolve(); }); this.socket.on("connect_error", (error) => { console.error( "[WebSocketService] WebSocket connection error:", error, ); console.error("[WebSocketService] Error details:", { message: error.message, description: (error as any).description, context: (error as any).context, type: (error as any).type, data: (error as any).data, }); this.isConnecting = false; this.emitEvent("connect_error", error); reject(error); }); // 添加更多调试事件 this.socket.on("disconnect", (reason) => { console.log("[WebSocketService] Disconnected:", reason); }); this.socket.on("reconnect_attempt", (attemptNumber) => { console.log("[WebSocketService] Reconnect attempt:", attemptNumber); }); this.socket.on("reconnect_error", (error) => { console.error("[WebSocketService] Reconnect error:", error); }); this.socket.on("reconnect_failed", () => { console.error("[WebSocketService] Reconnect failed"); }); // 添加连接状态事件监听 this.socket.on( "connection-stats", (data: { totalClients: number; activeRooms: string[]; timestamp: string; }) => { if (process.env.NODE_ENV === "development") { console.log(`[WebSocketService] Connection stats:`, data); } }, ); } catch (error) { console.error("[WebSocketService] Error creating socket:", error); this.isConnecting = false; reject(error); } }); } // 断开连接 disconnect(): void { console.log("[WebSocketService] 🔌 Manually disconnecting WebSocket..."); this.stopHeartbeat(); this.stopConnectionCheck(); if (this.socket) { this.socket.disconnect(); this.socket = null; } this.isConnecting = false; this.reconnectAttempts = 0; } // 开始心跳检查 private startHeartbeat(): void { this.stopHeartbeat(); // 确保没有重复的心跳 this.heartbeatInterval = setInterval(() => { if (this.socket?.connected) { console.log("[WebSocketService] 💓 Sending heartbeat ping"); this.socket.emit("ping", { timestamp: Date.now() }); } }, 30000); // 每30秒发送一次心跳 } // 停止心跳检查 private stopHeartbeat(): void { if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); this.heartbeatInterval = null; } } // 开始连接状态检查 private startConnectionCheck(): void { this.stopConnectionCheck(); // 确保没有重复的检查 this.connectionCheckInterval = setInterval(() => { if (!this.socket?.connected && !this.isConnecting) { console.log( "[WebSocketService] 🔍 Connection lost detected, attempting reconnect...", ); this.attemptReconnect(); } }, 10000); // 每10秒检查一次连接状态 } // 停止连接状态检查 private stopConnectionCheck(): void { if (this.connectionCheckInterval) { clearInterval(this.connectionCheckInterval); this.connectionCheckInterval = null; } } // 检查连接状态 isConnected(): boolean { return this.socket?.connected || false; } // 设置Socket事件处理器 private setupSocketEventHandlers(): void { if (!this.socket) return; this.d("Setting up socket event handlers"); // 连接状态事件 this.socket.on("disconnect", (reason) => { console.log("[WebSocketService] WebSocket disconnected:", reason); console.log("[WebSocketService] Disconnect details:", { reason, connected: this.socket?.connected, disconnected: this.socket?.disconnected, transport: this.socket?.io?.engine?.transport?.name, }); this.emitEvent("disconnect"); // 根据断开原因决定是否重连 if (reason === "io server disconnect") { console.log( "[WebSocketService] Server initiated disconnect, attempting reconnect...", ); this.attemptReconnect(); } else if (reason === "transport close" || reason === "transport error") { console.log( "[WebSocketService] Transport issue, attempting reconnect...", ); this.attemptReconnect(); } else { console.log( "[WebSocketService] Client initiated disconnect, no reconnect needed", ); } }); this.socket.on("reconnect", () => { console.log("WebSocket reconnected"); this.reconnectAttempts = 0; this.emitEvent("reconnect"); }); // 系统指标事件(修复事件名称不匹配问题) this.socket.on( "system-metrics-update", (data: { data: SystemMetrics; timestamp: string }) => { this.emitEvent("metrics:system", data.data); }, ); this.socket.on( "initial-system-metrics", (data: { data: SystemMetrics; timestamp: string }) => { this.emitEvent("metrics:system", data.data); }, ); // 服务器指标事件已在下方处理 // 服务器状态事件(修复事件名称不匹配问题) this.socket.on( "server-status-changed", (data: { serverId: string; status: string; timestamp: string }) => { this.emitEvent("server:status", { serverId: data.serverId, status: data.status as MCPServer["status"], }); }, ); this.socket.on( "server-health-changed", (data: { serverId: string; healthy: boolean; error?: string; timestamp: string; }) => { this.emitEvent("server:status", { serverId: data.serverId, status: data.healthy ? "running" : ("error" as MCPServer["status"]), error: data.error, }); }, ); // 服务器CRUD事件(这些可能需要后端添加支持) this.socket.on("server:created", (server: MCPServer) => { this.emitEvent("server:created", server); }); this.socket.on("server:updated", (server: MCPServer) => { this.emitEvent("server:updated", server); }); this.socket.on("server:deleted", (serverId: string) => { this.emitEvent("server:deleted", serverId); }); // 进程信息事件 this.socket.on( "process:info", (data: { serverId: string; processInfo: any }) => { this.emitEvent("process:info", data); }, ); this.socket.on( "process:logs", (data: { serverId: string; logs: any[] }) => { this.emitEvent("process:logs", data); }, ); // 移除server-metrics-update事件监听器,因为后端现在直接发送process:info事件 // 保留注释以备将来参考 // this.socket.on("server-metrics-update", (data: { // serverId: string; // data: any; // timestamp: string; // }) => { // this.d('server-metrics-update <-', data.serverId, 'ts', data.timestamp); // this.emitEvent("process:info", { // serverId: data.serverId, // processInfo: data.data // }); // }); // 订阅确认事件(修复事件名称不匹配问题) this.socket.on( "subscription-confirmed", (data: { room: string; serverId?: string; interval?: number; timestamp: string; }) => { if (this.DEBUG) console.log(`[WebSocketService] Subscription confirmed:`, data); }, ); // 取消订阅确认事件 this.socket.on( "unsubscription-confirmed", (data: { room: string; timestamp: string }) => { if (process.env.NODE_ENV === "development") { console.log(`[WebSocketService] Unsubscription confirmed:`, data); } }, ); // 日志事件(修复事件名称不匹配问题) this.socket.on( "server-log", (data: { serverId: string; level: string; message: string; source: string; timestamp: string; }) => { const logEntry: LogEntry = { id: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, timestamp: new Date(data.timestamp), level: data.level as LogEntry["level"], message: data.message, source: data.source, serverId: data.serverId, }; this.emitEvent("logs:new", logEntry); }, ); // 告警事件 this.socket.on( "alert", (data: { id: string; type: string; severity: string; serverId?: string; message: string; source?: string; timestamp: string; }) => { const logEntry: LogEntry = { id: data.id, timestamp: new Date(data.timestamp), level: data.severity as LogEntry["level"], message: data.message, source: data.source || "alert", serverId: data.serverId, }; this.emitEvent("logs:new", logEntry); }, ); // 保留原有的批量日志事件(如果后端有发送) this.socket.on("logs:batch", (entries: LogEntry[]) => { this.emitEvent("logs:batch", entries); }); } // 尝试重连 private attemptReconnect(): void { if (this.reconnectAttempts >= this.maxReconnectAttempts) { console.error("Max reconnection attempts reached"); return; } this.reconnectAttempts++; const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); // 指数退避 console.log( `Attempting to reconnect (${this.reconnectAttempts}/${this.maxReconnectAttempts}) in ${delay}ms`, ); setTimeout(() => { if (!this.isConnected()) { this.connect().catch((error) => { console.error("Reconnection failed:", error); }); } }, delay); } // 设置事件处理器 private setupEventHandlers(): void { // 初始化事件处理器映射 const eventKeys: (keyof WebSocketEvents)[] = [ "metrics:system", "metrics:server", "server:status", "server:created", "server:updated", "server:deleted", "process:info", "process:logs", "logs:new", "logs:batch", "connect", "disconnect", "reconnect", "connect_error", ]; eventKeys.forEach((event) => { this.eventHandlers.set(event, []); }); } // 注册事件监听器 on<K extends keyof WebSocketEvents>( event: K, handler: WebSocketEvents[K], ): void { const handlers = this.eventHandlers.get(event) || []; handlers.push(handler); this.eventHandlers.set(event, handlers); } // 移除事件监听器 off<K extends keyof WebSocketEvents>( event: K, handler?: WebSocketEvents[K], ): void { const handlers = this.eventHandlers.get(event) || []; if (handler) { const index = handlers.indexOf(handler); if (index > -1) { handlers.splice(index, 1); } } else { // 如果没有指定处理器,清除所有处理器 this.eventHandlers.set(event, []); } } // 触发事件 private emitEvent<K extends keyof WebSocketEvents>( event: K, ...args: Parameters<WebSocketEvents[K]> ): void { const handlers = this.eventHandlers.get(event) || []; handlers.forEach((handler) => { try { (handler as Function)(...args); } catch (error) { console.error(`Error in WebSocket event handler for ${event}:`, error); } }); } // 发送消息到服务器(增强版本) emit(event: string, data?: any): void { if (this.socket?.connected) { this.d("emit", event, data); this.socket.emit(event, data); if (event.includes("subscribe") && this.DEBUG) { setTimeout(() => { this.d("post-subscription status check for", event); this.socket?.emit("get-connection-status"); }, 3000); } } else if (this.DEBUG) { console.warn("[WebSocketService] emit while disconnected", event, data); } } // 订阅特定服务器的更新 subscribeToServer(serverId: string): void { this.emit("subscribe:server", { serverId }); } // 取消订阅特定服务器的更新 unsubscribeFromServer(serverId: string): void { this.emit("unsubscribe:server", { serverId }); } // 订阅系统指标更新 subscribeToMetrics(): void { this.emit("subscribe:metrics"); } // 取消订阅系统指标更新 unsubscribeFromMetrics(): void { this.emit("unsubscribe:metrics"); } // 订阅日志更新 subscribeToLogs(filter?: { level?: string[]; serverId?: string }): void { this.emit("subscribe:logs", filter); } // 取消订阅日志更新 unsubscribeFromLogs(): void { this.emit("unsubscribe:logs"); } // 订阅进程信息更新(强化版本) subscribeToProcessInfo(serverId: string): void { if (!this.socket?.connected) { if (this.DEBUG) console.warn( "[WebSocketService] Socket not connected, defer subscribeToProcessInfo", serverId, ); this.connect() .then(() => setTimeout(() => this.subscribeToProcessInfo(serverId), 500), ) .catch(() => {}); return; } this.emit("subscribe-server-metrics", { serverId, interval: 5000 }); } // 取消订阅进程信息更新 unsubscribeFromProcessInfo(serverId: string): void { // 兼容旧通用unsubscribe & 新事件 this.emit("unsubscribe", { room: `server-metrics-${serverId}` }); this.emit("unsubscribe-server-metrics", { serverId }); } // 订阅进程日志更新 subscribeToProcessLogs(serverId: string, level?: string): void { this.emit("subscribe-server-logs", { serverId, level }); } // 取消订阅进程日志更新 unsubscribeFromProcessLogs(serverId: string): void { this.emit("unsubscribe", { room: `server-logs-${serverId}` }); this.emit("unsubscribe-server-logs", { serverId }); } // 获取连接状态信息 getConnectionInfo(): { connected: boolean; reconnectAttempts: number; maxReconnectAttempts: number; isConnecting: boolean; } { return { connected: this.isConnected(), reconnectAttempts: this.reconnectAttempts, maxReconnectAttempts: this.maxReconnectAttempts, isConnecting: this.isConnecting, }; } } // 创建全局WebSocket服务实例 export const websocketService = new WebSocketService("/monitoring"); // 导出类型已在文件顶部定义

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/zaizaizhao/mcp-swagger-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server