Skip to main content
Glama

Carbon Voice

by PhononX
streamable.ts14.9 kB
#!/usr/bin/env node import fs from 'fs'; import path from 'path'; import cors from 'cors'; import express, { NextFunction, Response } from 'express'; import helmet from 'helmet'; import serveFavicon from 'serve-favicon'; import { requireBearerAuth } from '@modelcontextprotocol/sdk/server/auth/middleware/bearerAuth.js'; import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; import { isInitializeRequest, LATEST_PROTOCOL_VERSION, } from '@modelcontextprotocol/sdk/types.js'; import { addMcpSessionId, addRequestIdMiddleware, authErrorLogger, logRequest, oauthAuthorizationServer, oauthProtectedResource, rateLimitMiddleware, wellKnownCorsHeaders, } from '.'; import { REQUIRED_SCOPES } from './constants'; import { ApiHealthStatus } from './interfaces'; import { Session, SessionCleanupService, SessionLogger, sessionService, } from './session'; import { SessionConfig } from './session/session.config'; import { getOrCreateSessionId } from './utils'; import { createOAuthTokenVerifier } from '../../auth'; import { AuthenticatedRequest } from '../../auth/interfaces'; import { env, isTestEnvironment } from '../../config'; import { getCarbonVoiceApiStatus } from '../../cv-api'; import server from '../../server'; import { getProcessUptime, logger } from '../../utils'; const app = express(); app.set('x-powered-by', false); // Trust proxy for rate limiting - only trust localhost and private networks app.set('trust proxy', ['loopback', 'linklocal', 'uniquelocal']); // Serve favicon const faviconPath = path.join(process.cwd(), 'public', 'favicon.ico'); if (fs.existsSync(faviconPath)) { app.use(serveFavicon(faviconPath)); } // app.use(standardHeaders); app.use(cors()); // Security middlewares app.use(helmet()); app.use(rateLimitMiddleware); app.use(express.json({ limit: '1mb' })); // Add request ID middleware app.use(addRequestIdMiddleware); // Request logging middleware (includes session metrics) app.use(logRequest); let carbonVoiceApiHealth: ApiHealthStatus = { isHealthy: isTestEnvironment() ? true : false, // Assume healthy in test mode lastChecked: new Date().toISOString(), apiUrl: env.CARBON_VOICE_BASE_URL, }; app.get('/health', (req, res: Response) => { const response = { status: carbonVoiceApiHealth.isHealthy ? 'healthy' : 'degraded', timestamp: new Date().toISOString(), uptime: getProcessUptime(), dependencies: { carbonVoiceApi: { status: carbonVoiceApiHealth.isHealthy ? 'healthy' : 'unhealthy', lastChecked: carbonVoiceApiHealth.lastChecked, ...(carbonVoiceApiHealth.error && { error: carbonVoiceApiHealth.error, }), }, }, }; const logLevel = carbonVoiceApiHealth.isHealthy ? 'info' : 'warn'; logger[logLevel]('Health check', response); const statusCode = carbonVoiceApiHealth.isHealthy ? 200 : 503; res.status(statusCode).json(response); }); app.get('/info', (req, res: Response) => { logger.info('Info', { ENV_VARS: { CARBON_VOICE_BASE_URL: env.CARBON_VOICE_BASE_URL, LOG_LEVEL: env.LOG_LEVEL, PORT: env.PORT, ENVIRONMENT: env.ENVIRONMENT, LOG_TRANSPORT: env.LOG_TRANSPORT, }, }); res.status(200).send('OK'); }); // Protected resource metadata (OAuth 2.1 RFC9728 compliant) app.get( '/.well-known/oauth-protected-resource', wellKnownCorsHeaders, oauthProtectedResource, ); app.get( '/.well-known/oauth-authorization-server', wellKnownCorsHeaders, oauthAuthorizationServer, ); // Handle HEAD requests without auth app.head('/', (req, res) => { const mcpProtocolVersion = req.headers['mcp-protocol-version'] || LATEST_PROTOCOL_VERSION; res .status(200) .set('Content-Type', 'application/json') .set('MCP-Protocol-Version', mcpProtocolVersion) .end(); }); async function handleSessionRequest( req: AuthenticatedRequest, res: Response, session?: Session, ) { try { const sessionId = getOrCreateSessionId(req); if (!session) { logger.warn('Session not found for request', { sessionId, }); res.status(404).json({ jsonrpc: '2.0', error: { code: 404, message: 'Session not found. Please reinitialize.', }, id: null, }); return; } // Record tool call for metrics only when actually executing tools if (req.body?.method === 'tools/call') { sessionService.recordToolCall(sessionId); } await session.transport.handleRequest(req, res, req.body); if (req.method === 'DELETE') { sessionService.destroySession(sessionId); } } catch (err) { const sessionId = getOrCreateSessionId(req); // Record error in session metrics if (sessionId) { sessionService.recordError(sessionId); } logger.error('❌ Error in handleSessionRequest', { error: { message: err instanceof Error ? err.message : String(err), stack: err instanceof Error ? err.stack : undefined, }, method: req.method, url: req.url, sessionId, }); if (!res.headersSent) { res.status(500).json({ jsonrpc: '2.0', error: { code: -32603, message: 'Internal server error', }, id: null, }); } } } app.post( '/', authErrorLogger, requireBearerAuth({ verifier: createOAuthTokenVerifier(), requiredScopes: REQUIRED_SCOPES, resourceMetadataUrl: 'mcp', // FIXME: Add dynamic resource! }), addMcpSessionId, async (req: AuthenticatedRequest, res: Response, next: NextFunction) => { try { const sessionId = getOrCreateSessionId(req); const session = sessionService.getSession(sessionId); // Reuse existing session if (session) { // Check if session is expired if (sessionService.isSessionExpired(sessionId)) { logger.warn('Session expired, destroying and creating new one', { sessionId, }); sessionService.destroySession(sessionId); } else { // Record interaction and reuse session // Don't record interaction for tool calls as they'll be recorded separately if (req.body?.method !== 'tools/call') { sessionService.recordInteraction(sessionId); } await handleSessionRequest(req, res, session); return; } } // Create New Session if (isInitializeRequest(req.body)) { const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => getOrCreateSessionId(req), onsessioninitialized: (sessionId: string) => { logger.info('✅ New session initialized'); try { sessionService.createSession(transport!, req, sessionId); } catch (error) { logger.error('❌ Failed to create session', { error: error instanceof Error ? error.message : String(error), }); // Close transport if session creation fails transport.close(); } }, enableJsonResponse: true, }); transport.onclose = () => { logger.debug('🔌 Transport closed'); if (transport.sessionId) { sessionService.destroySession(transport.sessionId); } }; // Add error handling for the transport transport.onerror = (error) => { logger.error('🔴 Transport error', { error: { message: error.message, name: error.name, }, }); // Record error in session metrics if (transport.sessionId) { sessionService.recordError(transport.sessionId); } }; await server.connect(transport); await transport.handleRequest(req, res, req.body); return; } // No session ID and no initialize request logger.warn('❌ Session ID not found', { sessionId, userId: req.auth?.extra?.user?.id, headers: req.headers, }); res.status(404).json({ jsonrpc: '2.0', error: { code: 404, message: 'Session ID not found. Please reinitialize.', }, id: null, }); } catch (error) { logger.error('❌ Error in POST handler', { error: { message: error instanceof Error ? error.message : String(error), stack: error instanceof Error ? error.stack : undefined, }, method: req.method, url: req.url, sessionId: getOrCreateSessionId(req), hasInitializeRequest: isInitializeRequest(req.body), userId: req.auth?.extra?.user?.id, }); next(error); } }, ); // GET/DELETE : server-to-client (SSE) and session termination app.get( '/', authErrorLogger, requireBearerAuth({ verifier: createOAuthTokenVerifier(), requiredScopes: REQUIRED_SCOPES, resourceMetadataUrl: 'mcp', // FIXME: Add dynamic resource! }), addMcpSessionId, handleSessionRequestGetDelete, ); app.delete( '/', authErrorLogger, requireBearerAuth({ verifier: createOAuthTokenVerifier(), requiredScopes: REQUIRED_SCOPES, resourceMetadataUrl: 'mcp', // FIXME: Add dynamic resource! }), addMcpSessionId, handleSessionRequestGetDelete, ); // GET/DELETE : server-to-client (SSE) and session termination async function handleSessionRequestGetDelete( req: AuthenticatedRequest, res: Response, ) { try { const sessionId = getOrCreateSessionId(req); const session = sessionService.getSession(sessionId); if (!session) { logger.warn('Invalid or missing session ID', { sessionId }); res.status(404).json({ jsonrpc: '2.0', error: { code: 404, message: 'Session not found. Please reinitialize.', }, id: null, }); return; } // Check if session is expired if (sessionService.isSessionExpired(sessionId)) { logger.warn('Session expired, destroying', { sessionId }); sessionService.destroySession(sessionId); res.status(404).json({ jsonrpc: '2.0', error: { code: 404, message: 'Session expired. Please reinitialize.', }, id: null, }); return; } // Record interaction for metrics // Don't record interaction for tool calls as they'll be recorded separately if (req.body?.method !== 'tools/call') { sessionService.recordInteraction(sessionId); } await session.transport.handleRequest(req, res, req.body); if (req.method === 'DELETE') { sessionService.destroySession(sessionId); } } catch (err) { const sessionId = getOrCreateSessionId(req); // Record error in session metrics if (sessionId) { sessionService.recordError(sessionId); } logger.error('❌ Error in handleSessionRequestGetDelete', { error: { message: err instanceof Error ? err.message : String(err), stack: err instanceof Error ? err.stack : undefined, }, method: req.method, url: req.url, sessionId, }); if (!res.headersSent) { res.status(500).json({ jsonrpc: '2.0', error: { code: -32603, message: 'Internal server error', }, id: null, }); } } } let heartbeatInterval: NodeJS.Timeout | undefined; function shutdown() { logger.info('🛑 Shutting down HTTP MCP Server...'); // Clear the heartbeat interval if it exists if (heartbeatInterval) { clearInterval(heartbeatInterval); } // Clean up all sessions using the enhanced service const totalSessions = sessionService.getSessionCount(); logger.info('Cleaning up sessions before shutdown', { totalSessions }); sessionService.clearAllSessions(); process.exit(0); } process.on('SIGINT', shutdown); process.on('SIGTERM', shutdown); process.once('SIGUSR2', () => { logger.info('SIGUSR2 received, shutting down...'); shutdown(); process.kill(process.pid, 'SIGUSR2'); }); // Only start the server and heartbeat when not in test environment if (!isTestEnvironment()) { // Start server const PORT = env.PORT || 3005; const serverInstance = app.listen(PORT, async () => { // Initialize Carbon Voice API health status const status = await getCarbonVoiceApiStatus(); carbonVoiceApiHealth = { ...status, lastChecked: new Date().toISOString(), }; // Initialize and start session cleanup service const sessionConfig = SessionConfig.fromEnv(); const sessionLogger = new SessionLogger(); const sessionCleanupService = new SessionCleanupService( sessionService, sessionConfig, sessionLogger, ); sessionCleanupService.start(); logger.info('🚀 HTTP MCP Server started', { port: PORT, carbonVoiceApiHealth, sessionConfig: { ttlMs: sessionConfig.ttlMs, maxSessions: sessionConfig.maxSessions, cleanupIntervalMs: sessionConfig.cleanupIntervalMs, }, }); }); serverInstance.on('error', (error) => { logger.error('❌ Error on MCP HTTP Server', { error }); }); // Add additional process-level monitoring process.on('uncaughtException', (error) => { logger.error('🚨 Uncaught Exception detected', { error: error.message, stack: error.stack, name: error.name, processId: process.pid, totalSessions: sessionService.getAllSessionIds().length, }); }); process.on('unhandledRejection', (reason, promise) => { logger.error('🚨 Unhandled Promise Rejection detected', { reason: reason instanceof Error ? reason.message : String(reason), stack: reason instanceof Error ? reason.stack : undefined, promise: promise.toString(), processId: process.pid, totalSessions: sessionService.getAllSessionIds().length, }); }); // Log every 30 seconds to show the server is alive heartbeatInterval = setInterval(async () => { const status = await getCarbonVoiceApiStatus(); // Update Carbon Voice API health cache carbonVoiceApiHealth = { isHealthy: status.isHealthy, apiUrl: status.apiUrl, lastChecked: new Date().toISOString(), ...(status.error && { error: status.error }), }; const icon = carbonVoiceApiHealth.isHealthy ? '🟢' : '🔴'; const logArgs = { totalSessions: sessionService.getAllSessionIds().length, isCarbonVoiceApiWorking: carbonVoiceApiHealth.isHealthy, uptime: getProcessUptime(), memoryUsage: process.memoryUsage(), carbonVoiceApiHealth, }; logger.info(`💓 Server heartbeat ${icon}`, logArgs); }, 30000); } export default app;

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/PhononX/cv-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server