Skip to main content
Glama

Curupira

by drzln
server.ts.backup•25 kB
/**
 * Consolidated Curupira MCP Server with Official SDK Transports
 * Uses built-in SSEServerTransport and StreamableHTTPServerTransport
 */

import Fastify from 'fastify'
import cors from '@fastify/cors'
import rateLimit from '@fastify/rate-limit'
import helmet from '@fastify/helmet'
import websocket from '@fastify/websocket'
import { Server } from '@modelcontextprotocol/sdk/server/index.js'
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'
import type { FastifyInstance } from 'fastify'
import { randomUUID } from 'node:crypto'
import { setupMCPHandlers } from './mcp/index.js'
import { WebSocketTransport } from './transport/websocket.js'
import { loadYamlConfig } from './config/yaml-loader.js'
import { logger } from './config/logger.js'

// Single source of truth for endpoint paths, so that route registration,
// logging and the /info payload cannot drift apart.
const DEFAULT_WS_PATH = '/mcp/ws'
const DEFAULT_HTTP_PATH = '/mcp/http'
const DEFAULT_SSE_PATH = '/mcp/sse'
const SSE_MESSAGES_PATH = '/mcp/messages'

// Maximum accepted WebSocket frame size (1 MB).
const WS_MAX_PAYLOAD_BYTES = 1048576

// Keys that are never copied while merging externally supplied configuration.
const UNSAFE_MERGE_KEYS = new Set(['__proto__', 'constructor', 'prototype'])

/** Shape of the `tools/list` handler captured by the `/tools` diagnostic route. */
type ToolsListHandler = (request: { params: Record<string, unknown> }) => unknown

// Configuration interfaces remain the same

/** Settings for the WebSocket transport. */
export interface CurupiraWebSocketConfig {
  enabled?: boolean
  path?: string
  pingInterval?: number
  pongTimeout?: number
  enablePing?: boolean
}

/** Settings for the HTTP-based transports (Streamable HTTP or SSE). */
export interface CurupiraHttpConfig {
  enabled?: boolean
  httpPath?: string
  ssePath?: string
  sseEnabled?: boolean
  timeout?: number
  keepAliveInterval?: number
  useModernTransport?: boolean // Use StreamableHTTP vs SSE
}

/** Complete server configuration; every field is optional and falls back to a default. */
export interface ServerConfig {
  name?: string
  version?: string
  host?: string
  port?: number
  environment?: 'development' | 'staging' | 'production'
  logLevel?: 'trace' | 'debug' | 'info' | 'warn' | 'error' | 'fatal'
  healthCheck?: boolean
  healthCheckPath?: string
  healthCheckInterval?: number
  transports?: {
    websocket?: CurupiraWebSocketConfig
    http?: CurupiraHttpConfig
  }
  cors?: {
    origins?: string[]
    credentials?: boolean
  }
  rateLimit?: {
    max?: number
    window?: number
  }
  auth?: {
    enabled?: boolean
    jwtSecret?: string
    tokenExpiry?: string
  }
  chrome?: {
    enabled?: boolean
    serviceUrl?: string
    connectTimeout?: number
    pageTimeout?: number
    defaultViewport?: {
      width?: number
      height?: number
    }
  }
}

/** Constructor options for {@link CurupiraServer}. */
export interface ServerOptions {
  /** Path of a YAML configuration file merged on top of the built-in defaults. */
  configPath?: string
  /** Programmatic overrides; merged last, so they win over YAML and environment variables. */
  envConfig?: Partial<ServerConfig>
}

/**
 * Fastify-based MCP server that exposes the protocol over WebSocket and,
 * optionally, over Streamable HTTP or SSE.
 *
 * Configuration is layered in this order (later layers win): built-in
 * defaults, YAML file, environment variables, `ServerOptions.envConfig`.
 */
export class CurupiraServer {
  private fastify: FastifyInstance
  private config: ServerConfig
  // Live HTTP/SSE transports keyed by MCP session id.
  private transports = new Map<string, SSEServerTransport | StreamableHTTPServerTransport>()
  private isRunning = false

  constructor(options: ServerOptions = {}) {
    // Load and merge configuration
    this.config = this.loadConfiguration(options)

    // Initialize Fastify with proper logging
    this.fastify = Fastify({
      logger: logger as any,
      trustProxy: true,
    })
  }

  /**
   * Parses a TCP port taken from an environment variable.
   *
   * @param raw - Raw environment value, possibly undefined.
   * @param variable - Name of the variable, used only for the warning message.
   * @returns The port number, or `undefined` when the value is missing or is
   *   not an integer in the range 0-65535.
   */
  private static parsePort(raw: string | undefined, variable: string): number | undefined {
    const text = raw?.trim()
    if (!text) {
      return undefined
    }

    const port = /^\d+$/.test(text) ? Number.parseInt(text, 10) : Number.NaN
    if (!Number.isInteger(port) || port > 65535) {
      logger.warn({ variable, value: raw }, 'Ignoring invalid port value')
      return undefined
    }

    return port
  }

  /**
   * Builds the effective configuration from defaults, the optional YAML file,
   * environment variables and programmatic overrides.
   *
   * A YAML file that fails to load is logged and skipped rather than aborting.
   *
   * @param options - Options passed to the constructor.
   * @returns The merged configuration.
   */
  private loadConfiguration(options: ServerOptions): ServerConfig {
    // Default configuration
    const defaults: ServerConfig = {
      name: 'curupira-mcp-server',
      version: '1.0.0',
      host: '0.0.0.0',
      port: 8080,
      environment: 'development',
      logLevel: 'info',
      healthCheck: true,
      healthCheckPath: '/health',
      healthCheckInterval: 30000,
      transports: {
        websocket: {
          enabled: true,
          path: DEFAULT_WS_PATH,
          enablePing: true,
          pingInterval: 30000,
          pongTimeout: 5000,
        },
        http: {
          enabled: false,
          httpPath: DEFAULT_HTTP_PATH,
          ssePath: DEFAULT_SSE_PATH,
          sseEnabled: false,
          keepAliveInterval: 30000,
          useModernTransport: true, // Default to modern transport
        },
      },
      cors: {
        origins: ['*'],
        credentials: true,
      },
      rateLimit: {
        max: 100,
        window: 60000,
      },
      auth: {
        enabled: false,
      },
    }

    // Environment variable overrides. An invalid CURUPIRA_PORT falls through
    // to PORT and then to the default instead of producing a NaN port.
    const envConfig: Partial<ServerConfig> = {
      name: process.env.CURUPIRA_NAME,
      host: process.env.CURUPIRA_HOST || process.env.HOST,
      port:
        CurupiraServer.parsePort(process.env.CURUPIRA_PORT, 'CURUPIRA_PORT') ??
        CurupiraServer.parsePort(process.env.PORT, 'PORT'),
      environment: (process.env.NODE_ENV || process.env.CURUPIRA_ENV) as ServerConfig['environment'],
      logLevel: process.env.CURUPIRA_LOG_LEVEL as ServerConfig['logLevel'],
    }

    // Parse CORS origins from environment
    if (process.env.CURUPIRA_CORS_ORIGINS) {
      envConfig.cors = {
        origins: process.env.CURUPIRA_CORS_ORIGINS.split(',').map(s => s.trim()),
      }
    }

    // Parse transport settings from environment
    const transports: NonNullable<ServerConfig['transports']> = {}

    if (process.env.CURUPIRA_TRANSPORT_WEBSOCKET !== undefined) {
      transports.websocket = {
        enabled: process.env.CURUPIRA_TRANSPORT_WEBSOCKET === 'true',
      }
    }

    if (process.env.CURUPIRA_TRANSPORT_HTTP === 'true' || process.env.CURUPIRA_TRANSPORT_SSE === 'true') {
      transports.http = {
        enabled: true,
        httpPath: process.env.CURUPIRA_HTTP_PATH || DEFAULT_HTTP_PATH,
        ssePath: process.env.CURUPIRA_SSE_PATH || DEFAULT_SSE_PATH,
        sseEnabled: process.env.CURUPIRA_TRANSPORT_SSE === 'true',
        useModernTransport: process.env.CURUPIRA_USE_MODERN_TRANSPORT !== 'false',
      }
    }

    if (Object.keys(transports).length > 0) {
      envConfig.transports = transports
    }

    // Start with defaults
    let config: ServerConfig = { ...defaults }

    // Merge with YAML config if provided
    if (options.configPath) {
      try {
        const yamlConfig = loadYamlConfig(options.configPath)
        config = this.deepMerge(config, yamlConfig)
      } catch (error) {
        logger.warn({ error, configPath: options.configPath }, 'Failed to load YAML config, using defaults')
      }
    }

    // Apply environment variables
    config = this.deepMerge(config, envConfig)

    // Apply passed-in config
    if (options.envConfig) {
      config = this.deepMerge(config, options.envConfig)
    }

    return config
  }

  /**
   * Recursively merges `source` into a shallow copy of `target`.
   *
   * `undefined` and `null` values in `source` are skipped, so they never erase
   * an existing setting. Arrays and primitives replace the target value;
   * non-array objects are merged key by key. Neither argument is mutated.
   *
   * @param target - Base object.
   * @param source - Overrides; a non-object source leaves the copy unchanged.
   * @returns A new merged object.
   */
  // The layers being merged are heterogeneous (YAML output, partial configs),
  // so the boundary stays loosely typed.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private deepMerge(target: any, source: any): any {
    const result = { ...target }

    if (source === null || typeof source !== 'object') {
      return result
    }

    for (const [key, value] of Object.entries(source)) {
      if (value === undefined || value === null) {
        continue
      }

      // The YAML layer is external input: never let it reach the prototype chain.
      if (UNSAFE_MERGE_KEYS.has(key)) {
        continue
      }

      if (typeof value === 'object' && !Array.isArray(value)) {
        const existing = result[key]
        const isMergeable = existing !== null && typeof existing === 'object' && !Array.isArray(existing)
        // Only merge into an existing object; anything else is replaced wholesale.
        result[key] = this.deepMerge(isMergeable ? existing : {}, value)
      } else {
        result[key] = value
      }
    }

    return result
  }

  /**
   * Registers plugins, routes and transports, then starts listening.
   *
   * A Chrome integration failure is logged and does not prevent startup.
   *
   * @throws Error if the server is already running; any other startup failure
   *   is logged and rethrown.
   */
  async start(): Promise<void> {
    if (this.isRunning) {
      throw new Error('Server is already running')
    }

    try {
      // Register plugins
      await this.registerPlugins()

      // Setup routes
      this.setupRoutes()

      // Initialize Chrome if configured
      if (this.config.chrome?.enabled && this.config.chrome?.serviceUrl) {
        try {
          const { ChromeManager } = await import('./chrome/manager.js')
          const manager = ChromeManager.getInstance()

          // Parse serviceUrl to get host and port
          const url = new URL(this.config.chrome.serviceUrl)
          const isSecure = url.protocol === 'https:'

          await manager.initialize({
            host: url.hostname,
            // URL.port is empty when the URL uses the scheme's default port.
            port: parseInt(url.port, 10) || (isSecure ? 443 : 80),
            secure: isSecure,
            timeout: this.config.chrome.connectTimeout
          })

          logger.info('Chrome integration initialized')
        } catch (error) {
          logger.error({ error }, 'Failed to initialize Chrome integration')
          // Continue without Chrome - not critical for server startup
        }
      }

      // Setup transports based on configuration
      await this.setupTransports()

      // Start server
      const address = await this.fastify.listen({
        port: this.config.port!,
        host: this.config.host!,
      })

      this.isRunning = true

      logger.info({
        address,
        transports: this.getEnabledTransports(),
        config: {
          name: this.config.name,
          version: this.config.version,
          environment: this.config.environment,
          logLevel: this.config.logLevel,
        }
      }, 'Curupira MCP Server started')
    } catch (error) {
      logger.error({ error }, 'Failed to start server')
      throw error
    }
  }

  /**
   * Closes every tracked transport, disconnects Chrome when it is enabled and
   * shuts Fastify down. Does nothing when the server is not running.
   *
   * Errors from individual transports or from Chrome are logged and do not
   * stop the shutdown; a failure closing Fastify is logged and rethrown.
   */
  async stop(): Promise<void> {
    if (!this.isRunning) {
      return
    }

    try {
      // Close all transports
      for (const [sessionId, transport] of this.transports) {
        try {
          await transport.close()
          this.transports.delete(sessionId)
        } catch (error) {
          logger.error({ error, sessionId }, 'Error closing transport')
        }
      }

      // Disconnect Chrome if initialized
      if (this.config.chrome?.enabled) {
        try {
          const { ChromeManager } = await import('./chrome/manager.js')
          const manager = ChromeManager.getInstance()
          await manager.disconnect()
          logger.info('Chrome disconnected')
        } catch (error) {
          logger.error({ error }, 'Error disconnecting Chrome')
        }
      }

      await this.fastify.close()
      this.isRunning = false
      logger.info('Server stopped')
    } catch (error) {
      logger.error({ error }, 'Error stopping server')
      throw error
    }
  }

  /** Registers CORS, security headers, rate limiting and (when enabled) WebSocket support. */
  private async registerPlugins(): Promise<void> {
    // CORS
    const origins = this.config.cors?.origins || ['*']
    await this.fastify.register(cors, {
      // @fastify/cors compares string entries literally, so '*' inside an
      // array matches no real origin. `true` reflects the request origin,
      // which is also what browsers require for credentialed requests.
      origin: origins.includes('*') ? true : origins,
      credentials: this.config.cors?.credentials !== false,
      exposedHeaders: ['Mcp-Session-Id'], // Important for Streamable HTTP
    })

    // Security headers
    await this.fastify.register(helmet, {
      contentSecurityPolicy: false, // WebSocket compatibility
    })

    // Rate limiting
    await this.fastify.register(rateLimit, {
      max: this.config.rateLimit?.max || 100,
      timeWindow: this.config.rateLimit?.window || 60000,
    })

    // WebSocket support if enabled
    if (this.config.transports?.websocket?.enabled !== false) {
      await this.registerWebSocketPlugin()
    }
  }

  /** Registers `@fastify/websocket`; must run before any `{ websocket: true }` route is added. */
  private async registerWebSocketPlugin(): Promise<void> {
    await this.fastify.register(websocket, {
      options: {
        maxPayload: WS_MAX_PAYLOAD_BYTES,
      },
    })
  }

  /** Registers the health, readiness, info and diagnostic `/tools` routes. */
  private setupRoutes(): void {
    // Health check endpoint
    this.fastify.get(this.config.healthCheckPath || '/health', async () => ({
      status: 'ok',
      timestamp: new Date().toISOString(),
      version: this.config.version,
      name: this.config.name,
      environment: this.config.environment,
      transports: this.getEnabledTransports(),
      uptime: process.uptime(),
    }))

    // Readiness check
    this.fastify.get('/ready', async () => ({
      ready: this.isRunning,
      timestamp: new Date().toISOString(),
    }))

    // Server info and capabilities
    this.fastify.get('/info', async () => ({
      name: this.config.name,
      version: this.config.version,
      protocol: 'mcp',
      capabilities: {
        resources: true,
        tools: true,
        prompts: true,
      },
      transports: this.getTransportInfo(),
      endpoints: this.getEndpointInfo(),
    }))

    // Simple tools endpoint for testing (bypasses MCP transport complexity)
    this.fastify.get('/tools', async () => {
      try {
        // Import and call the tools handler directly
        const { setupUnifiedToolHandlers } = await import('./mcp/tools/index.js')

        // Capture the tools/list handler by handing the setup function a stand-in server.
        const captured: { listTools?: ToolsListHandler } = {}
        const mockServer = {
          setRequestHandler: (schema: any, handler: ToolsListHandler) => {
            // NOTE(review): presumably the handlers are registered with SDK zod
            // schemas, which keep the method name under shape.method.value;
            // a plain `{ method }` object is accepted as well. Confirm against
            // setupUnifiedToolHandlers.
            const method = schema?.method ?? schema?.shape?.method?.value
            if (method === 'tools/list') {
              captured.listTools = handler
            }
          }
        }

        setupUnifiedToolHandlers(mockServer as any)

        // Await the handler itself instead of sleeping and hoping it has
        // finished; a rejection now reaches the catch block below.
        const result = (await captured.listTools?.({ params: {} })) as
          | { tools?: Array<{ name: string; description?: string }> }
          | undefined
        const tools = result?.tools ?? []

        return {
          available: tools.length,
          tools: tools.map(tool => ({
            name: tool.name,
            description: tool.description
          }))
        }
      } catch (error) {
        return {
          error: error instanceof Error ? error.message : 'Unknown error',
          available: 0,
          tools: []
        }
      }
    })
  }

  /**
   * Mounts the transports selected by the configuration.
   *
   * For HTTP, `useModernTransport` chooses between Streamable HTTP and the
   * deprecated SSE transport, matching what `getEnabledTransports` and
   * `getTransportInfo` report. If nothing is enabled, WebSocket is switched on
   * as a fallback.
   */
  private async setupTransports(): Promise<void> {
    const transports = this.config.transports || {}

    // Setup WebSocket transport
    if (transports.websocket?.enabled !== false) {
      this.setupWebSocketTransport()
      logger.info({
        path: transports.websocket?.path || DEFAULT_WS_PATH,
        enablePing: transports.websocket?.enablePing,
        pingInterval: transports.websocket?.pingInterval,
      }, 'WebSocket transport enabled')
    }

    // Setup HTTP/SSE transport
    if (transports.http?.enabled) {
      if (transports.http.useModernTransport) {
        await this.setupStreamableHttpTransport()
        logger.info({
          httpPath: transports.http.httpPath || DEFAULT_HTTP_PATH,
        }, 'Streamable HTTP transport enabled')
      } else {
        await this.setupSseTransport()
        logger.info({
          ssePath: transports.http.ssePath || DEFAULT_SSE_PATH,
        }, 'SSE transport enabled (deprecated)')
      }
    }

    // Ensure at least one transport is enabled
    if (this.getEnabledTransports().length === 0) {
      logger.warn('No transports enabled, enabling WebSocket by default')

      // Reaching this point means WebSocket was explicitly disabled, so its
      // plugin was skipped in registerPlugins(). Record the fallback in the
      // configuration (so /health and /info report it) and register the
      // plugin before adding the route.
      this.config.transports = {
        ...transports,
        websocket: { ...transports.websocket, enabled: true },
      }
      await this.registerWebSocketPlugin()
      this.setupWebSocketTransport()
    }
  }

  /**
   * Creates an MCP server instance with the standard handlers installed.
   *
   * @param withLogging - Also advertise the `logging` capability (only the
   *   Streamable HTTP transport does).
   */
  private createMcpServer(withLogging = false): Server {
    const server = new Server(
      {
        name: this.config.name || 'curupira-mcp-server',
        version: this.config.version || '1.0.0'
      },
      {
        capabilities: {
          resources: { list: true, read: true },
          tools: { list: true, call: true },
          prompts: { list: true, get: true },
          ...(withLogging ? { logging: {} } : {})
        }
      }
    )

    // Setup MCP handlers
    setupMCPHandlers(server)

    return server
  }

  /** Registers the WebSocket route; every connection gets its own MCP server instance. */
  private setupWebSocketTransport(): void {
    const wsConfig = this.config.transports?.websocket || {}
    const path = wsConfig.path || DEFAULT_WS_PATH

    this.fastify.register(async (fastify) => {
      fastify.get(path, { websocket: true }, (socket: any, request) => {
        logger.info({
          ip: request.ip,
          userAgent: request.headers['user-agent'],
        }, 'WebSocket connection established')

        const transport = new WebSocketTransport(socket)

        // Create a new MCP server instance for this connection
        const server = this.createMcpServer()

        // Connect MCP server to transport
        server.connect(transport).catch((error) => {
          logger.error({ error }, 'Failed to connect MCP server to WebSocket')
          if ('close' in socket && typeof socket.close === 'function') {
            socket.close()
          }
        })

        socket.on('error', (error: Error) => {
          logger.error({ error }, 'WebSocket error')
        })

        socket.on('close', () => {
          logger.info('WebSocket connection closed')
        })
      })
    })
  }

  /**
   * Registers the Streamable HTTP endpoint: GET returns a capability summary,
   * POST carries JSON-RPC messages. Sessions are keyed by the `Mcp-Session-Id`
   * header; a new session is created when the id is absent or unknown.
   */
  private async setupStreamableHttpTransport(): Promise<void> {
    const httpConfig = this.config.transports?.http || {}
    const httpPath = httpConfig.httpPath || DEFAULT_HTTP_PATH

    // Handle GET requests for server capabilities
    this.fastify.get(httpPath, async (request, reply) => {
      logger.debug('Received GET request for server capabilities')

      return reply
        .code(200)
        .header('Content-Type', 'application/json')
        .send({
          name: this.config.name || 'curupira-mcp-server',
          version: this.config.version || '1.0.0',
          protocol: 'mcp',
          protocolVersion: '2025-03-26',
          capabilities: {
            resources: { list: true, read: true },
            tools: { list: true, call: true },
            prompts: { list: true, get: true },
            logging: {}
          },
          endpoints: {
            http: httpPath,
            websocket: this.config.transports?.websocket?.path || DEFAULT_WS_PATH
          }
        })
    })

    // Handle POST requests for MCP JSON-RPC messages
    this.fastify.post(httpPath, async (request, reply) => {
      logger.debug({
        body: request.body,
        headers: request.headers
      }, 'Received MCP JSON-RPC request')

      try {
        const body = request.body as any

        // Validate JSON-RPC request
        if (body?.jsonrpc !== '2.0' || !body.method) {
          return reply.code(400).send({
            jsonrpc: '2.0',
            error: {
              code: -32600,
              message: 'Invalid Request',
              data: 'Missing required JSON-RPC fields'
            },
            // `??` rather than `||`: 0 is a legal JSON-RPC id and must be echoed back.
            id: body?.id ?? null
          })
        }

        // Get or create session
        const sessionId = (request.headers['mcp-session-id'] as string) || randomUUID()
        let transport = this.transports.get(sessionId)

        if (!transport) {
          logger.info(`Creating new MCP session: ${sessionId}`)

          // Create new Streamable HTTP transport
          transport = new StreamableHTTPServerTransport({
            sessionIdGenerator: () => sessionId
          })

          // Store transport
          this.transports.set(sessionId, transport)

          // Set up cleanup
          transport.onclose = () => {
            logger.info(`Transport closed for session ${sessionId}`)
            this.transports.delete(sessionId)
          }

          // Create new MCP server for this connection, then connect it
          const server = this.createMcpServer(true)
          await server.connect(transport)

          logger.info(`MCP server connected for session: ${sessionId}`)
        }

        // Set session ID header in response
        reply.header('Mcp-Session-Id', sessionId)
        reply.header('Content-Type', 'application/json')

        // Handle the request with the transport
        if (transport instanceof StreamableHTTPServerTransport) {
          // For Streamable HTTP, we need to handle the request directly
          // since it expects Node.js raw request/response objects
          await transport.handleRequest(request.raw, reply.raw, body)
        } else {
          // The session id belongs to a transport of another kind.
          logger.error('Invalid transport type')
          return reply.code(500).send({
            jsonrpc: '2.0',
            error: {
              code: -32603,
              message: 'Internal server error',
              data: 'Invalid transport type'
            },
            id: body.id ?? null
          })
        }
      } catch (error) {
        logger.error({ error }, 'Error handling MCP request')

        const body = request.body as any
        if (!reply.sent) {
          return reply.code(500).send({
            jsonrpc: '2.0',
            error: {
              code: -32603,
              message: 'Internal server error',
              data: error instanceof Error ? error.message : 'Unknown error'
            },
            id: body?.id ?? null
          })
        }
      }
    })
  }

  /**
   * Registers the deprecated SSE transport: a GET endpoint that opens the
   * event stream and a POST endpoint that receives client messages for the
   * session named by the `sessionId` query parameter.
   */
  private async setupSseTransport(): Promise<void> {
    const httpConfig = this.config.transports?.http || {}
    const ssePath = httpConfig.ssePath || DEFAULT_SSE_PATH

    // SSE endpoint for establishing the stream
    this.fastify.get(ssePath, async (request, reply) => {
      logger.info({
        ip: request.ip,
        userAgent: request.headers['user-agent'],
      }, 'SSE connection request')

      try {
        // Create a new SSE transport
        const transport = new SSEServerTransport(SSE_MESSAGES_PATH, reply.raw, {
          enableDnsRebindingProtection: false // Allow all origins for development
        })

        // Store transport by session ID
        const sessionId = transport.sessionId
        this.transports.set(sessionId, transport)

        // Set up cleanup on close
        transport.onclose = () => {
          logger.info(`SSE transport closed for session ${sessionId}`)
          this.transports.delete(sessionId)
        }

        // Create a new MCP server instance for this connection
        const server = this.createMcpServer()

        // Connect the transport
        await server.connect(transport)

        logger.info(`Established SSE stream with session ID: ${sessionId}`)
      } catch (error) {
        logger.error({ error }, 'Error establishing SSE stream')
        if (!reply.sent) {
          reply.code(500).send('Error establishing SSE stream')
        }
      }
    })

    // Messages endpoint for receiving client JSON-RPC requests
    this.fastify.post(SSE_MESSAGES_PATH, async (request, reply) => {
      logger.debug('Received POST to /mcp/messages')

      // Extract session ID from query parameter
      const sessionId = (request.query as any)?.sessionId

      if (!sessionId) {
        logger.error('No session ID provided')
        return reply.code(400).send({ error: 'Missing sessionId parameter' })
      }

      const transport = this.transports.get(sessionId)

      if (!transport || !(transport instanceof SSEServerTransport)) {
        logger.error(`No SSE transport found for session ${sessionId}`)
        return reply.code(404).send({ error: 'Session not found' })
      }

      try {
        // Handle the POST message
        await transport.handlePostMessage(request.raw, reply.raw, request.body)
      } catch (error) {
        logger.error({ error }, 'Error handling POST message')
        if (!reply.sent) {
          reply.code(500).send({ error: 'Error handling request' })
        }
      }
    })
  }

  /** Lists the transports the configuration enables: `websocket`, `streamableHttp` or `sse`. */
  private getEnabledTransports(): string[] {
    const enabled: string[] = []
    const transports = this.config.transports || {}

    if (transports.websocket?.enabled !== false) {
      enabled.push('websocket')
    }

    if (transports.http?.enabled) {
      enabled.push(transports.http.useModernTransport ? 'streamableHttp' : 'sse')
    }

    return enabled
  }

  /** Describes each enabled transport (endpoint, description, protocol details) for `/info`. */
  private getTransportInfo(): Record<string, any> {
    const info: Record<string, any> = {}
    const transports = this.config.transports || {}

    if (transports.websocket?.enabled !== false) {
      info.websocket = {
        endpoint: transports.websocket?.path || DEFAULT_WS_PATH,
        description: 'WebSocket transport for Chrome extension',
        pingEnabled: transports.websocket?.enablePing,
        pingInterval: transports.websocket?.pingInterval,
      }
    }

    if (transports.http?.enabled) {
      if (transports.http.useModernTransport) {
        info.streamableHttp = {
          endpoint: transports.http.httpPath || DEFAULT_HTTP_PATH,
          description: 'Streamable HTTP transport (modern)',
          protocolVersion: '2025-03-26'
        }
      } else {
        info.sse = {
          endpoint: transports.http.ssePath || DEFAULT_SSE_PATH,
          messagesEndpoint: SSE_MESSAGES_PATH,
          description: 'SSE transport (deprecated)',
          protocolVersion: '2024-11-05'
        }
      }
    }

    return info
  }

  /** Describes the utility endpoints plus one entry per enabled transport. */
  private getEndpointInfo(): Record<string, any> {
    const endpoints: Record<string, any> = {
      health: {
        path: this.config.healthCheckPath || '/health',
        method: 'GET',
        description: 'Health check endpoint',
      },
      info: {
        path: '/info',
        method: 'GET',
        description: 'Server information and capabilities',
      },
      ready: {
        path: '/ready',
        method: 'GET',
        description: 'Readiness check endpoint',
      },
    }

    // Add transport endpoints
    for (const [name, info] of Object.entries(this.getTransportInfo())) {
      endpoints[name] = info
    }

    return endpoints
  }

  /** Shallow copy of the effective configuration (nested objects are shared). */
  get serverConfig(): ServerConfig {
    return { ...this.config }
  }

  /** Whether the server is currently listening. */
  get running(): boolean {
    return this.isRunning
  }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/drzln/curupira'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.