Skip to main content
Glama

Curupira

by drzln
transport.ts8.2 kB
/** * Transport layer abstraction * * Supports multiple transport mechanisms for MCP */ import { Server } from '@modelcontextprotocol/sdk/server/index.js' import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js' import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js' import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js' import Fastify, { FastifyRequest, FastifyReply } from 'fastify' import fastifyCors from '@fastify/cors' import type { ILogger } from '../core/interfaces/logger.interface.js' import type { HealthChecker } from './health.js' import type { SecurityManager } from '../security/index.js' export type TransportType = 'stdio' | 'http' | 'sse' export interface TransportOptions { type: TransportType port?: number corsOrigins?: string[] enableSSE?: boolean enableWS?: boolean healthChecker?: HealthChecker securityManager?: SecurityManager } export class TransportManager { private server: Server private options: TransportOptions private httpServer?: ReturnType<typeof Fastify> private logger: ILogger constructor(server: Server, options: TransportOptions, logger: ILogger) { this.server = server this.options = options this.logger = logger } /** * Start the appropriate transport */ async start() { switch (this.options.type) { case 'stdio': return this.startStdio() case 'http': case 'sse': return this.startHttp() default: throw new Error(`Unknown transport type: ${this.options.type}`) } } /** * Start stdio transport */ private async startStdio() { this.logger.info('Starting stdio transport') const transport = new StdioServerTransport() await this.server.connect(transport) return 'stdio://connected' } /** * Start HTTP/SSE transport */ private async startHttp() { const port = this.options.port || 3000 this.logger.info({ port, type: this.options.type }, 'Starting HTTP transport') this.httpServer = Fastify({ logger: false, // We use our own logger }) // Register CORS await this.httpServer.register(fastifyCors, { origin: this.options.corsOrigins || true, credentials: true, }) // Apply security if configured if (this.options.securityManager) { await this.options.securityManager.applyToFastify(this.httpServer) } // Health check endpoint this.httpServer.get('/health', async (request: FastifyRequest, reply: FastifyReply) => { if (this.options.healthChecker) { const health = await this.options.healthChecker.check() const statusCode = health.status === 'healthy' ? 200 : health.status === 'degraded' ? 206 : 503 reply.code(statusCode).send(health) } else { reply.send({ status: 'healthy', timestamp: new Date().toISOString() }) } }) // Metrics endpoint this.httpServer.get('/metrics', async (request: FastifyRequest, reply: FastifyReply) => { if (this.options.healthChecker) { const metrics = this.options.healthChecker.getMetrics() // Prometheus format const output = Object.entries(metrics) .map(([key, value]) => `curupira_${key} ${value}`) .join('\n') reply.type('text/plain').send(output) } else { reply.code(404).send({ error: 'Metrics not available' }) } }) // MCP endpoint if (this.options.type === 'sse') { this.httpServer.get('/mcp', async (request: FastifyRequest, reply: FastifyReply) => { this.logger.info({ ip: request.ip }, 'SSE connection established') // Set SSE headers reply.raw.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', }) const transport = new SSEServerTransport('/mcp', reply.raw) await this.server.connect(transport) }) } else if (this.options.type === 'http') { // HTTP Transport using StreamableHTTPServerTransport with session management const transports = new Map<string, StreamableHTTPServerTransport>() this.httpServer.post('/mcp', async (request: FastifyRequest, reply: FastifyReply) => { this.logger.info({ method: 'POST', path: '/mcp' }, 'HTTP request for MCP') try { // Get or create session - use fixed session ID if none provided const sessionId = (request.headers['mcp-session-id'] as string) || 'default-session' let transport = transports.get(sessionId) if (!transport) { this.logger.info({ sessionId }, 'Creating new MCP transport session') transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => sessionId }) transports.set(sessionId, transport) // Connect to MCP server for this session await this.server.connect(transport) this.logger.info({ sessionId }, 'MCP transport session connected') // Clean up on close transport.onclose = () => { this.logger.info({ sessionId }, 'Transport session closed') transports.delete(sessionId) } } // Handle request with transport await transport.handleRequest(request.raw, reply.raw, request.body) } catch (error) { this.logger.error({ error }, 'Failed to handle HTTP transport') reply.code(500).send({ error: 'Internal server error' }) } }) // Also support SSE endpoint for HTTP transport this.httpServer.get('/mcp', async (request: FastifyRequest, reply: FastifyReply) => { reply.send({ message: 'HTTP transport active', endpoint: 'POST /mcp', headers: { 'Content-Type': 'application/json', 'MCP-Session-Id': 'optional session identifier' } }) }) } // Documentation endpoint this.httpServer.get('/docs', async (request: FastifyRequest, reply: FastifyReply) => { reply.send({ name: 'curupira-debug', version: '1.0.0', transport: this.options.type, endpoints: { health: '/health', metrics: '/metrics', mcp: this.options.type === 'sse' ? '/mcp' : undefined, }, capabilities: { resources: ['list', 'read'], tools: ['list', 'call'], prompts: ['list', 'get'], }, }) }) // Start server try { await this.httpServer.listen({ port, host: '0.0.0.0' }) this.logger.info({ port }, 'HTTP server started') return `http://0.0.0.0:${port}` } catch (error) { this.logger.error({ error }, 'Failed to start HTTP server') throw error } } /** * Handle HTTP message */ private async handleHttpMessage(message: unknown) { // This would integrate with the MCP server to handle HTTP POST messages // For now, return a placeholder const messageWithId = message as { id?: unknown } return { jsonrpc: '2.0', id: messageWithId.id, result: { error: 'HTTP POST not fully implemented', }, } } /** * Stop the transport */ async stop() { this.logger.info('Stopping transport') if (this.httpServer) { try { await this.httpServer.close() this.logger.info('HTTP server stopped') } catch (error) { this.logger.error({ error }, 'Error stopping HTTP server') } } } /** * Get transport information */ getInfo() { return { type: this.options.type, port: this.options.port, corsOrigins: this.options.corsOrigins, endpoints: this.options.type === 'http' || this.options.type === 'sse' ? { health: '/health', metrics: '/metrics', docs: '/docs', mcp: this.options.type === 'sse' ? '/mcp' : undefined, } : undefined, } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/drzln/curupira'

If you have feedback or need assistance with the MCP directory API, please join our Discord server