Skip to main content
Glama

Office MCP Server

by walkingzzzy
StreamingService.ts12 kB
import { Response } from 'express'; import { WebSocket } from 'ws'; import OpenAIService from './OpenAIService'; import ClaudeService from './ClaudeService'; import ToolExecutionEngine from './ToolExecutionEngine'; import ConversationManager from './ConversationManager'; import logger from '../utils/logger'; export interface StreamingOptions { conversationId: string; model: 'openai' | 'claude'; enableTools?: boolean; onProgress?: (progress: number) => void; onToolCall?: (toolName: string, args: any) => void; } export class StreamingService { private openaiService: OpenAIService; private claudeService: ClaudeService; private toolEngine: ToolExecutionEngine; private conversationManager: ConversationManager; constructor( openaiService: OpenAIService, claudeService: ClaudeService, toolEngine: ToolExecutionEngine, conversationManager: ConversationManager ) { this.openaiService = openaiService; this.claudeService = claudeService; this.toolEngine = toolEngine; this.conversationManager = conversationManager; logger.info('流式响应服务初始化完成'); } /** * SSE流式响应 */ async streamToSSE( res: Response, userMessage: string, options: StreamingOptions ): Promise<void> { // 设置SSE头 res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'Cache-Control' }); try { // 添加用户消息到对话 this.conversationManager.addMessage(options.conversationId, 'user', userMessage); // 发送开始事件 this.sendSSEEvent(res, 'start', { message: '开始处理请求' }); if (options.model === 'openai') { await this.streamOpenAIToSSE(res, options); } else { await this.streamClaudeToSSE(res, options); } // 发送完成事件 this.sendSSEEvent(res, 'done', { message: '处理完成' }); } catch (error: any) { logger.error('SSE流式响应失败', error); this.sendSSEEvent(res, 'error', { error: error.message }); } finally { res.end(); } } /** * WebSocket流式响应 */ async streamToWebSocket( ws: WebSocket, userMessage: string, options: StreamingOptions ): Promise<void> { try { // 添加用户消息到对话 this.conversationManager.addMessage(options.conversationId, 'user', userMessage); // 发送开始消息 this.sendWebSocketMessage(ws, 'status', { status: 'processing', message: '开始处理请求' }); if (options.model === 'openai') { await this.streamOpenAIToWebSocket(ws, options); } else { await this.streamClaudeToWebSocket(ws, options); } // 发送完成消息 this.sendWebSocketMessage(ws, 'status', { status: 'completed', message: '处理完成' }); } catch (error: any) { logger.error('WebSocket流式响应失败', error); this.sendWebSocketMessage(ws, 'error', { error: error.message }); } } /** * OpenAI流式响应到SSE */ private async streamOpenAIToSSE(res: Response, options: StreamingOptions): Promise<void> { const messages = this.conversationManager.getOpenAIMessages(options.conversationId); const functions = options.enableTools ? this.toolEngine.getAvailableTools() : undefined; let assistantMessage = ''; let messageId = ''; try { const stream = this.openaiService.streamChat(messages, functions as any); for await (const chunk of stream) { if (!messageId) { messageId = this.generateMessageId(); this.sendSSEEvent(res, 'message_start', { messageId }); } assistantMessage += chunk; this.sendSSEEvent(res, 'content_delta', { messageId, delta: chunk, content: assistantMessage }); } // 保存助手消息 if (assistantMessage) { this.conversationManager.addMessage( options.conversationId, 'assistant', assistantMessage ); } } catch (error: any) { throw new Error(`OpenAI流式响应失败: ${error.message}`); } } /** * Claude流式响应到SSE */ private async streamClaudeToSSE(res: Response, options: StreamingOptions): Promise<void> { const messages = this.conversationManager.getClaudeMessages(options.conversationId); const tools = options.enableTools ? this.toolEngine.getAvailableTools() : undefined; let assistantMessage = ''; let messageId = ''; try { const stream = this.claudeService.streamMessage(messages, tools as any); for await (const chunk of stream) { if (!messageId) { messageId = this.generateMessageId(); this.sendSSEEvent(res, 'message_start', { messageId }); } assistantMessage += chunk; this.sendSSEEvent(res, 'content_delta', { messageId, delta: chunk, content: assistantMessage }); } // 保存助手消息 if (assistantMessage) { this.conversationManager.addMessage( options.conversationId, 'assistant', assistantMessage ); } } catch (error: any) { throw new Error(`Claude流式响应失败: ${error.message}`); } } /** * OpenAI流式响应到WebSocket */ private async streamOpenAIToWebSocket(ws: WebSocket, options: StreamingOptions): Promise<void> { const messages = this.conversationManager.getOpenAIMessages(options.conversationId); const functions = options.enableTools ? this.toolEngine.getAvailableTools() : undefined; let assistantMessage = ''; let messageId = this.generateMessageId(); try { this.sendWebSocketMessage(ws, 'message', { id: messageId, content: '', isComplete: false }); const stream = this.openaiService.streamChat(messages, functions as any); for await (const chunk of stream) { assistantMessage += chunk; this.sendWebSocketMessage(ws, 'message', { id: messageId, content: chunk, isComplete: false }); } // 发送完成标记 this.sendWebSocketMessage(ws, 'message', { id: messageId, content: '', isComplete: true }); // 保存助手消息 if (assistantMessage) { this.conversationManager.addMessage( options.conversationId, 'assistant', assistantMessage ); } } catch (error: any) { throw new Error(`OpenAI WebSocket流式响应失败: ${error.message}`); } } /** * Claude流式响应到WebSocket */ private async streamClaudeToWebSocket(ws: WebSocket, options: StreamingOptions): Promise<void> { const messages = this.conversationManager.getClaudeMessages(options.conversationId); const tools = options.enableTools ? this.toolEngine.getAvailableTools() : undefined; let assistantMessage = ''; let messageId = this.generateMessageId(); try { this.sendWebSocketMessage(ws, 'message', { id: messageId, content: '', isComplete: false }); const stream = this.claudeService.streamMessage(messages, tools as any); for await (const chunk of stream) { assistantMessage += chunk; this.sendWebSocketMessage(ws, 'message', { id: messageId, content: chunk, isComplete: false }); } // 发送完成标记 this.sendWebSocketMessage(ws, 'message', { id: messageId, content: '', isComplete: true }); // 保存助手消息 if (assistantMessage) { this.conversationManager.addMessage( options.conversationId, 'assistant', assistantMessage ); } } catch (error: any) { throw new Error(`Claude WebSocket流式响应失败: ${error.message}`); } } /** * 发送SSE事件 */ private sendSSEEvent(res: Response, event: string, data: any): void { res.write(`event: ${event}\n`); res.write(`data: ${JSON.stringify(data)}\n\n`); } /** * 发送WebSocket消息 */ private sendWebSocketMessage(ws: WebSocket, type: string, data: any): void { if (ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify({ type, data, timestamp: Date.now() })); } } /** * 生成消息ID */ private generateMessageId(): string { return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; } /** * 处理工具调用流式响应 */ async streamWithTools( ws: WebSocket, userMessage: string, options: StreamingOptions ): Promise<void> { try { // 添加用户消息 this.conversationManager.addMessage(options.conversationId, 'user', userMessage); const messages = options.model === 'openai' ? this.conversationManager.getOpenAIMessages(options.conversationId) : this.conversationManager.getClaudeMessages(options.conversationId); // 发送进度 this.sendWebSocketMessage(ws, 'progress', { progress: 10, status: '分析请求' }); if (options.model === 'openai') { const functions = this.toolEngine.getAvailableTools(); const response = await this.openaiService.callFunction(messages as any, functions as any); if (response.functionCall) { // 发送工具调用进度 this.sendWebSocketMessage(ws, 'progress', { progress: 50, status: '执行工具' }); options.onToolCall?.(response.functionCall.name, response.functionCall.arguments); // 执行工具 const toolResult = await this.toolEngine.executeTool( response.functionCall.name, response.functionCall.arguments, { conversationId: options.conversationId } ); // 发送工具结果 this.sendWebSocketMessage(ws, 'tool_result', { toolName: response.functionCall.name, result: toolResult }); // 继续对话 this.conversationManager.addMessage( options.conversationId, 'function', JSON.stringify(toolResult.result), { functionCall: response.functionCall } ); // 获取最终响应 const finalMessages = this.conversationManager.getOpenAIMessages(options.conversationId); await this.streamOpenAIToWebSocket(ws, { ...options, enableTools: false }); } else { // 直接流式响应 await this.streamOpenAIToWebSocket(ws, options); } } else { // Claude工具使用 const tools = this.toolEngine.getAvailableTools(); const response = await this.claudeService.useTools(messages as any, tools as any); if (response.toolUse && response.toolUse.length > 0) { for (const tool of response.toolUse) { this.sendWebSocketMessage(ws, 'progress', { progress: 50, status: `执行工具: ${tool.name}` }); options.onToolCall?.(tool.name, tool.input); const toolResult = await this.toolEngine.executeTool( tool.name, tool.input, { conversationId: options.conversationId } ); this.sendWebSocketMessage(ws, 'tool_result', { toolName: tool.name, result: toolResult }); } } await this.streamClaudeToWebSocket(ws, options); } this.sendWebSocketMessage(ws, 'progress', { progress: 100, status: '完成' }); } catch (error: any) { logger.error('工具调用流式响应失败', error); this.sendWebSocketMessage(ws, 'error', { error: error.message }); } } } export default StreamingService;

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/walkingzzzy/office-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server