Skip to main content
Glama
evalstate

Hugging Face MCP Server

by evalstate
sse-transport.ts9.4 kB
import { StatefulTransport, type TransportOptions, type BaseSession } from './base-transport.js'; import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'; import { logger } from '../utils/logger.js'; import type { Request, Response } from 'express'; import { JsonRpcErrors, extractJsonRpcId } from './json-rpc-errors.js'; import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; import { extractQueryParamsToHeaders } from '../utils/query-params.js'; import { logSystemEvent } from '../utils/query-logger.js'; interface SSEConnection extends BaseSession<SSEServerTransport> { cleanup: () => Promise<void>; } export class SseTransport extends StatefulTransport<SSEConnection> { async initialize(_options: TransportOptions): Promise<void> { // SSE endpoint for client connections this.app.get('/sse', (req: Request, res: Response) => { this.trackRequest(); void this.handleSseConnection(req, res); }); // Handle messages for all SSE sessions this.app.post('/message', (req: Request, res: Response) => { this.trackRequest(); void this.handleSseMessage(req, res); }); this.startStaleConnectionCheck(); this.startPingKeepAlive(); logger.info( { heartbeatInterval: this.HEARTBEAT_INTERVAL, staleCheckInterval: this.STALE_CHECK_INTERVAL, staleTimeout: this.STALE_TIMEOUT, pingEnabled: this.PING_ENABLED, pingInterval: this.PING_INTERVAL, }, 'SSE transport routes initialized' ); return Promise.resolve(); } private async handleSseConnection(req: Request, res: Response): Promise<void> { try { // Reject new connections during shutdown if (this.isShuttingDown) { logger.warn('Rejecting SSE connection during shutdown'); this.trackError(503); res.status(503).json(JsonRpcErrors.serverShuttingDown()); return; } const existingSessionId = req.query.sessionId as string | undefined; // Handle reconnection attempts if (existingSessionId) { const existing = this.sessions.get(existingSessionId); if (existing) { logger.warn( { sessionId: existingSessionId, age: Date.now() - existing.metadata.connectedAt.getTime(), }, 'Client attempting to reconnect with existing sessionId' ); // Clean up old connection before creating new one await this.closeConnection(existingSessionId); } else { // Session not found - track failed resumption this.metrics.trackSessionResumeFailed(); logger.debug({ sessionId: existingSessionId }, 'Session not found for reconnection'); } } // Create server instance using factory with request headers and query params const headers = req.headers as Record<string, string>; extractQueryParamsToHeaders(req, headers); // Extract IP address for tracking const ipAddress = this.extractIpAddress(req.headers, req.ip); this.trackIpAddress(ipAddress); // Validate auth and track metrics const authResult = await this.validateAuthAndTrackMetrics(headers); if (!authResult.shouldContinue) { this.trackError(authResult.statusCode); res.status(authResult.statusCode || 401).send('Unauthorized'); return; } // Create new transport const transport = new SSEServerTransport('/message', res); const sessionId = transport.sessionId; // Create server with session info (client info will be updated after initialization) const isAuthenticated = authResult.shouldContinue && !!headers['authorization']; const result = await this.serverFactory(headers, undefined, false, { clientSessionId: sessionId, isAuthenticated, }); const server = result.server; logger.info({ sessionId }, 'New SSE connection established'); // Log system initialize event const requestBody = req.body as { params?: { capabilities?: unknown } } | undefined; logSystemEvent('initialize', sessionId, { clientSessionId: sessionId, isAuthenticated, requestJson: req.body ?? {}, capabilities: requestBody?.params?.capabilities, ipAddress, }); // Create comprehensive cleanup function const cleanup = this.createCleanupFunction(sessionId); // Store connection with metadata const authToken = headers['authorization']?.replace(/^Bearer\s+/i, ''); const connection: SSEConnection = { transport, server, cleanup, metadata: { id: sessionId, connectedAt: new Date(), lastActivity: new Date(), requestCount: 0, isAuthenticated: authResult.shouldContinue && !!headers['authorization'], capabilities: {}, ipAddress, authToken, }, cleaningUp: false, }; this.sessions.set(sessionId, connection); // Track the session creation for metrics this.trackSessionCreated(sessionId); // Set up heartbeat and connection event handlers this.startHeartbeat(sessionId, res); this.setupSseEventHandlers(sessionId, res); // Connect to server with proper cleanup handling await this.connectWithCleanup(transport, server, sessionId, cleanup); logger.debug({ sessionId }, 'SSE transport fully initialized'); } catch (error) { logger.error({ error }, 'Error establishing SSE connection'); this.trackError(500, error instanceof Error ? error : new Error(String(error))); if (!res.headersSent) { res.status(500).json(JsonRpcErrors.internalError(null, 'Internal server error establishing SSE connection')); } } } private async handleSseMessage(req: Request, res: Response): Promise<void> { const trackingName = this.extractMethodForTracking(req.body); try { const sessionId = req.query.sessionId as string; if (!sessionId) { logger.warn('SSE message received without sessionId'); this.trackError(400); // Track method call without timing (stateful mode measures HTTP dispatch time, not MCP processing time) this.metrics.trackMethod(trackingName, undefined, true); res.status(400).json(JsonRpcErrors.invalidParams('sessionId is required', extractJsonRpcId(req.body))); return; } const connection = this.sessions.get(sessionId); if (!connection) { logger.warn({ sessionId }, 'SSE message for unknown session'); this.trackError(404); this.metrics.trackMethod(trackingName, undefined, true); res.status(404).json(JsonRpcErrors.sessionNotFound(sessionId, extractJsonRpcId(req.body))); return; } // Update last activity using base class helper this.updateSessionActivity(sessionId); // Increment request count only for actual method calls (not responses or pings) if (trackingName) { const session = this.sessions.get(sessionId); if (session) { session.metadata.requestCount++; } } // Handle message with the transport await connection.transport.handlePostMessage(req, res, req.body); // Track successful method call without timing (stateful mode measures HTTP dispatch time, not MCP processing time) this.metrics.trackMethod(trackingName, undefined, false); logger.debug({ sessionId }, 'SSE message handled successfully'); } catch (error) { logger.error({ error }, 'Error handling SSE message'); this.trackError(500, error instanceof Error ? error : new Error(String(error))); this.metrics.trackMethod(trackingName, undefined, true); if (!res.headersSent) { res .status(500) .json(JsonRpcErrors.internalError(extractJsonRpcId(req.body), 'Internal server error handling SSE message')); } } } private createCleanupFunction(sessionId: string): () => Promise<void> { return () => this.cleanupSession(sessionId); } private async connectWithCleanup( transport: SSEServerTransport, server: McpServer, sessionId: string, cleanup: () => Promise<void> ): Promise<void> { try { // Set up standard server configuration this.setupServerForSession(server, sessionId); await server.connect(transport); } catch (error) { logger.error({ error, sessionId }, 'Failed to connect transport to server'); this.trackError(500, error instanceof Error ? error : new Error(String(error))); await cleanup(); throw error; } } /** * Remove a stale session - implementation for StatefulTransport */ protected async removeStaleSession(sessionId: string): Promise<void> { logger.info({ sessionId }, 'Removing stale SSE connection'); await this.cleanupSession(sessionId); // Log system session delete event logSystemEvent('session_delete', sessionId, { clientSessionId: sessionId, }); } async cleanup(): Promise<void> { logger.info( { activeConnections: this.sessions.size, }, 'Starting SSE transport cleanup' ); // Stop stale checker using base class helper this.stopStaleConnectionCheck(); // Use base class cleanup method await this.cleanupAllSessions(); logger.info('SSE transport cleanup completed'); } /** * Force close a specific connection */ async closeConnection(sessionId: string): Promise<boolean> { const session = this.sessions.get(sessionId); if (!session) { logger.debug({ sessionId }, 'Attempted to close non-existent connection'); return false; } try { await this.cleanupSession(sessionId); logSystemEvent('session_delete', sessionId, { clientSessionId: sessionId, }); return true; } catch (error) { logger.error({ error, sessionId }, 'Error closing connection'); return false; } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/evalstate/hf-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server