Skip to main content
Glama
server.ts59.9 kB
/** * Tally MCP Server * * Main server class that extends the MCP SDK Server with SSE transport support * for managing Tally.so forms through natural language commands. */ import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { ListToolsRequestSchema, CallToolRequestSchema, ServerCapabilities as MCPServerCapabilities } from '@modelcontextprotocol/sdk/types.js'; import express, { Express, Request, Response } from 'express'; import * as os from 'os'; import { TemplateTool, WorkspaceManagementTool, FormCreationTool, FormModificationTool, FormPermissionManager, FormRetrievalTool, FormSharingTool, SubmissionAnalysisTool, DiagnosticTool, } from './tools'; import { TallyApiClientConfig } from './services/TallyApiClient'; import { EventEmitter } from 'events'; import { env } from './config/env'; import { ServerCapabilities, ClientCapabilities, NegotiatedCapabilities } from './types/capabilities'; import { Registry, Gauge, /* Counter, */ collectDefaultMetrics } from 'prom-client'; import { MonitoringService } from './types/monitoring'; import { MonitoringServiceImpl } from './services/MonitoringService'; import { Logger } from './utils/logger'; import { SentryService } from './services/SentryService'; import { TallyApiClient } from './services/TallyApiClient'; /** * Log levels enumeration */ export enum LogLevel { FATAL = 0, ERROR = 1, WARN = 2, INFO = 3, DEBUG = 4, TRACE = 5, } /** * Structured log entry interface */ export interface LogEntry { timestamp: string; level: LogLevel; levelName: string; component: string; message: string; requestId?: string; userId?: string; correlationId?: string; error?: { name: string; message: string; stack?: string; code?: string; }; context?: Record<string, any>; duration?: number; metadata?: Record<string, any>; } /** * Error category enumeration for classification */ export enum ErrorCategory { VALIDATION = 'validation', AUTHENTICATION = 'authentication', AUTHORIZATION = 'authorization', NETWORK = 'network', TIMEOUT = 'timeout', RATE_LIMIT = 'rate_limit', INTERNAL = 'internal', EXTERNAL = 'external', CONFIGURATION = 'configuration', RESOURCE = 'resource', } /** * Structured error interface for consistent error handling */ export interface StructuredError extends Error { category: ErrorCategory; code: string; statusCode: number; isOperational: boolean; context?: Record<string, any>; requestId?: string; correlationId?: string; } /** * Logger configuration interface */ export interface LoggerConfig { level: LogLevel; component: string; enableConsole: boolean; enableStructured: boolean; redactSensitive: boolean; sensitiveFields: string[]; } /** * Default logger configuration */ export const DEFAULT_LOGGER_CONFIG: LoggerConfig = { level: LogLevel.INFO, component: 'MCPServer', enableConsole: true, enableStructured: true, redactSensitive: true, sensitiveFields: ['password', 'token', 'key', 'secret', 'authorization', 'cookie'], }; /** * Configuration options for the MCP server */ export interface MCPServerConfig { /** Port number for the HTTP server */ port: number; /** Host address to bind to */ host: string; /** Enable/disable CORS */ cors: boolean; /** Request timeout in milliseconds */ requestTimeout: number; /** Maximum number of concurrent SSE connections */ maxConnections: number; /** Enable debug logging */ debug: boolean; /** Logger configuration */ logger?: Partial<LoggerConfig>; /** Server capabilities */ capabilities?: MCPServerCapabilities; /** Tools available on the server */ tools?: any; /** Tally API Key */ tallyApiKey?: string; /** Enable Real Tally API */ enableRealTallyAPI?: boolean; } /** * Default server configuration */ export const DEFAULT_CONFIG: MCPServerConfig = { port: 3000, host: '0.0.0.0', cors: true, requestTimeout: 30000, // 30 seconds maxConnections: 100, debug: false, logger: DEFAULT_LOGGER_CONFIG, }; /** * Server state enumeration */ export enum ServerState { STOPPED = 'stopped', STARTING = 'starting', RUNNING = 'running', STOPPING = 'stopping', ERROR = 'error', } /** * Health metrics interface for comprehensive monitoring */ export interface HealthMetrics { /** Server uptime in milliseconds */ uptime: number; /** Current server state */ status: ServerState; /** Number of active connections */ connections: number; /** Memory usage statistics */ memory: { used: number; total: number; percentage: number; heapUsed: number; heapTotal: number; external: number; rss: number; }; /** CPU usage information */ cpu: { loadAverage: number[]; usage: number; }; /** System information */ system: { platform: string; arch: string; nodeVersion: string; freeMemory: number; totalMemory: number; }; /** Request statistics */ requests: { total: number; rate: number; // requests per minute errors: number; errorRate: number; // errors per minute }; /** Health status assessment */ healthy: boolean; /** Timestamp of the health check */ timestamp: string; } /** * Health threshold configuration */ export interface HealthThresholds { /** Maximum memory usage percentage before unhealthy */ maxMemoryPercent: number; /** Maximum CPU load average before unhealthy */ maxLoadAverage: number; /** Maximum error rate before unhealthy */ maxErrorRate: number; /** Maximum number of connections before unhealthy */ maxConnections: number; } /** * Default health thresholds */ export const DEFAULT_HEALTH_THRESHOLDS: HealthThresholds = { maxMemoryPercent: 95, // More lenient for development environments maxLoadAverage: 20, // More lenient for development environments maxErrorRate: 50, // 50 errors per minute maxConnections: 90, // 90% of max connections }; /** * Server capabilities configuration */ export const SERVER_CAPABILITIES: MCPServerCapabilities = { tools: {}, resources: { subscribe: false, listChanged: false }, prompts: { listChanged: false, }, logging: {} }; /** * Validate and merge client capabilities with server capabilities */ function negotiateCapabilities(clientCapabilities: unknown): NegotiatedCapabilities { const clientCaps = (clientCapabilities || {}) as ClientCapabilities; const negotiated: NegotiatedCapabilities = { protocolVersion: '1.0', tools: { listChanged: !!(SERVER_CAPABILITIES.tools?.listChanged && clientCaps.tools?.listChanged), }, resources: { subscribe: !!(SERVER_CAPABILITIES.resources?.subscribe && clientCaps.resources?.subscribe), listChanged: !!(SERVER_CAPABILITIES.resources?.listChanged && clientCaps.resources?.listChanged), }, prompts: { listChanged: !!(SERVER_CAPABILITIES.prompts?.listChanged && clientCaps.prompts?.listChanged), }, logging: { // No logging capabilities to negotiate yet } }; return negotiated; } /** * Main MCP Server class that extends the MCP SDK Server * Provides SSE transport support and Express.js integration */ export class MCPServer extends Server { private config: MCPServerConfig; private app: Express; private server: any; private state: ServerState; private activeConnections: Set<Response>; private connectionCount: number; private emitter: EventEmitter; private signalHandlers: { [key: string]: (...args: any[]) => void } = {}; // Health monitoring properties private startTime: [number, number]; // process.hrtime() format private healthThresholds: HealthThresholds; private requestStats: { total: number; errors: number; recentRequests: number[]; recentErrors: number[]; lastMinuteIndex: number; }; // Logging infrastructure private loggerConfig: LoggerConfig; private correlationIds: Map<string, string> = new Map(); private errorMetrics: { byCategory: Map<ErrorCategory, number>; byCode: Map<string, number>; total: number; }; // Tool instances private tools?: { workspaceManagement: WorkspaceManagementTool; template: TemplateTool; form_creation: FormCreationTool; form_modification: FormModificationTool; form_retrieval: FormRetrievalTool; form_sharing: FormSharingTool; form_permissions: FormPermissionManager; submission_analysis: SubmissionAnalysisTool; diagnostic: DiagnosticTool; }; private metricsRegistry: Registry; private monitoringService: MonitoringService; /** * Create a new MCP Server instance * @param config Server configuration options */ constructor(config: Partial<MCPServerConfig> = {}) { // Merge default and provided configurations const fullConfig: MCPServerConfig = { ...DEFAULT_CONFIG, ...config, logger: { ...DEFAULT_LOGGER_CONFIG, ...config.logger }, capabilities: config.capabilities || SERVER_CAPABILITIES, }; // Initialize the underlying MCP SDK Server with capabilities super({ name: 'tally-mcp-server-test', version: '1.0.0', capabilities: fullConfig.capabilities, debug: fullConfig.debug, }); this.config = fullConfig; this.loggerConfig = fullConfig.logger as LoggerConfig; this.app = express(); this.state = ServerState.STOPPED; this.activeConnections = new Set(); this.connectionCount = 0; this.emitter = new EventEmitter(); this.startTime = process.hrtime(); this.healthThresholds = DEFAULT_HEALTH_THRESHOLDS; this.metricsRegistry = new Registry(); collectDefaultMetrics({ register: this.metricsRegistry }); this.requestStats = { total: 0, errors: 0, recentRequests: [], recentErrors: [], lastMinuteIndex: 0, }; this.errorMetrics = { byCategory: new Map(), byCode: new Map(), total: 0, }; const logger = new Logger(this.loggerConfig); this.monitoringService = new MonitoringServiceImpl( // This is a placeholder. In a real implementation, you would // pass a real Cloudflare Analytics Engine binding. { writeDataPoint: () => {} }, logger ); SentryService.initialize(); this.setupMiddleware(); this.setupRoutes(); this.setupSignalHandlers(); } /** * Get current server state */ public getState(): ServerState { return this.state; } /** * Get current number of active connections */ public getConnectionCount(): number { return this.connectionCount; } /** * Get server configuration */ public getConfig(): MCPServerConfig { return { ...this.config, logger: { ...this.loggerConfig } }; } /** * Get Express app instance for testing */ public getApp(): Express { return this.app; } /** * Get comprehensive health metrics */ public getHealthMetrics(): HealthMetrics { const uptime = this.getUptime(); const memoryUsage = process.memoryUsage(); const totalMemory = os.totalmem(); const freeMemory = os.freemem(); const usedMemory = totalMemory - freeMemory; const memoryPercentage = (usedMemory / totalMemory) * 100; const loadAverage = os.loadavg(); // Update request statistics for current minute this.updateRequestStats(); const metrics: HealthMetrics = { uptime, status: this.state, connections: this.connectionCount, memory: { used: usedMemory, total: totalMemory, percentage: memoryPercentage, heapUsed: memoryUsage.heapUsed, heapTotal: memoryUsage.heapTotal, external: memoryUsage.external, rss: memoryUsage.rss, }, cpu: { loadAverage, usage: loadAverage?.[0] ?? 0, // 1-minute load average }, system: { platform: os.platform(), arch: os.arch(), nodeVersion: process.version, freeMemory, totalMemory, }, requests: { total: this.requestStats.total, rate: this.getRequestRate(), errors: this.requestStats.errors, errorRate: this.getErrorRate(), }, healthy: this.isHealthy(), timestamp: new Date().toISOString(), }; return metrics; } /** * Check if server is healthy based on thresholds */ public isHealthy(): boolean { if (this.state !== ServerState.RUNNING) { return false; } const totalMemory = os.totalmem(); const memoryPercentage = ((totalMemory - os.freemem()) / totalMemory) * 100; // Check memory threshold if (memoryPercentage > this.healthThresholds.maxMemoryPercent) { return false; } // Check CPU load average const loadAverage = os.loadavg(); if (loadAverage?.[0] && loadAverage[0] > this.healthThresholds.maxLoadAverage) { return false; } // Check error rate if (this.getErrorRate() > this.healthThresholds.maxErrorRate) { return false; } // Check connection count const maxConnections = (this.config.maxConnections * this.healthThresholds.maxConnections) / 100; if (this.connectionCount > maxConnections) { return false; } return true; } /** * Get server uptime in milliseconds */ private getUptime(): number { const diff = process.hrtime(this.startTime); return Math.round((diff[0] * 1000) + (diff[1] / 1e6)); } /** * Get current request rate (requests per minute) */ private getRequestRate(): number { const sum = this.requestStats.recentRequests.reduce((a, b) => a + b, 0); return sum; } /** * Get current error rate (errors per minute) */ private getErrorRate(): number { const sum = this.requestStats.recentErrors.reduce((a, b) => a + b, 0); return sum; } /** * Update request statistics for the current minute */ private updateRequestStats(): void { const currentMinute = Math.floor(Date.now() / 60000); const arrayIndex = currentMinute % 60; if (arrayIndex !== this.requestStats.lastMinuteIndex) { // Reset counters for new minute this.requestStats.recentRequests[arrayIndex] = 0; this.requestStats.recentErrors[arrayIndex] = 0; this.requestStats.lastMinuteIndex = arrayIndex; } } /** * Increment request counter */ private incrementRequestCount(): void { this.requestStats.total++; this.updateRequestStats(); const currentMinute = Math.floor(Date.now() / 60000); const arrayIndex = currentMinute % 60; if (this.requestStats.recentRequests[arrayIndex] !== undefined) { this.requestStats.recentRequests[arrayIndex]++; } } /** * Increment error counter */ private incrementErrorCount(): void { this.requestStats.errors++; this.updateRequestStats(); const currentMinute = Math.floor(Date.now() / 60000); const arrayIndex = currentMinute % 60; if (this.requestStats.recentErrors[arrayIndex] !== undefined) { this.requestStats.recentErrors[arrayIndex]++; } } /** * Initialize the server * Sets up Express middleware, routes, and starts listening */ public async initialize(): Promise<void> { if (this.state !== ServerState.STOPPED) { this.log('warn', 'Server is not stopped, cannot initialize.'); return; } this.log('info', 'Initializing server...'); this.state = ServerState.STARTING; this.setupMiddleware(); this.setupRoutes(); this.setupMCPHandlers(); this.initializeTools(); try { await this.startHttpServer(); this.state = ServerState.RUNNING; this.log('info', `Server running at http://${this.config.host}:${this.config.port}`); this.emit('ready'); } catch (error) { this.log('fatal', 'Server failed to start', undefined, error as Error); this.state = ServerState.ERROR; throw error; } } /** * Shutdown the server gracefully * Closes all connections and releases resources */ public async shutdown(): Promise<void> { if (this.state === ServerState.STOPPED || this.state === ServerState.STOPPING) { this.log('warn', 'Server is already stopped or stopping'); return; } try { this.state = ServerState.STOPPING; this.log('info', 'Starting graceful server shutdown...'); // Remove signal handlers to prevent memory leaks this.removeSignalHandlers(); // Close all active SSE connections this.closeAllConnections(); // Stop the HTTP server await this.stopHttpServer(); this.state = ServerState.STOPPED; this.log('info', 'Server shutdown completed successfully'); } catch (error) { this.state = ServerState.ERROR; this.log('error', 'Error during server shutdown:', undefined, error as Error); throw error; } } /** * Handle new SSE connection * Establishes Server-Sent Events connection for MCP protocol communication */ private async handleSSEConnection(req: Request, res: Response): Promise<void> { const requestId = (req as any).requestId || 'unknown'; this.log('info', `New SSE connection established [${requestId}]`); if (res.destroyed || res.headersSent) { this.log('warn', `Attempted to setup SSE on closed connection [${requestId}]`); return; } res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'Cache-Control, Connection', }); this.activeConnections.add(res); this.connectionCount++; this.sendSSEMessage(res, 'connection', { status: 'connected', serverId: requestId, serverInfo: { name: 'Tally MCP Server', version: '1.0.0', capabilities: Object.keys(SERVER_CAPABILITIES), }, }); // Send initial tools list notification this.sendSSEMessage(res, 'mcp-response', { jsonrpc: '2.0', method: 'notifications/tools/list_changed', params: { tools: await this._handleToolsList() } }); const timeout = setTimeout(() => { this.log('debug', `SSE connection timeout [${requestId}]`); this.removeConnection(res); }, this.config.requestTimeout * 2); req.on('close', () => { clearTimeout(timeout); this.log('info', `SSE connection closed [${requestId}]`); this.removeConnection(res); }); req.on('aborted', () => { clearTimeout(timeout); this.log('info', `SSE connection aborted [${requestId}]`); this.removeConnection(res); }); res.on('error', (error) => { clearTimeout(timeout); this.log('error', `SSE connection error [${requestId}]:`, undefined, error); this.removeConnection(res); }); res.on('finish', () => { clearTimeout(timeout); this.log('debug', `SSE response finished [${requestId}]`); this.removeConnection(res); }); } /** * Remove connection from active connections set * Cleans up SSE connection and updates connection count */ private removeConnection(res: Response): void { if (this.activeConnections.has(res)) { this.activeConnections.delete(res); this.connectionCount--; this.log('debug', `Connection removed. Active connections: ${this.connectionCount}`); // Close the response if it's still open if (!res.destroyed) { res.end(); } } } /** * Send SSE message to a specific connection * Formats and sends Server-Sent Events message */ private sendSSEMessage(res: Response, event: string, data: any): void { if (res.destroyed || !this.activeConnections.has(res)) { return; } try { const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; res.write(message); } catch (error) { this.log('error', 'Error sending SSE message:', undefined, error as Error); this.removeConnection(res); } } /** * Broadcast message to all active SSE connections * Sends the same message to all connected clients */ private broadcastToConnections(event: string, data: any): void { this.log('debug', `Broadcasting to ${this.activeConnections.size} connections`); for (const connection of this.activeConnections) { this.sendSSEMessage(connection, event, data); } } /** * Enhanced structured logging method */ private log( level: LogLevel | 'info' | 'warn' | 'error' | 'debug' | 'trace' | 'fatal', message: string, context?: Record<string, any>, error?: Error | StructuredError, requestId?: string ): void { // Convert string level to enum let logLevel: LogLevel; if (typeof level === 'string') { logLevel = { 'fatal': LogLevel.FATAL, 'error': LogLevel.ERROR, 'warn': LogLevel.WARN, 'info': LogLevel.INFO, 'debug': LogLevel.DEBUG, 'trace': LogLevel.TRACE, }[level] ?? LogLevel.INFO; } else { logLevel = level; } // Skip if log level is below configured threshold if (logLevel > this.loggerConfig.level) return; // Skip debug logs unless debug mode is enabled (backward compatibility) if (logLevel === LogLevel.DEBUG && !this.config.debug) return; const entry: LogEntry = { timestamp: new Date().toISOString(), level: logLevel, levelName: LogLevel[logLevel], component: this.loggerConfig.component, message, }; if (requestId) { entry.requestId = requestId; } // Add correlation ID if available if (requestId && this.correlationIds.has(requestId)) { const correlationId = this.correlationIds.get(requestId); if (correlationId) { entry.correlationId = correlationId; } } // Add error information if provided if (error) { entry.error = { name: error.name, message: error.message, }; if (error.stack) { entry.error.stack = error.stack; } if ((error as any).code) { entry.error.code = (error as any).code; } } // Add context if provided (with sensitive data redaction) if (context) { entry.context = this.loggerConfig.redactSensitive ? this.redactSensitiveData(context) : context; } // Output structured log entry if (this.loggerConfig.enableStructured) { this.outputStructuredLog(entry); } // Output console log for backward compatibility if (this.loggerConfig.enableConsole) { this.outputConsoleLog(entry); } } /** * Output structured JSON log entry */ private outputStructuredLog(entry: LogEntry): void { console.log(JSON.stringify(entry)); } /** * Output human-readable console log entry */ private outputConsoleLog(entry: LogEntry): void { const prefix = `[${entry.timestamp}] [${entry.levelName}] [${entry.component}]`; const requestInfo = entry.requestId ? ` [${entry.requestId}]` : ''; const correlationInfo = entry.correlationId ? ` [corr:${entry.correlationId}]` : ''; let logMessage = `${prefix}${requestInfo}${correlationInfo} ${entry.message}`; if (entry.context) { logMessage += ` | Context: ${JSON.stringify(entry.context)}`; } if (entry.error) { logMessage += ` | Error: ${entry.error.name}: ${entry.error.message}`; if (entry.error.stack && entry.level <= LogLevel.ERROR) { console.log(logMessage); console.log(`Stack trace:\n${entry.error.stack}`); return; } } console.log(logMessage); } /** * Redact sensitive data from log context */ private redactSensitiveData(data: any): any { if (typeof data !== 'object' || data === null) { return data; } const redacted = Array.isArray(data) ? [...data] : { ...data }; for (const key in redacted) { if (this.loggerConfig.sensitiveFields.some(field => key.toLowerCase().includes(field.toLowerCase()) )) { redacted[key] = '[REDACTED]'; } else if (typeof redacted[key] === 'object') { redacted[key] = this.redactSensitiveData(redacted[key]); } } return redacted; } /** * Get error metrics for health monitoring integration */ public getErrorMetrics() { return { total: this.errorMetrics.total, byCategory: Object.fromEntries(this.errorMetrics.byCategory), byCode: Object.fromEntries(this.errorMetrics.byCode), }; } /** * Request ID generation middleware * Adds unique request ID for tracing */ private requestIdMiddleware(req: Request, _res: Response, next: Function): void { // Generate unique request ID (req as any).requestId = `req-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; next(); } /** * Request logging middleware * Logs incoming requests with method, URL, and request ID */ private requestLoggingMiddleware(req: Request, res: Response, next: Function): void { const requestId = (req as any).requestId || 'unknown'; this.log('debug', `Incoming request: ${req.method} ${req.url} [${requestId}]`); // Track request statistics this.incrementRequestCount(); // Track errors using res.on('finish') res.on('finish', () => { if (res.statusCode >= 400) { this.incrementErrorCount(); } }); next(); } /** * CORS middleware * Enhanced CORS configuration for SSE and MCP protocol support */ private corsMiddleware(_req: Request, res: Response, next: Function): void { res.header('Access-Control-Allow-Origin', '*'); res.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'); res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization, Cache-Control, Connection'); res.header('Access-Control-Expose-Headers', 'Content-Type, Cache-Control, Connection'); next(); } /** * Connection limit middleware * Prevents server overload by limiting concurrent connections */ private connectionLimitMiddleware(_req: Request, res: Response, next: Function): void { if (this.connectionCount >= this.config.maxConnections) { this.log('warn', `Connection limit reached (${this.config.maxConnections}). Rejecting new connection.`); res.status(503).json({ error: 'Service Unavailable', message: 'Maximum connection limit reached. Please try again later.', }); return; } next(); } /** * Request timeout middleware * Sets timeout for individual requests */ private requestTimeoutMiddleware(req: Request, res: Response, next: Function): void { const timeout = setTimeout(() => { if (!res.headersSent) { this.log('warn', `Request timeout: ${req.method} ${req.url}`); res.status(408).json({ error: 'Request timeout', message: 'Request took too long to process', }); } }, this.config.requestTimeout); res.on('finish', () => { clearTimeout(timeout); }); next(); } /** * Set up Express middleware * Enhanced middleware stack with proper ordering and SSE support */ private setupMiddleware(): void { this.log('debug', 'Setting up Express middleware stack...'); // 1. Request ID generation middleware (for tracing) this.app.use(this.requestIdMiddleware.bind(this)); // 2. Request logging middleware this.app.use(this.requestLoggingMiddleware.bind(this)); // 3. CORS middleware (must be before other middleware) if (this.config.cors) { this.app.use(this.corsMiddleware.bind(this)); } // 4. Body parsing middleware this.app.use(express.json({ limit: '10mb' })); this.app.use(express.urlencoded({ extended: true, limit: '10mb' })); // 5. Connection limit middleware this.app.use(this.connectionLimitMiddleware.bind(this)); // 6. Request timeout middleware this.app.use(this.requestTimeoutMiddleware.bind(this)); this.log('debug', 'Express middleware setup completed'); } /** * Set up server routes * Enhanced with SSE endpoint and MCP protocol support */ private setupRoutes(): void { this.log('debug', 'Setting up server routes...'); // Basic info endpoint (backward compatibility) this.app.get('/', (_req, res) => { res.json({ name: 'Tally MCP Server', version: '1.0.0', status: this.state, connections: this.connectionCount, }); }); // Comprehensive health check endpoint this.app.get('/health', (req: Request, res: Response) => { const start = process.hrtime(); const healthy = this.isHealthy(); const status = healthy ? 200 : 503; const responseBody = { status: healthy ? 'ok' : 'unhealthy', timestamp: new Date().toISOString(), uptime: this.getUptime(), }; res.status(status).json(responseBody); const duration = process.hrtime(start); const durationMs = duration[0] * 1000 + duration[1] / 1e6; this.monitoringService.trackRequest(req.method, req.path, status, durationMs); }); // SSE endpoint for MCP protocol communication this.app.get('/sse', (req, res) => { this.handleSSEConnection(req, res); }); // MCP protocol message endpoint (POST) this.app.post('/message', async (req, res) => { await this.handleMCPMessage(req.body, res); }); this.app.get('/metrics', async (req: Request, res: Response) => { const start = process.hrtime(); try { this.updateMetricsFromHealth(); res.set('Content-Type', this.metricsRegistry.contentType); res.end(await this.metricsRegistry.metrics()); const duration = process.hrtime(start); const durationMs = duration[0] * 1000 + duration[1] / 1e6; this.monitoringService.trackRequest(req.method, req.path, 200, durationMs); } catch (error) { this.log('error', 'Failed to generate metrics', { error }, error instanceof Error ? error : new Error(String(error))); res.status(500).send('Failed to generate metrics'); const duration = process.hrtime(start); const durationMs = duration[0] * 1000 + duration[1] / 1e6; this.monitoringService.trackRequest(req.method, req.path, 500, durationMs, undefined, 'Failed to generate metrics'); } }); // Default route for 404 this.app.use((_req: Request, res: Response) => { res.status(404).json({ error: 'Not Found' }); }); this.log('debug', 'Server routes setup completed'); } /** * Start the HTTP server */ private async startHttpServer(): Promise<void> { return new Promise((resolve, reject) => { try { let resolved = false; const handleResolve = () => { if (!resolved) { resolved = true; clearTimeout(startupTimeout); resolve(); } }; const handleReject = (error: any) => { if (!resolved) { resolved = true; clearTimeout(startupTimeout); this.state = ServerState.ERROR; reject(error); } }; // Set up error handler BEFORE calling listen this.server = this.app.listen(this.config.port, this.config.host); this.server.on('error', (error: any) => { this.log('error', 'HTTP server error:', undefined, error as Error); // Handle specific error types if (error.code === 'EADDRINUSE') { const portError = new Error(`Port ${this.config.port} is already in use`); handleReject(portError); } else if (error.code === 'EACCES') { const permError = new Error(`Permission denied to bind to port ${this.config.port}`); handleReject(permError); } else { handleReject(error); } }); this.server.on('listening', () => { this.log('info', `HTTP server listening on ${this.config.host}:${this.config.port}`); handleResolve(); }); // Set server timeout this.server.timeout = this.config.requestTimeout; // Additional timeout to ensure we don't hang indefinitely const startupTimeout = setTimeout(() => { if (!resolved) { handleReject(new Error(`Server failed to start within timeout`)); } }, 5000); } catch (error) { this.state = ServerState.ERROR; reject(error); } }); } /** * Stop the HTTP server */ private async stopHttpServer(): Promise<void> { if (!this.server) { this.log('debug', 'No HTTP server to stop'); return; } return new Promise((resolve) => { // Track if we've already resolved to prevent double resolution let resolved = false; const doResolve = () => { if (!resolved) { resolved = true; this.server = null; resolve(); } }; // Immediately stop accepting new connections this.server.close((error: any) => { if (error) { this.log('error', 'Error stopping HTTP server:', undefined, error as Error); } else { this.log('info', 'HTTP server stopped successfully'); } doResolve(); }); // Force close existing connections if (this.server.listening) { this.server.getConnections((err: any, count: number) => { if (!err && count > 0) { this.log('debug', `Forcibly closing ${count} remaining connections`); // Destroy all connections immediately this.server.closeAllConnections?.(); // Also manually destroy sockets if available if (this.server._sockets) { for (const socket of this.server._sockets) { if (socket && !socket.destroyed) { socket.destroy(); } } } } }); } // Force close after timeout setTimeout(() => { if (!resolved) { this.log('warn', 'Forcing server close due to timeout'); doResolve(); } }, 100); // Very short timeout for tests }); } /** * Set up signal handlers for graceful shutdown */ private setupSignalHandlers(): void { this.log('debug', 'Setting up signal handlers...'); const handleShutdown = (signal: string) => { this.log('info', `Received ${signal}, initiating graceful shutdown...`); this.shutdown().catch((error) => { this.log('error', 'Error during graceful shutdown:', undefined, error as Error); process.exit(1); }); }; // Store handlers so we can remove them later this.signalHandlers['SIGTERM'] = () => handleShutdown('SIGTERM'); this.signalHandlers['SIGINT'] = () => handleShutdown('SIGINT'); process.on('SIGTERM', this.signalHandlers['SIGTERM']); process.on('SIGINT', this.signalHandlers['SIGINT']); this.log('debug', 'Signal handlers setup completed'); } /** * Remove signal handlers to prevent memory leaks */ private removeSignalHandlers(): void { this.log('debug', 'Removing signal handlers...'); for (const signal in this.signalHandlers) { const handler = this.signalHandlers[signal]; if (handler) { process.removeListener(signal, handler); } } this.signalHandlers = {}; this.log('debug', 'Signal handlers removed'); } /** * Close all active SSE connections */ private closeAllConnections(): void { this.log('info', 'Closing all active connections...'); for (const connection of this.activeConnections) { this.removeConnection(connection); } } /** * Initialize all Tally tools */ private initializeTools(): void { this.log('info', 'Initializing tools...'); const apiClientConfig: TallyApiClientConfig = { accessToken: env.TALLY_API_KEY }; const tallyApiClient = new TallyApiClient(apiClientConfig); this.tools = { workspaceManagement: new WorkspaceManagementTool(apiClientConfig), template: new TemplateTool(), form_creation: new FormCreationTool(apiClientConfig), form_modification: new FormModificationTool(apiClientConfig), form_retrieval: new FormRetrievalTool(apiClientConfig), form_sharing: new FormSharingTool(tallyApiClient), form_permissions: new FormPermissionManager(apiClientConfig), submission_analysis: new SubmissionAnalysisTool(apiClientConfig), diagnostic: new DiagnosticTool(), }; this.log('info', 'Tools initialized.'); } /** * Extract tools list logic for reuse */ private async _handleToolsList(): Promise<{ tools: any[] }> { return { tools: [ { name: 'create_form', description: 'Create a new Tally form with specified fields and configuration. This tool converts simple field definitions into Tally\'s complex blocks-based structure automatically. The form status defaults to DRAFT if not specified.', inputSchema: { type: 'object', properties: { title: { type: 'string', description: 'Form title (required) - will be displayed as the main form heading', minLength: 1, maxLength: 100 }, description: { type: 'string', description: 'Optional form description - displayed below the title to provide context' }, status: { type: 'string', enum: ['DRAFT', 'PUBLISHED'], description: 'Form publication status. Use DRAFT for unpublished forms that are being worked on, or PUBLISHED for live forms. Defaults to DRAFT if not specified.', default: 'DRAFT' }, fields: { type: 'array', description: 'Array of form fields/questions. Each field will be converted to appropriate Tally blocks automatically.', minItems: 1, items: { type: 'object', properties: { type: { type: 'string', enum: ['text', 'email', 'number', 'textarea', 'select', 'checkbox', 'radio'], description: 'Field input type. Maps to Tally blocks: text→INPUT_TEXT, email→INPUT_EMAIL, number→INPUT_NUMBER, textarea→TEXTAREA, select→DROPDOWN, checkbox→CHECKBOXES, radio→MULTIPLE_CHOICE' }, label: { type: 'string', description: 'Field label/question text - what the user will see', minLength: 1 }, required: { type: 'boolean', description: 'Whether this field must be filled out before form submission', default: false }, options: { type: 'array', items: { type: 'string' }, description: 'Available options for select, checkbox, or radio field types. Required for select/checkbox/radio fields.' } }, required: ['type', 'label'], additionalProperties: false } } }, required: ['title', 'fields'], additionalProperties: false, examples: [ { title: "Customer Feedback Survey", description: "Help us improve our service", status: "DRAFT", fields: [ { type: "text", label: "What is your name?", required: true }, { type: "email", label: "Email address", required: true }, { type: "select", label: "How would you rate our service?", required: false, options: ["Excellent", "Good", "Fair", "Poor"] } ] } ] } }, { name: 'modify_form', description: 'Modify an existing Tally form', inputSchema: { type: 'object', properties: { formId: { type: 'string', description: 'ID of the form to modify' }, title: { type: 'string', description: 'New form title' }, description: { type: 'string', description: 'New form description' }, fields: { type: 'array', items: { type: 'object', properties: { type: { type: 'string', enum: ['text', 'email', 'number', 'textarea', 'select', 'checkbox', 'radio'] }, label: { type: 'string' }, required: { type: 'boolean' }, options: { type: 'array', items: { type: 'string' } } }, required: ['type', 'label'] } } }, required: ['formId'] } }, { name: 'get_form', description: 'Retrieve information about a specific Tally form', inputSchema: { type: 'object', properties: { formId: { type: 'string', description: 'ID of the form to retrieve' } }, required: ['formId'] } }, { name: 'list_forms', description: 'List all forms in the authenticated user\'s Tally account', inputSchema: { type: 'object', properties: { limit: { type: 'number', description: 'Maximum number of forms to return', minimum: 1, maximum: 100 }, offset: { type: 'number', description: 'Number of forms to skip for pagination', minimum: 0 } } } }, { name: 'delete_form', description: 'Delete a Tally form permanently', inputSchema: { type: 'object', properties: { formId: { type: 'string', description: 'ID of the form to delete' } }, required: ['formId'] } }, { name: 'get_submissions', description: 'Retrieve submissions for a specific Tally form', inputSchema: { type: 'object', properties: { formId: { type: 'string', description: 'ID of the form to get submissions for' }, limit: { type: 'number', description: 'Maximum number of submissions to return', minimum: 1, maximum: 100 }, offset: { type: 'number', description: 'Number of submissions to skip for pagination', minimum: 0 }, since: { type: 'string', description: 'Only return submissions created after this ISO 8601 timestamp' } }, required: ['formId'] } }, { name: 'analyze_submissions', description: 'Analyze submission data for a Tally form to provide insights and statistics', inputSchema: { type: 'object', properties: { formId: { type: 'string', description: 'ID of the form to analyze submissions for' }, analysisType: { type: 'string', enum: ['basic_stats', 'response_patterns', 'completion_rates', 'field_analysis'], description: 'Type of analysis to perform' }, dateRange: { type: 'object', properties: { start: { type: 'string', description: 'Start date for analysis (ISO 8601)' }, end: { type: 'string', description: 'End date for analysis (ISO 8601)' } } } }, required: ['formId', 'analysisType'] } }, { name: 'export_submissions', description: 'Export form submissions in various formats', inputSchema: { type: 'object', properties: { formId: { type: 'string', description: 'ID of the form to export submissions for' }, format: { type: 'string', enum: ['csv', 'json', 'xlsx'], description: 'Export format', default: 'csv' }, dateRange: { type: 'object', properties: { start: { type: 'string', description: 'Start date for export (ISO 8601)' }, end: { type: 'string', description: 'End date for export (ISO 8601)' } } }, includeFields: { type: 'array', items: { type: 'string' }, description: 'Specific field IDs to include in export (all fields if not specified)' } }, required: ['formId'] } }, { name: 'manage_workspace', description: 'Manage workspace settings and information', inputSchema: { type: 'object', properties: { action: { type: 'string', enum: ['get_info', 'update_settings', 'list_members'], description: 'Action to perform on the workspace' }, settings: { type: 'object', properties: { name: { type: 'string', description: 'Workspace name' }, description: { type: 'string', description: 'Workspace description' } } } }, required: ['action'] } }, { name: 'generate_template', description: 'Generate a form template based on requirements or use case', inputSchema: { type: 'object', properties: { templateType: { type: 'string', enum: ['contact', 'survey', 'registration', 'feedback', 'order', 'application', 'custom'], description: 'Type of template to generate' }, requirements: { type: 'string', description: 'Specific requirements or use case description for custom templates' }, fields: { type: 'array', items: { type: 'string' }, description: 'Specific fields to include in the template' } }, required: ['templateType'] } }, { name: 'submission_analysis', description: 'Analyze form submissions, including completion rates and response distributions', inputSchema: { type: 'object', properties: { formId: { type: 'string' }, filters: { type: 'object', properties: { startDate: { type: 'string', format: 'date-time' }, endDate: { type: 'string', format: 'date-time' }, status: { type: 'string', enum: ['completed', 'incomplete', 'all'] }, } } }, required: ['formId'] }, outputSchema: { // ... } }, { name: 'diagnostic_tool', description: 'Runs diagnostic checks on the application.', inputSchema: { type: 'object', properties: { toolName: { type: 'string', description: 'The name of the specific diagnostic tool to run.' }, }, }, outputSchema: { // Can be an array of reports or a single report } } ] }; } /** * Extract tools call logic for reuse */ private async _handleToolsCall(message: any): Promise<{ content: any[], isError?: boolean }> { const { name, arguments: args } = message.params; try { if (!this.tools) { throw new Error('Tools not initialized'); } switch (name) { case 'list_forms': if (this.tools?.form_retrieval) { const forms = await this.tools.form_retrieval.execute(args || {}); return { content: [ { type: 'text', text: JSON.stringify(forms, null, 2) } ] }; } return { content: [ { type: 'text', text: 'Form retrieval functionality is not implemented' } ] }; case 'manage_workspace': if (args && args.action === 'get_info') { const workspaceInfo = await this.tools.workspaceManagement.listWorkspaces(); return { content: [ { type: 'text', text: JSON.stringify(workspaceInfo, null, 2) } ] }; } return { content: [ { type: 'text', text: 'Workspace management functionality is being implemented' } ] }; case 'submission_analysis': if (this.tools?.submission_analysis) { const result = await this.tools.submission_analysis.analyze(args.formId, args.filters); return { content: [ { type: 'text', text: JSON.stringify(result, null, 2) } ] }; } return { content: [ { type: 'text', text: 'Submission analysis functionality is not implemented' } ] }; case 'diagnostic_tool': if (this.tools?.diagnostic) { const result = await this.tools.diagnostic.execute(args.toolName); return { content: [ { type: 'text', text: JSON.stringify(result, null, 2) } ] }; } return { content: [ { type: 'text', text: 'Diagnostic tool functionality is not implemented' } ] }; default: return { content: [ { type: 'text', text: `Tool ${name} is not yet implemented` } ] }; } } catch (error) { return { content: [ { type: 'text', text: `Error executing tool ${name}: ${error instanceof Error ? error.message : String(error)}` } ], isError: true }; } } /** * Setup MCP tool request handlers */ private setupMCPHandlers(): void { // Only set up tool handlers if tools are configured if (this.tools || this.config.tools) { // Explicitly register capabilities to ensure the SDK knows about tools support this.registerCapabilities({ tools: {}, }); // Handle list tools requests this.setRequestHandler(ListToolsRequestSchema, async () => { return this._handleToolsList(); }); // Handle call tool requests this.setRequestHandler(CallToolRequestSchema, async (request) => { return this._handleToolsCall(request); }); } } public on(event: string, listener: (...args: any[]) => void): this { this.emitter.on(event, listener); return this; } public emit(event: string, ...args: any[]): boolean { return this.emitter.emit(event, ...args); } public broadcast(event: string, data: any): void { this.broadcastToConnections(event, data); } private updateMetricsFromHealth(): void { const metrics = this.getHealthMetrics(); let uptimeGauge = this.metricsRegistry.getSingleMetric('process_uptime_seconds') as Gauge; if (!uptimeGauge) { uptimeGauge = new Gauge({ name: 'process_uptime_seconds', help: 'Server uptime in seconds', registers: [this.metricsRegistry] }); } uptimeGauge.set(metrics.uptime); let connectionsGauge = this.metricsRegistry.getSingleMetric('active_connections') as Gauge; if (!connectionsGauge) { connectionsGauge = new Gauge({ name: 'active_connections', help: 'Number of active connections', registers: [this.metricsRegistry] }); } connectionsGauge.set(metrics.connections); let memoryUsedGauge = this.metricsRegistry.getSingleMetric('memory_used_bytes') as Gauge; if (!memoryUsedGauge) { memoryUsedGauge = new Gauge({ name: 'memory_used_bytes', help: 'Memory usage in bytes', registers: [this.metricsRegistry] }); } memoryUsedGauge.set(metrics.memory.used); let requestsTotalGauge = this.metricsRegistry.getSingleMetric('http_requests_total') as Gauge; if (!requestsTotalGauge) { requestsTotalGauge = new Gauge({ name: 'http_requests_total', help: 'Total number of HTTP requests', registers: [this.metricsRegistry] }); } requestsTotalGauge.set(metrics.requests.total); let errorsTotalGauge = this.metricsRegistry.getSingleMetric('http_requests_errors_total') as Gauge; if (!errorsTotalGauge) { errorsTotalGauge = new Gauge({ name: 'http_requests_errors_total', help: 'Total number of HTTP request errors', registers: [this.metricsRegistry] }); } errorsTotalGauge.set(metrics.requests.errors); } /** * Handle MCP protocol messages * Processes incoming MCP protocol messages using proper MCP SDK request handling */ private async handleMCPMessage(message: any, res: Response): Promise<void> { this.log('debug', 'Received MCP message:', { message, messageType: typeof message }); try { // Validate message structure if (message === null || message === undefined || typeof message !== 'object' || Array.isArray(message) || Object.keys(message).length === 0) { res.status(400).json({ jsonrpc: '2.0', id: message?.id || null, error: { code: -32600, message: 'Invalid Request', data: 'Message must be a valid non-empty object' } }); return; } // Validate JSON-RPC structure if (message.jsonrpc !== '2.0' || !message.method || typeof message.method !== 'string') { res.status(400).json({ jsonrpc: '2.0', id: message?.id || null, error: { code: -32600, message: 'Invalid Request', data: 'Request must be valid JSON-RPC 2.0 with method field' } }); return; } let mcpResponse; try { switch (message.method) { case 'initialize': // Validate protocol version if (!message.params?.protocolVersion) { mcpResponse = { jsonrpc: '2.0', id: message.id, error: { code: -32602, message: 'Invalid params', data: 'Protocol version is required' } }; break; } if (message.params.protocolVersion !== '2024-11-05') { mcpResponse = { jsonrpc: '2.0', id: message.id, error: { code: -32600, message: 'Invalid Request', data: 'Unsupported protocol version' } }; break; } try { const capabilities = negotiateCapabilities(message.params.capabilities); mcpResponse = { jsonrpc: '2.0', id: message.id, result: { protocolVersion: '2024-11-05', capabilities, serverInfo: { name: 'tally-mcp-server', version: '1.0.0', description: 'MCP server for Tally.so form management and automation' } } }; } catch (error) { mcpResponse = { jsonrpc: '2.0', id: message.id, error: { code: -32602, message: 'Invalid params', data: error instanceof Error ? error.message : 'Invalid capabilities' } }; } break; case 'tools/list': // Use the registered handler const toolsResult = await this._handleToolsList(); mcpResponse = { jsonrpc: '2.0', id: message.id, result: toolsResult }; break; case 'tools/call': // Use the registered handler const callResult = await this._handleToolsCall(message); mcpResponse = { jsonrpc: '2.0', id: message.id, result: callResult }; break; case 'resources/list': mcpResponse = { jsonrpc: '2.0', id: message.id, result: { resources: [] } }; break; case 'prompts/list': mcpResponse = { jsonrpc: '2.0', id: message.id, result: { prompts: [] } }; break; default: mcpResponse = { jsonrpc: '2.0', id: message.id, error: { code: -32601, message: 'Method not found', data: `Unknown method: ${message.method}` } }; } } catch (error) { this.log('error', 'Error processing MCP request:', undefined, error as Error); mcpResponse = { jsonrpc: '2.0', id: message.id, error: { code: -32603, message: 'Internal error', data: error instanceof Error ? error.message : 'Unknown error' } }; } this.broadcastToConnections('message', mcpResponse); res.json(mcpResponse); } catch (error) { this.log('error', 'Error processing MCP message:', undefined, error as Error); res.status(500).json({ jsonrpc: '2.0', id: message?.id || null, error: { code: -32603, message: 'Internal error', data: error instanceof Error ? error.message : 'Unknown error' } }); } } }

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/learnwithcc/tally-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server