Skip to main content
Glama

MCP Server.exe

by shadowcz007
mcpRouterServer.ts (14.3 kB)
import type { Implementation } from '@modelcontextprotocol/sdk/types.js'
import { McpServerComposer } from './serverComposer'
import { ExpressSSEServerTransport } from './expressSseTransport'
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'
import type { McpServerType } from './utils/schemas'
import express from 'express'
import cors from 'cors'
import type { Express } from 'express'
import { formatLog, LogCategory, LogLevel } from './utils/console'
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp'
import type { Server } from 'http'
import { ResourceTemplate } from '@modelcontextprotocol/sdk/server/mcp.js'
import { z } from 'zod'
import { fieldsToZodSchema } from './utils/xToZodSchema'
import { openCursorLink } from './utils/cursorLink'

/** Namespace used when the imported configuration does not provide one. */
const NAMESPACE_SEPARATOR = '.'

/** Everything that belongs to one SSE session, keyed by the transport's session id. */
type SseSession = {
  composer: McpServerComposer
  server: McpServer
  transport: ExpressSSEServerTransport
}

/** Returns a printable message for a caught value of unknown type. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error)
}

/**
 * Exposes composed MCP servers over either stdio or SSE (HTTP).
 *
 * - stdio mode: a single composer/server/transport triple is created in the
 *   constructor and connected by {@link McpRouterServer.start}.
 * - SSE mode: an Express app is created by `start()`. Every `GET /` builds a
 *   fresh composer/server/transport, registers it in `sseSessions`, and
 *   `POST /sessions?sessionId=...` forwards messages to that session's
 *   transport. A separate "default" composer/server is kept for callers of
 *   {@link McpRouterServer.getActiveServer}.
 */
export class McpRouterServer {
  private app!: Express
  private httpServer: Server | null = null
  private readonly transportType: 'sse' | 'stdio'
  private readonly baseServerInfo: Implementation
  /** Last configuration passed to `importMcpConfig`, re-applied to every new SSE session. */
  private parsedConfig: {
    targetServers: McpServerType[]
    toolChains: any[]
    toolsFilter: string[]
    namespace: string
    configureMcp: Function | null
  } | null = null

  private readonly sseSessions: Map<string, SseSession> = new Map()

  private stdioComposer: McpServerComposer | null = null
  private stdioServer: McpServer | null = null
  private stdioTransport: StdioServerTransport | null = null

  // Default SSE server instance, kept for scenarios such as WebSocket use.
  private defaultSseComposer: McpServerComposer | null = null
  private defaultSseServer: McpServer | null = null

  /**
   * @param serverInfo Implementation info handed to every `McpServerComposer` this router creates.
   * @param serverOptions Listening options; `transportType` defaults to `'sse'`.
   */
  constructor(
    serverInfo: Implementation,
    private readonly serverOptions: {
      port?: number
      host?: string
      transportType?: 'sse' | 'stdio'
      cursorLink?: boolean
    }
  ) {
    this.baseServerInfo = serverInfo
    this.transportType = serverOptions.transportType ?? 'sse'

    // Initialise the default server instance for the selected transport.
    if (this.transportType === 'sse') {
      this.initSseServer()
    } else if (this.transportType === 'stdio') {
      this.initStdioServer()
    }
  }

  /** Creates the default SSE composer and exposes its server as the default SSE server. */
  initSseServer() {
    this.defaultSseComposer = new McpServerComposer(this.baseServerInfo)
    this.defaultSseServer = this.defaultSseComposer.server
  }

  /** Creates the stdio composer, its server and a new stdio transport. */
  initStdioServer() {
    this.stdioComposer = new McpServerComposer(this.baseServerInfo)
    this.stdioServer = this.stdioComposer.server
    this.stdioTransport = new StdioServerTransport()
  }

  /**
   * Connects the stdio server (stdio mode) or builds the Express app and its
   * routes (SSE mode).
   *
   * @throws Error in stdio mode when the stdio server/transport no longer exist
   *   (for example after `close()`), or when connecting fails.
   */
  private async setupRoutes() {
    if (this.transportType === 'stdio') {
      formatLog(LogLevel.INFO, 'Initializing server in stdio mode...')
      if (!this.stdioServer || !this.stdioTransport) {
        throw new Error('stdio server is not initialized')
      }
      // Awaited so the "connected" log is only emitted once connect() settled
      // and so a connection failure surfaces to the caller of start().
      await this.stdioServer.connect(this.stdioTransport)
      formatLog(LogLevel.INFO, 'Server initialized and connected in stdio mode')
      return
    }

    // SSE mode
    this.app = express()

    // NOTE(review): browsers refuse credentialed requests when the allowed
    // origin is the wildcard '*'; confirm whether `credentials` is needed.
    // NOTE(review): maxAge is 186400 seconds; possibly 86400 (24h) was
    // intended - confirm.
    const corsOptions = {
      origin: '*',
      methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
      allowedHeaders: ['Content-Type', 'Authorization', 'X-Requested-With'],
      credentials: true,
      maxAge: 186400
    }

    this.app.use((req, res, next) => {
      req.setTimeout(0) // 0 means: never time out
      res.setTimeout(0) // 0 means: never time out
      next()
    })
    this.app.use(cors(corsOptions))
    this.app.use(express.json())

    this.app.get('/', (_, res, next) => {
      // Route rejections to the error middleware instead of leaving them as
      // unhandled promise rejections.
      this.openSseSession(res).catch(next)
    })

    this.app.post('/sessions', (req, res, next) => {
      // The query value may be a string, an array, an object or undefined;
      // only a plain string can identify a session.
      const rawSessionId = req.query.sessionId
      const sessionData =
        typeof rawSessionId === 'string'
          ? this.sseSessions.get(rawSessionId)
          : undefined

      if (!sessionData) {
        formatLog(
          LogLevel.INFO,
          `Invalid or unknown session ID: ${String(rawSessionId)}`
        )
        res.status(404).send('Session not found or invalid session ID')
        return
      }

      // handlePostMessage may or may not be asynchronous; normalise it so a
      // rejection reaches the error middleware.
      Promise.resolve(sessionData.transport.handlePostMessage(req, res)).catch(
        next
      )
    })

    this.app.use((_, res) => {
      res.status(404).send('Not found')
    })

    this.app.use(
      (
        err: Error,
        _req: express.Request,
        res: express.Response,
        next: express.NextFunction
      ) => {
        formatLog(
          LogLevel.ERROR,
          `Unhandled error: ${err.message} ${err.stack ?? ''}`
        )
        if (res.headersSent) {
          // A response is already in flight (e.g. an SSE stream); let Express'
          // default handler deal with the connection.
          next(err)
          return
        }
        res.status(500).send('Internal server error')
      }
    )
  }

  /**
   * Creates a composer/server/transport triple for one incoming SSE request,
   * applies the current configuration (if any), registers the session and
   * hands the response to the transport.
   */
  private async openSseSession(res: express.Response): Promise<void> {
    formatLog(LogLevel.INFO, 'New SSE connection request received.')

    const composer = new McpServerComposer(this.baseServerInfo)
    const server = composer.server
    const transport = new ExpressSSEServerTransport('/sessions')

    if (this.parsedConfig) {
      await this._applyConfigurationToComposer(
        composer,
        server,
        this.parsedConfig.configureMcp
      )
    }

    // Not awaited on purpose: `onclose` is assigned right after connect() has
    // been called, exactly as before, so it is not replaced by whatever
    // connect() installs. Failures are logged and the session is dropped.
    server.connect(transport).catch((error: unknown) => {
      formatLog(
        LogLevel.ERROR,
        `Failed to connect McpServer for session ${transport.sessionId}: ${describeError(error)}`
      )
      this.sseSessions.delete(transport.sessionId)
    })

    transport.onclose = () => this.handleSseTransportClosed(transport.sessionId)

    this.sseSessions.set(transport.sessionId, { composer, server, transport })
    formatLog(
      LogLevel.INFO,
      `Session ${transport.sessionId} opened and McpServer instance created.`
    )

    await transport.handleSSERequest(res)
  }

  /** Closes the McpServer of a session whose transport closed and forgets the session. */
  private handleSseTransportClosed(sessionId: string): void {
    formatLog(LogLevel.INFO, `SSE transport for session ${sessionId} closed.`)

    const sessionData = this.sseSessions.get(sessionId)
    if (!sessionData) {
      formatLog(
        LogLevel.INFO,
        `onclose called for session ${sessionId}, but session not found in map.`
      )
      return
    }

    formatLog(
      LogLevel.INFO,
      `Closing McpServer and cleaning up resources for session ${sessionId}.`
    )
    // Remove the entry first so that an `onclose` fired again while the server
    // is closing finds no session and does not close it a second time.
    this.sseSessions.delete(sessionId)
    sessionData.server.close().catch((error: unknown) => {
      formatLog(
        LogLevel.ERROR,
        `Error closing McpServer for session ${sessionId}: ${describeError(error)}`
      )
    })
    formatLog(LogLevel.INFO, `Session ${sessionId} fully cleaned up.`)
  }

  /**
   * Converts the `mcpServers` section of a configuration object into target
   * server descriptors. An entry with a `url` becomes an `'sse'` target with
   * empty params; any other entry becomes a `'stdio'` target whose params are a
   * shallow copy of the entry.
   *
   * @param config Configuration object; a missing `mcpServers` yields `[]`.
   * @returns One descriptor per key of `config.mcpServers`.
   */
  parseConfig(config: any) {
    const mcpServers = config?.mcpServers || {}
    const targetServers: McpServerType[] = []

    for (const [serverName, rawServerConfig] of Object.entries<any>(
      mcpServers
    )) {
      // Tolerate `null`/`undefined` entries instead of throwing on `.url`.
      const serverConfig = rawServerConfig ?? {}
      const targetServer: McpServerType = {
        name: serverName,
        type: serverConfig.url ? 'sse' : 'stdio',
        url: serverConfig.url,
        params: serverConfig.url ? {} : { ...serverConfig }
      }
      targetServers.push(targetServer)
    }

    return targetServers
  }

  /**
   * Applies `this.parsedConfig` to one composer/server pair: sets the
   * namespace, runs the optional `configureMcp` hook, adds every target
   * server, composes the tool chains and finally applies the tools filter
   * (tool chains are always re-enabled).
   *
   * @param composer Composer that receives the namespace, targets and tool chains.
   * @param server Server passed to `configureMcp` and whose registered tools are filtered.
   * @param configureMcp Optional hook called as `configureMcp(server, ResourceTemplate, z)`.
   */
  private async _applyConfigurationToComposer(
    composer: McpServerComposer,
    server: McpServer,
    configureMcp: Function | null
  ) {
    if (!this.parsedConfig) {
      formatLog(
        LogLevel.DEBUG,
        'No parsed config available to apply to composer.'
      )
      return
    }

    const { namespace, targetServers, toolsFilter } = this.parsedConfig
    // Normalised once so both tool-chain passes below agree on malformed input.
    const toolChains = Array.isArray(this.parsedConfig.toolChains)
      ? this.parsedConfig.toolChains
      : []

    composer.namespace = namespace

    if (typeof configureMcp === 'function') {
      // Expose the schema helper to the hook through the `z` object it receives.
      // @ts-ignore - `_fieldsToZodSchema` is not part of zod's typings
      z._fieldsToZodSchema = fieldsToZodSchema
      // Awaited so an asynchronous hook finishes (and can fail visibly) before
      // the target servers are added; a synchronous hook is unaffected.
      await configureMcp(server, ResourceTemplate, z)
    }

    // Targets are added one after another, in configuration order.
    for (const targetServer of targetServers) {
      const mcpClientConfig =
        targetServer.type === 'sse'
          ? {
              name: targetServer.name,
              type: 'sse',
              url: new URL(targetServer.url!),
              params: targetServer.params,
              tools: targetServer.tools
            }
          : {
              name: targetServer.name,
              type: 'stdio',
              params: targetServer.params,
              tools: targetServer.tools
            }

      await composer.add(mcpClientConfig, {
        name:
          targetServer.name ??
          (targetServer.url
            ? new URL(targetServer.url).hostname
            : 'stdio-server'),
        version: targetServer.version ?? '1.0.0',
        description: targetServer.description ?? ''
      })
    }

    for (const toolChain of toolChains) {
      composer.composeToolChain(toolChain)
    }

    // Reaches into the server's registered-tools table to toggle tools.
    const registeredTools = server['_registeredTools']

    if (toolsFilter.length > 0) {
      for (const name in registeredTools) {
        if (!toolsFilter.includes(name)) {
          ;(registeredTools[name] as any).disable()
        }
      }
    }

    // Tool chains stay enabled even when they are not listed in the filter.
    for (const toolChain of toolChains) {
      if (registeredTools[toolChain.name]) {
        ;(registeredTools[toolChain.name] as any).enable()
      }
    }
  }

  /**
   * @returns The stdio server in stdio mode, or the default SSE server in SSE mode.
   * @throws Error when the matching server instance does not exist (e.g. after `close()`).
   */
  public getActiveServer(): McpServer {
    if (this.transportType === 'stdio' && this.stdioServer) {
      return this.stdioServer
    }
    if (this.transportType === 'sse' && this.defaultSseServer) {
      return this.defaultSseServer
    }
    throw new Error('No active server available')
  }

  /**
   * Parses `config`, stores it as the current configuration and applies it to
   * the already existing default SSE server or stdio server. SSE sessions
   * opened afterwards pick the stored configuration up on creation.
   *
   * @param config Object with optional `mcpServers`, `toolChains`, `namespace` and `tools`.
   * @param configureMcp Optional hook run against every configured server.
   */
  async importMcpConfig(config: any, configureMcp: Function | null) {
    const targetServers = this.parseConfig(config)
    const toolChains = config?.toolChains || []
    // `?.` keeps a null/undefined config from throwing, like the other fields.
    const namespace = config?.namespace || NAMESPACE_SEPARATOR
    const toolsFilter = config?.tools || []

    this.parsedConfig = {
      targetServers,
      toolChains,
      toolsFilter,
      namespace,
      configureMcp
    }

    // Apply the configuration to the default SSE server.
    if (
      this.transportType === 'sse' &&
      this.defaultSseComposer &&
      this.defaultSseServer
    ) {
      await this._applyConfigurationToComposer(
        this.defaultSseComposer,
        this.defaultSseServer,
        this.parsedConfig.configureMcp
      )
    }

    // Apply the configuration to the stdio server.
    if (
      this.transportType === 'stdio' &&
      this.stdioComposer &&
      this.stdioServer
    ) {
      formatLog(
        LogLevel.INFO,
        'Applying new configuration to existing stdio server instance.'
      )
      await this._applyConfigurationToComposer(
        this.stdioComposer,
        this.stdioServer,
        this.parsedConfig.configureMcp
      )
    }
  }

  /**
   * Starts the router. In stdio mode this connects the stdio server. In SSE
   * mode it starts the HTTP server on `serverOptions.port` (default 3003) and
   * `serverOptions.host` (default `'0.0.0.0'`).
   *
   * The returned promise resolves once the HTTP server is listening and rejects
   * when it emits an error before that (for example when the port is in use).
   * Errors emitted after a successful start are logged only.
   */
  async start() {
    await this.setupRoutes()

    if (this.transportType === 'stdio') {
      formatLog(LogLevel.INFO, 'Server running in stdio mode.')
      return
    }

    const port = this.serverOptions.port ?? 3003
    const host = this.serverOptions.host ?? '0.0.0.0'
    const conceptualServerName = this.baseServerInfo.name || 'mcpSessionServer'

    await new Promise<void>((resolve, reject) => {
      const httpServer = this.app.listen(port, host, () => {
        try {
          // Show a loopback address when bound to all interfaces.
          const hostAddress = host === '0.0.0.0' ? '127.0.0.1' : host
          const serverUrl = `http://${hostAddress}:${port}`
          const serverEntry = { url: serverUrl, type: 'sse' }

          const mcpConfigDisplay: {
            mcpServers: Record<string, { url: string; type: string }>
            routerConfiguration?: { namespace: string }
          } = {
            mcpServers: { [conceptualServerName]: serverEntry }
          }
          if (this.parsedConfig) {
            mcpConfigDisplay.routerConfiguration = {
              namespace: this.parsedConfig.namespace
            }
          }

          formatLog(
            LogLevel.INFO,
            `Conceptual MCP Server Config (new instance per SSE connection): `
          )
          formatLog(
            LogLevel.OUTPUT,
            JSON.stringify(mcpConfigDisplay, null, 2),
            LogCategory.SYSTEM
          )

          if (this.serverOptions.cursorLink) {
            // Open the Cursor link for this server entry.
            openCursorLink(conceptualServerName, serverEntry)
          }

          // NOTE(review): the line break inside this template literal is part
          // of the emitted message and is kept as found - confirm it is intended.
          formatLog(
            LogLevel.INFO,
            `MCP Router (SSE) listening on ${serverUrl}. Send GET to / for new session, POST to /sessions?sessionId=... 
for messages.\n\n`
          )
          resolve()
        } catch (error) {
          reject(error)
        }
      })

      this.httpServer = httpServer
      httpServer.on('error', error => {
        formatLog(LogLevel.ERROR, `HTTP server error: ${error.message}`)
        // Rejects start() for startup failures; a no-op once start() resolved.
        reject(error)
      })
    })
  }

  /**
   * Shuts the router down: closes the default SSE server and every SSE
   * session (SSE mode), the stdio transport and server (stdio mode), and the
   * HTTP server if one was started. Failures of individual MCP servers are
   * logged and do not abort the shutdown.
   *
   * @throws The error reported by the HTTP server when closing it fails.
   */
  async close(): Promise<void> {
    try {
      formatLog(LogLevel.INFO, 'Shutting down McpRouterServer...')

      if (this.transportType === 'sse') {
        // Close the default SSE server.
        if (this.defaultSseServer) {
          try {
            await this.defaultSseServer.close()
            this.defaultSseServer = null
            this.defaultSseComposer = null
          } catch (error: unknown) {
            formatLog(
              LogLevel.ERROR,
              `Error closing default SSE server: ${describeError(error)}`
            )
          }
        }

        // Close all session servers in parallel; each failure is logged on its own.
        const sseServerClosePromises = Array.from(
          this.sseSessions.values()
        ).map(async sessionData => {
          try {
            formatLog(
              LogLevel.INFO,
              `Closing McpServer for session ${sessionData.transport.sessionId}...`
            )
            await sessionData.server.close()
          } catch (error: unknown) {
            formatLog(
              LogLevel.ERROR,
              `Error closing McpServer for session ${sessionData.transport.sessionId}: ${describeError(error)}`
            )
          }
        })
        await Promise.all(sseServerClosePromises)

        if (this.sseSessions.size > 0) {
          // NOTE(review): the line break inside this template literal is part
          // of the emitted message and is kept as found - confirm it is intended.
          formatLog(
            LogLevel.INFO,
            `${this.sseSessions.size} SSE sessions still in map after close attempts. 
Forcibly clearing.`
          )
          this.sseSessions.clear()
        }
      }

      if (this.transportType === 'stdio' && this.stdioServer) {
        formatLog(LogLevel.INFO, 'Closing stdio McpServer...')
        try {
          try {
            await this.stdioTransport?.close()
          } finally {
            // Still close the server when closing the transport failed.
            await this.stdioServer.close()
          }
        } catch (error: unknown) {
          formatLog(
            LogLevel.ERROR,
            `Error closing stdio McpServer: ${describeError(error)}`
          )
        }
        this.stdioServer = null
        this.stdioComposer = null
        this.stdioTransport = null
      }

      const httpServer = this.httpServer
      if (httpServer) {
        formatLog(LogLevel.INFO, 'Closing HTTP server...')
        await new Promise<void>((resolve, reject) => {
          httpServer.close(err => {
            if (err) {
              formatLog(
                LogLevel.ERROR,
                `Error closing HTTP server: ${err.message}`
              )
              reject(err)
              return
            }
            formatLog(LogLevel.INFO, 'HTTP server closed successfully.')
            this.httpServer = null
            resolve()
          })
        })
      }

      formatLog(LogLevel.INFO, 'McpRouterServer shut down completely.')
    } catch (error: unknown) {
      formatLog(
        LogLevel.ERROR,
        `Critical error during McpRouterServer shutdown: ${describeError(error)}`
      )
      throw error
    }
  }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shadowcz007/mcp_server_exe'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.