Skip to main content
Glama
server.ts30.6 kB
import express, { Application, Request, Response, NextFunction } from 'express'; import cors from 'cors'; import helmet from 'helmet'; import rateLimit from 'express-rate-limit'; import http from 'http'; import { WebSocketServer, WebSocket } from 'ws'; import { MemAgent } from '@core/brain/memAgent/index.js'; import { logger } from '@core/logger/index.js'; import { errorResponse, successResponse, ERROR_CODES } from './utils/response.js'; import { requestIdMiddleware, requestLoggingMiddleware, errorLoggingMiddleware, } from './middleware/logging.js'; import { Server as McpServer } from '@modelcontextprotocol/sdk/server/index.js'; import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'; import { initializeMcpServer, initializeAgentCardResource } from '@app/mcp/mcp_handler.js'; // Import WebSocket components import { WebSocketConnectionManager } from './websocket/connection-manager.js'; import { WebSocketMessageRouter } from './websocket/message-router.js'; import { WebSocketEventSubscriber } from './websocket/event-subscriber.js'; import { WebSocketMessage, WebSocketConfig } from './websocket/types.js'; // Import route handlers import { createMessageRoutes } from './routes/message.js'; import { createSessionRoutes } from './routes/session.js'; import { createMcpRoutes } from './routes/mcp.js'; import { createConfigRoutes } from './routes/config.js'; import { createLlmRoutes } from './routes/llm.js'; import { createSearchRoutes } from './routes/search.js'; import { createWebhookRoutes } from './routes/webhook.js'; export interface ApiServerConfig { port: number; host?: string; corsOrigins?: string[]; rateLimitWindowMs?: number; rateLimitMaxRequests?: number; mcpTransportType?: 'stdio' | 'sse' | 'http'; mcpPort?: number; // WebSocket configuration enableWebSocket?: boolean; webSocketConfig?: WebSocketConfig; // API prefix configuration apiPrefix?: string; } export class ApiServer { private app: Application; private agent: MemAgent; private config: ApiServerConfig; private apiPrefix: string; private mcpServer?: McpServer; private activeMcpSseTransports: Map<string, SSEServerTransport> = new Map(); // WebSocket components private httpServer?: http.Server; private wss?: WebSocketServer; private wsConnectionManager?: WebSocketConnectionManager; private wsMessageRouter?: WebSocketMessageRouter; private wsEventSubscriber?: WebSocketEventSubscriber; private heartbeatInterval?: NodeJS.Timeout; constructor(agent: MemAgent, config: ApiServerConfig) { this.agent = agent; this.config = config; // Validate and set API prefix this.apiPrefix = this.validateAndNormalizeApiPrefix(config.apiPrefix); this.app = express(); this.setupMiddleware(); this.setupRoutes(); this.setupErrorHandling(); // Note: MCP setup is now handled in start() method to properly handle async operations } /** * Validate and normalize API prefix configuration */ private validateAndNormalizeApiPrefix(prefix?: string): string { // Default to '/api' for backward compatibility if (prefix === undefined) { return '/api'; } // Allow empty string to disable prefix if (prefix === '') { return ''; } // Validate prefix format if (typeof prefix !== 'string') { throw new Error('API prefix must be a string'); } // Ensure prefix starts with '/' if not empty if (!prefix.startsWith('/')) { prefix = '/' + prefix; } // Remove trailing slash to normalize if (prefix.endsWith('/') && prefix !== '/') { prefix = prefix.slice(0, -1); } logger.info(`[API Server] Using API prefix: '${prefix || '(none)'}'`); return prefix; } /** * Helper method to construct API route paths */ private buildApiRoute(route: string): string { if (!this.apiPrefix || this.apiPrefix === '') { return route; } return `${this.apiPrefix}${route}`; } /** * Helper method to construct full path including proxy context path * Used for SSE transport endpoint configuration when behind reverse proxy */ private buildFullPath(req: Request, path: string): string { const contextPath = (req as any).contextPath || ''; const fullPath = contextPath + this.buildApiRoute(path); logger.debug('[API Server] Built full path', { path, contextPath, apiPrefix: this.apiPrefix, fullPath, }); return fullPath; } private async setupMcpServer( transportType: 'stdio' | 'sse' | 'http', _port?: number ): Promise<void> { logger.info(`[API Server] Setting up MCP server with transport type: ${transportType}`); try { // Initialize agent card data const agentCard = this.agent.getEffectiveConfig().agentCard; const agentCardInput = agentCard ? Object.fromEntries(Object.entries(agentCard).filter(([, value]) => value !== undefined)) : {}; const agentCardData = initializeAgentCardResource(agentCardInput); // Check MCP_SERVER_MODE environment variable to determine server type const mcpServerMode = process.env.MCP_SERVER_MODE || 'default'; logger.info(`[API Server] MCP server mode: ${mcpServerMode}`); // Load aggregator configuration if needed let aggregatorConfig: any = undefined; if (mcpServerMode === 'aggregator') { aggregatorConfig = await this.loadAggregatorConfig(); } // Initialize MCP server instance with appropriate mode this.mcpServer = await initializeMcpServer( this.agent, agentCardData, mcpServerMode as 'default' | 'aggregator', aggregatorConfig ); if (transportType === 'sse') { this.setupMcpSseRoutes(); // Renamed for clarity as it sets up both GET and POST } else if (transportType === 'stdio') { // For stdio, we need to explicitly connect the server to a StdioServerTransport // This usually means the process runs as a standalone MCP server, not integrated into Express // This case is typically handled by the CLI directly, not via ApiServer logger.warn( `[API Server] MCP transport type 'stdio' is typically handled by the CLI directly and not integrated into the API server.` ); // If a StdioServerTransport needs to be managed by ApiServer, its lifecycle would be handled here. // For now, we assume 'stdio' mode implies a different execution path. } else { logger.warn( `[API Server] MCP transport type '${transportType}' not fully implemented for API server integration yet.` ); } } catch (error) { logger.error( `[API Server] Failed to set up MCP server: ${error instanceof Error ? error.message : String(error)}` ); } } /** * Load aggregator configuration from environment variables * Aggregator mode now uses agent's unifiedToolManager which automatically includes MCP servers from cipher.yml */ private async loadAggregatorConfig(): Promise<any> { const defaultConfig = { type: 'aggregator' as const, servers: {}, // No longer needed - using unifiedToolManager conflictResolution: (process.env.AGGREGATOR_CONFLICT_RESOLUTION as any) || 'prefix', autoDiscovery: false, timeout: parseInt(process.env.AGGREGATOR_TIMEOUT || '60000'), connectionMode: 'lenient' as const, }; logger.info('[API Server] Using aggregator configuration with env vars', { conflictResolution: defaultConfig.conflictResolution, timeout: defaultConfig.timeout, }); return defaultConfig; } private setupMcpSseRoutes(): void { if (!this.mcpServer) { logger.error('[API Server] MCP Server not initialized for SSE route setup.'); return; } // Handle SSE GET endpoint (for client to establish SSE connection) this.app.get(this.buildApiRoute('/mcp/sse'), (req: Request, res: Response) => { logger.info('[API Server] New MCP SSE client attempting connection.'); logger.debug('[API Server] SSE Request Headers:', req.headers); logger.debug('[API Server] SSE Request URL:', req.url); logger.debug('[API Server] SSE Request Original URL:', req.originalUrl); try { // Build the POST endpoint path including context path from proxy const postEndpoint = this.buildFullPath(req, '/mcp'); logger.info(`[API Server] Creating SSE transport with POST endpoint: ${postEndpoint}`); // Create SSE transport instance with the full path const sseTransport = new SSEServerTransport(postEndpoint, res); // Log the session ID for debugging logger.info(`[API Server] SSE session created with ID: ${sseTransport.sessionId}`); logger.info( `[API Server] Client should POST to: ${postEndpoint}?sessionId=${sseTransport.sessionId}` ); // Connect MCP server to this SSE transport (this will call sseTransport.start() and set headers) this.mcpServer?.connect(sseTransport); logger.debug('[API Server] MCP server connected to SSE transport'); // Store the transport keyed by its session ID this.activeMcpSseTransports.set(sseTransport.sessionId, sseTransport); logger.info( `[API Server] MCP SSE client connected with session ID: ${sseTransport.sessionId}` ); logger.debug( `[API Server] Active SSE transports count: ${this.activeMcpSseTransports.size}` ); // Handle client disconnect req.on('close', () => { logger.info( `[API Server] MCP SSE client with session ID ${sseTransport.sessionId} disconnected.` ); logger.debug('[API Server] Cleaning up SSE transport and connection'); sseTransport.close(); // Close the transport when client disconnects this.activeMcpSseTransports.delete(sseTransport.sessionId); // Remove from active transports logger.debug( `[API Server] Active SSE transports count after cleanup: ${this.activeMcpSseTransports.size}` ); }); // Handle transport errors sseTransport.onerror = (error: unknown) => { logger.error('[API Server] SSE transport error:', error); this.activeMcpSseTransports.delete(sseTransport.sessionId); }; } catch (error) { logger.error('[API Server] Error setting up SSE transport:', error); if (!res.headersSent) { res.status(500).json({ error: 'Failed to establish SSE connection', message: error instanceof Error ? error.message : String(error), }); } } }); // Handle POST requests for MCP messages over HTTP (part of Streamable HTTP) this.app.post(this.buildApiRoute('/mcp'), async (req: Request, res: Response) => { logger.debug('[API Server] MCP POST request received'); logger.debug('[API Server] POST Request Headers:', req.headers); logger.debug('[API Server] POST Request Body:', req.body); logger.debug('[API Server] POST Request Query:', req.query); logger.debug('[API Server] POST Request Original URL:', req.originalUrl); if (!this.mcpServer) { logger.error('[API Server] MCP Server not initialized for POST route.'); return errorResponse(res, ERROR_CODES.INTERNAL_ERROR, 'MCP Server not ready', 500); } // Try multiple ways to get session ID (query param, header, body) const sessionId = (req.query.sessionId as string) || (req.headers['x-session-id'] as string) || (req.body.sessionId as string); if (!sessionId) { logger.error('[API Server] MCP POST request received without session ID.'); logger.debug('[API Server] Available query parameters:', Object.keys(req.query)); logger.debug('[API Server] Available headers:', Object.keys(req.headers)); logger.debug( '[API Server] Active sessions:', Array.from(this.activeMcpSseTransports.keys()) ); return errorResponse( res, ERROR_CODES.BAD_REQUEST, 'Missing sessionId in query parameters, headers, or body. Session ID is required for security.', 400 ); } logger.debug(`[API Server] Looking for SSE transport with session ID: ${sessionId}`); logger.debug( `[API Server] Available session IDs: ${Array.from(this.activeMcpSseTransports.keys()).join(', ')}` ); const sseTransport = this.activeMcpSseTransports.get(sessionId); if (!sseTransport) { logger.error(`[API Server] No active MCP SSE transport found for session ID: ${sessionId}`); return errorResponse( res, ERROR_CODES.NOT_FOUND, `No active session found for ID: ${sessionId}`, 404 ); } try { logger.debug( `[API Server] Delegating POST message to SSE transport for session: ${sessionId}` ); // Delegate handling of the POST message to the specific SSEServerTransport instance // Pass req.body as the third parameter since Express has already parsed it await sseTransport.handlePostMessage(req, res, req.body); logger.debug(`[API Server] POST message handled successfully for session: ${sessionId}`); } catch (error) { logger.error( `[API Server] Error handling MCP POST request for session ${sessionId}: ${error instanceof Error ? error.message : String(error)}` ); logger.debug( `[API Server] POST error stack:`, error instanceof Error ? error.stack : 'No stack available' ); errorResponse( res, ERROR_CODES.INTERNAL_ERROR, 'Error processing MCP request', 500, error instanceof Error ? error.stack : undefined ); } }); const mcpSseRoute = this.buildApiRoute('/mcp/sse'); const mcpPostRoute = this.buildApiRoute('/mcp'); logger.info( `[API Server] MCP SSE (GET ${mcpSseRoute}) and POST (${mcpPostRoute}?sessionId=...) routes registered.` ); } /** * Set up WebSocket server and event handling */ private setupWebSocket(): void { if (!this.config.enableWebSocket || !this.httpServer) { logger.debug('[API Server] WebSocket disabled or HTTP server not available'); return; } const wsConfig = this.config.webSocketConfig || {}; // Create WebSocket server this.wss = new WebSocketServer({ server: this.httpServer, path: wsConfig.path || '/ws', ...(wsConfig.maxConnections && { maxClients: wsConfig.maxConnections }), ...(wsConfig.enableCompression !== undefined && { perMessageDeflate: wsConfig.enableCompression, }), }); // Initialize WebSocket components this.wsConnectionManager = new WebSocketConnectionManager( wsConfig.maxConnections || 1000, wsConfig.connectionTimeout || 300000 ); this.wsMessageRouter = new WebSocketMessageRouter(this.agent, this.wsConnectionManager); this.wsEventSubscriber = new WebSocketEventSubscriber( this.wsConnectionManager, this.agent.services.eventManager ); // Wire up the connection manager to notify the event subscriber this.wsConnectionManager.setEventSubscriber(this.wsEventSubscriber); // Set up WebSocket connection handler this.wss.on('connection', (ws: WebSocket, request) => { this.handleWebSocketConnection(ws, request); }); // Start event subscription this.wsEventSubscriber.subscribe(); // Set up heartbeat if configured if (wsConfig.heartbeatInterval && wsConfig.heartbeatInterval > 0) { this.heartbeatInterval = setInterval(() => { this.wsConnectionManager?.sendHeartbeat(); }, wsConfig.heartbeatInterval); } // Set up graceful shutdown handling process.on('SIGTERM', () => { this.shutdownWebSocket(); }); process.on('SIGINT', () => { this.shutdownWebSocket(); }); logger.info('[API Server] WebSocket server initialized', { path: wsConfig.path || '/ws', maxConnections: wsConfig.maxConnections || 1000, compression: wsConfig.enableCompression !== false, heartbeat: wsConfig.heartbeatInterval || 'disabled', }); } /** * Handle new WebSocket connection */ private handleWebSocketConnection(ws: WebSocket, request: http.IncomingMessage): void { try { // Extract session ID from query parameters if provided const url = new URL(request.url || '', `http://${request.headers.host}`); const sessionId = url.searchParams.get('sessionId') || undefined; // Add connection to manager const connectionId = this.wsConnectionManager!.addConnection(ws, sessionId); logger.info('[API Server] New WebSocket connection established', { connectionId, sessionId, origin: request.headers.origin, userAgent: request.headers['user-agent'], }); // Set up message handler ws.on('message', async (data: Buffer) => { try { const message = JSON.parse(data.toString()) as WebSocketMessage; await this.wsMessageRouter!.routeMessage(ws, connectionId, message); } catch (error) { logger.error('[API Server] Error parsing WebSocket message', { connectionId, error: error instanceof Error ? error.message : String(error), rawData: data.toString().substring(0, 200), // Log first 200 chars }); // Send error response if (ws.readyState === WebSocket.OPEN) { ws.send( JSON.stringify({ event: 'error', error: 'Invalid message format', data: { message: 'Failed to parse JSON message', code: 'INVALID_JSON', }, timestamp: Date.now(), }) ); } } }); // Send welcome message if (ws.readyState === WebSocket.OPEN) { ws.send( JSON.stringify({ event: 'connected', data: { connectionId, sessionId, serverVersion: process.env.npm_package_version || 'unknown', capabilities: ['streaming', 'tools', 'memory', 'reset'], }, timestamp: Date.now(), }) ); } } catch (error) { logger.error('[API Server] Error handling WebSocket connection', { error: error instanceof Error ? error.message : String(error), origin: request.headers.origin, }); if (ws.readyState === WebSocket.OPEN) { ws.close(1011, 'Server error during connection setup'); } } } /** * Shutdown WebSocket server gracefully */ private shutdownWebSocket(): void { if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); } if (this.wsEventSubscriber) { this.wsEventSubscriber.dispose(); } if (this.wsConnectionManager) { this.wsConnectionManager.dispose(); } if (this.wss) { this.wss.close(() => { logger.info('[API Server] WebSocket server closed'); }); } } private setupMiddleware(): void { // Enable trust proxy for reverse proxy support this.app.set('trust proxy', true); // Parse X-Forwarded-Prefix for context path support this.app.use((req: Request, res: Response, next: NextFunction) => { // Get the prefix from X-Forwarded-Prefix header or environment variable const forwardedPrefix = req.headers['x-forwarded-prefix'] as string; const envPrefix = process.env.PROXY_CONTEXT_PATH; const contextPath = forwardedPrefix || envPrefix || ''; // Store context path on request for later use (req as any).contextPath = contextPath; logger.debug('[API Server] Request context', { originalUrl: req.originalUrl, contextPath, forwardedPrefix, forwardedProto: req.headers['x-forwarded-proto'], forwardedHost: req.headers['x-forwarded-host'], }); next(); }); // Security middleware this.app.use( helmet({ contentSecurityPolicy: false, // Disable CSP for API crossOriginEmbedderPolicy: false, }) ); // CORS configuration - enhanced for reverse proxy support this.app.use( cors({ origin: (origin, callback) => { const allowedOrigins = this.config.corsOrigins || ['http://localhost:3000']; // Allow requests with no origin (e.g., mobile apps, curl, Postman) if (!origin) { callback(null, true); return; } // Check if origin is in the allowed list if (allowedOrigins.includes(origin)) { callback(null, true); return; } // Additional lenient check for development environments only // Allow localhost and 127.0.0.1 with any port in development if (process.env.NODE_ENV !== 'production') { const originUrl = new URL(origin); if ( originUrl.hostname === 'localhost' || originUrl.hostname === '127.0.0.1' || originUrl.hostname === '::1' ) { callback(null, true); return; } } // Reject all other origins callback(new Error('Not allowed by CORS')); }, methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'], allowedHeaders: ['Content-Type', 'Authorization', 'X-Request-ID', 'X-Session-ID'], credentials: true, }) ); // Rate limiting const limiter = rateLimit({ windowMs: this.config.rateLimitWindowMs || 15 * 60 * 1000, // 15 minutes max: this.config.rateLimitMaxRequests || 100, // limit each IP to 100 requests per windowMs message: { success: false, error: { code: ERROR_CODES.RATE_LIMIT_EXCEEDED, message: 'Too many requests from this IP, please try again later.', }, }, standardHeaders: true, legacyHeaders: false, }); // Apply rate limiting to API routes if prefix is configured if (this.apiPrefix) { this.app.use(`${this.apiPrefix}/`, limiter); } // Body parsing middleware this.app.use(express.json({ limit: '10mb' })); // Support for image data this.app.use(express.urlencoded({ extended: true })); // Custom middleware this.app.use(requestIdMiddleware); this.app.use(requestLoggingMiddleware); } private setupRoutes(): void { // Health check endpoint this.app.get('/health', (_req: Request, res: Response) => { const healthData: any = { status: 'healthy', timestamp: new Date().toISOString(), uptime: process.uptime(), version: process.env.npm_package_version || 'unknown', }; // Add WebSocket health if enabled if (this.config.enableWebSocket) { healthData.websocket = { enabled: true, active: this.isWebSocketActive(), stats: this.getWebSocketStats(), }; } res.json(healthData); }); // WebSocket stats endpoint this.app.get('/ws/stats', (_req: Request, res: Response) => { if (!this.config.enableWebSocket) { return res.status(404).json({ success: false, error: { code: 'WEBSOCKET_DISABLED', message: 'WebSocket is not enabled', }, }); } const stats = this.getWebSocketStats(); return res.json({ success: true, data: { enabled: true, active: this.isWebSocketActive(), ...stats, }, }); }); // API routes this.app.use(this.buildApiRoute('/message'), createMessageRoutes(this.agent)); this.app.use(this.buildApiRoute('/sessions'), createSessionRoutes(this.agent)); this.app.use(this.buildApiRoute('/mcp'), createMcpRoutes(this.agent)); this.app.use(this.buildApiRoute('/llm'), createLlmRoutes(this.agent)); this.app.use(this.buildApiRoute('/config'), createConfigRoutes(this.agent)); this.app.use(this.buildApiRoute('/search'), createSearchRoutes(this.agent)); this.app.use(this.buildApiRoute('/webhooks'), createWebhookRoutes(this.agent)); // Legacy endpoint for MCP server connection this.app.post(this.buildApiRoute('/connect-server'), (req: Request, res: Response) => { // Forward to MCP routes req.url = '/servers'; createMcpRoutes(this.agent)(req, res, () => {}); }); // Chrome DevTools compatibility endpoint (prevents 404 errors in console) this.app.get( '/.well-known/appspecific/com.chrome.devtools.json', (req: Request, res: Response) => { res.status(204).end(); // No Content - indicates no DevTools integration available } ); // A2A (Agent-to-Agent) discovery endpoint this.app.get('/.well-known/agent.json', (req: Request, res: Response) => { try { const agentCard = { name: 'Cipher Agent', description: 'Memory-powered AI agent framework with real-time communication', version: process.env.npm_package_version || '1.0.0', capabilities: ['conversation', 'memory', 'tools', 'mcp', 'websocket', 'streaming'], endpoints: { base: `${req.protocol}://${req.get('host')}`, api: `${req.protocol}://${req.get('host')}${this.apiPrefix || ''}`, websocket: `ws://${req.get('host')}/ws`, health: `${req.protocol}://${req.get('host')}/health`, }, contact: { support: 'https://github.com/byterover/cipher', }, protocols: ['http', 'websocket', 'mcp'], timestamp: new Date().toISOString(), }; res.json(agentCard); } catch (error) { logger.error('Failed to generate agent card', { requestId: req.requestId, error: error instanceof Error ? error.message : String(error), }); errorResponse( res, ERROR_CODES.INTERNAL_ERROR, 'Failed to generate agent discovery data', 500, undefined, req.requestId ); } }); // Global reset endpoint this.app.post(this.buildApiRoute('/reset'), async (req: Request, res: Response) => { try { const { sessionId } = req.body; logger.info('Processing global reset request', { requestId: req.requestId, sessionId: sessionId || 'all', }); if (sessionId) { // Reset specific session const success = await this.agent.removeSession(sessionId); if (!success) { return errorResponse( res, ERROR_CODES.SESSION_NOT_FOUND, `Session ${sessionId} not found`, 404, undefined, req.requestId ); } } else { // Reset all sessions const sessionIds = await this.agent.listSessions(); for (const id of sessionIds) { await this.agent.removeSession(id); } } successResponse( res, { message: sessionId ? `Session ${sessionId} reset` : 'All sessions reset', timestamp: new Date().toISOString(), }, 200, req.requestId ); } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error); logger.error('Global reset failed', { requestId: req.requestId, error: errorMsg, }); errorResponse( res, ERROR_CODES.INTERNAL_ERROR, `Reset failed: ${errorMsg}`, 500, process.env.NODE_ENV === 'development' ? error : undefined, req.requestId ); } }); // Note: 404 handler moved to setup404Handler() and called after MCP routes setup } private setup404Handler(): void { // 404 handler for unknown routes - must be registered AFTER all other routes this.app.use((req: Request, res: Response) => { errorResponse( res, ERROR_CODES.NOT_FOUND, `Route ${req.method} ${req.originalUrl} not found`, 404, undefined, req.requestId ); }); } private setupErrorHandling(): void { // Error logging middleware this.app.use(errorLoggingMiddleware); // Global error handler this.app.use((err: Error, req: Request, res: Response, next: NextFunction) => { // If response already sent, delegate to default Express error handler if (res.headersSent) { return next(err); } // Determine error type and status code let statusCode = 500; let errorCode: string = ERROR_CODES.INTERNAL_ERROR; if (err.name === 'ValidationError') { statusCode = 400; errorCode = ERROR_CODES.VALIDATION_ERROR; } else if (err.name === 'UnauthorizedError') { statusCode = 401; errorCode = ERROR_CODES.UNAUTHORIZED; } errorResponse( res, errorCode, err.message || 'An unexpected error occurred', statusCode, process.env.NODE_ENV === 'development' ? err.stack : undefined, req.requestId ); }); } public async start(): Promise<void> { // Set up MCP server BEFORE starting HTTP server if transport type is provided if (this.config.mcpTransportType) { try { await this.setupMcpServer(this.config.mcpTransportType, this.config.mcpPort); logger.info(`[API Server] MCP server setup completed successfully`); } catch (error) { logger.error( `[API Server] Failed to setup MCP server: ${error instanceof Error ? error.message : String(error)}` ); throw error; } } // Set up 404 handler AFTER all routes (including MCP) are registered this.setup404Handler(); return new Promise((resolve, reject) => { try { // Create HTTP server from Express app this.httpServer = http.createServer(this.app); // Set up WebSocket server if enabled if (this.config.enableWebSocket) { this.setupWebSocket(); } this.httpServer.listen(this.config.port, this.config.host || 'localhost', () => { logger.info( `API Server started on ${this.config.host || 'localhost'}:${this.config.port}`, null, 'green' ); if (this.config.mcpTransportType) { const mcpSseEndpoint = this.buildApiRoute('/mcp/sse'); const mcpEndpoint = this.buildApiRoute('/mcp'); logger.info( `[API Server] MCP SSE endpoints available at ${mcpSseEndpoint} and ${mcpEndpoint}`, null, 'green' ); } if (this.config.enableWebSocket) { const wsPath = this.config.webSocketConfig?.path || '/ws'; logger.info( `[API Server] WebSocket server available at ws://${this.config.host || 'localhost'}:${this.config.port}${wsPath}`, null, 'green' ); } resolve(); }); this.httpServer.on('error', err => { const errorMessage = err.message || err.toString() || 'Unknown error'; logger.error('Failed to start API server:', errorMessage); logger.error('Error details:', err); reject(err); }); // Graceful shutdown process.on('SIGTERM', () => { logger.info('SIGTERM received, shutting down API server gracefully'); this.shutdownWebSocket(); this.httpServer?.close(() => { logger.info('API server stopped'); process.exit(0); }); }); process.on('SIGINT', () => { logger.info('SIGINT received, shutting down API server gracefully'); this.shutdownWebSocket(); this.httpServer?.close(() => { logger.info('API server stopped'); process.exit(0); }); }); } catch (error) { reject(error); } }); } public getApp(): Application { return this.app; } /** * Get WebSocket statistics */ public getWebSocketStats(): any { if (!this.wsConnectionManager || !this.wsEventSubscriber) { return null; } return { connections: this.wsConnectionManager.getStats(), events: this.wsEventSubscriber.getStats(), router: this.wsMessageRouter?.getStats(), }; } /** * Send system message to all WebSocket connections */ public broadcastSystemMessage( message: string, level: 'info' | 'warning' | 'error' = 'info' ): void { if (this.wsEventSubscriber) { this.wsEventSubscriber.sendSystemMessage(message, level); } } /** * Check if WebSocket is enabled and active */ public isWebSocketActive(): boolean { return !!(this.config.enableWebSocket && this.wss && this.wsEventSubscriber?.isActive()); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/campfirein/cipher'

If you have feedback or need assistance with the MCP directory API, please join our Discord server