Skip to main content
Glama

mcp-github-project-manager

WebhookServer.ts14.5 kB
import * as http from 'http'; import * as url from 'url'; import { GitHubWebhookHandler, ResourceEvent } from '../events/GitHubWebhookHandler'; import { EventSubscriptionManager, EventSubscription, EventFilter } from '../events/EventSubscriptionManager'; import { EventStore } from '../events/EventStore'; import { Logger } from '../logger/index'; import { WEBHOOK_PORT, SSE_ENABLED, WEBHOOK_TIMEOUT_MS } from '../../env'; export interface SSEConnection { id: string; response: http.ServerResponse; subscriptionId?: string; lastEventId?: string; clientId: string; createdAt: string; } export interface WebhookServerOptions { port: number; enableSSE: boolean; timeout: number; } export class WebhookServer { private readonly logger = Logger.getInstance(); private readonly options: WebhookServerOptions; private readonly webhookHandler: GitHubWebhookHandler; private readonly subscriptionManager: EventSubscriptionManager; private readonly eventStore: EventStore; private server?: http.Server; private sseConnections = new Map<string, SSEConnection>(); private isRunning = false; constructor( webhookHandler: GitHubWebhookHandler, subscriptionManager: EventSubscriptionManager, eventStore: EventStore, options?: Partial<WebhookServerOptions> ) { this.options = { port: options?.port || WEBHOOK_PORT, enableSSE: options?.enableSSE ?? SSE_ENABLED, timeout: options?.timeout || WEBHOOK_TIMEOUT_MS }; this.webhookHandler = webhookHandler; this.subscriptionManager = subscriptionManager; this.eventStore = eventStore; this.setupEventListeners(); } /** * Start the webhook server */ async start(): Promise<void> { if (this.isRunning) { throw new Error("Webhook server is already running"); } return new Promise((resolve, reject) => { this.server = http.createServer((req, res) => { this.handleRequest(req, res).catch(error => { this.logger.error("Error handling request:", error); this.sendErrorResponse(res, 500, "Internal server error"); }); }); this.server.setTimeout(this.options.timeout); this.server.listen(this.options.port, () => { this.isRunning = true; this.logger.info(`Webhook server started on port ${this.options.port}`); resolve(); }); this.server.on('error', (error) => { this.logger.error("Webhook server error:", error); reject(error); }); }); } /** * Stop the webhook server */ async stop(): Promise<void> { if (!this.isRunning || !this.server) { return; } return new Promise((resolve) => { // Close all SSE connections for (const connection of this.sseConnections.values()) { connection.response.end(); } this.sseConnections.clear(); this.server!.close(() => { this.isRunning = false; this.logger.info("Webhook server stopped"); resolve(); }); }); } /** * Get server status */ getStatus(): { running: boolean; port: number; connections: number } { return { running: this.isRunning, port: this.options.port, connections: this.sseConnections.size }; } /** * Handle incoming HTTP requests */ private async handleRequest(req: http.IncomingMessage, res: http.ServerResponse): Promise<void> { const parsedUrl = url.parse(req.url || '', true); const pathname = parsedUrl.pathname || ''; const method = req.method || 'GET'; // Set CORS headers this.setCORSHeaders(res); // Handle preflight requests if (method === 'OPTIONS') { res.writeHead(200); res.end(); return; } try { switch (true) { case pathname === '/webhooks/github' && method === 'POST': await this.handleGitHubWebhook(req, res); break; case pathname === '/events/stream' && method === 'GET': await this.handleSSEConnection(req, res); break; case pathname === '/events/subscribe' && method === 'POST': await this.handleCreateSubscription(req, res); break; case pathname.startsWith('/events/subscribe/') && method === 'DELETE': await this.handleDeleteSubscription(req, res, pathname); break; case pathname.startsWith('/events/replay/') && method === 'GET': await this.handleEventReplay(req, res, pathname); break; case pathname === '/events/recent' && method === 'GET': await this.handleRecentEvents(req, res, parsedUrl.query); break; case pathname === '/health' && method === 'GET': await this.handleHealthCheck(req, res); break; default: this.sendErrorResponse(res, 404, "Not found"); } } catch (error) { this.logger.error(`Error handling ${method} ${pathname}:`, error); this.sendErrorResponse(res, 500, "Internal server error"); } } /** * Handle GitHub webhook */ private async handleGitHubWebhook(req: http.IncomingMessage, res: http.ServerResponse): Promise<void> { const body = await this.readRequestBody(req); const signature = req.headers['x-hub-signature-256'] as string; const eventType = req.headers['x-github-event'] as string; const delivery = req.headers['x-github-delivery'] as string; if (!eventType) { this.sendErrorResponse(res, 400, "Missing X-GitHub-Event header"); return; } // Validate signature const isValidSignature = await this.webhookHandler.validateSignature(body, signature); if (!isValidSignature) { this.sendErrorResponse(res, 401, "Invalid signature"); return; } try { const payload = JSON.parse(body); // Validate payload structure if (!this.webhookHandler.validateWebhookPayload(eventType, payload)) { this.sendErrorResponse(res, 400, "Invalid payload structure"); return; } // Create webhook event const webhookEvent = this.webhookHandler.createWebhookEvent(eventType, payload, signature, delivery); // Process webhook const result = await this.webhookHandler.processWebhookEvent(webhookEvent); if (result.success) { // Store events if (result.events.length > 0) { await this.eventStore.storeEvents(result.events); // Notify subscribers for (const event of result.events) { await this.subscriptionManager.notifySubscribers(event); } } this.sendJSONResponse(res, 200, { success: true, eventsGenerated: result.events.length, skipped: result.skipped, reason: result.reason }); } else { this.sendJSONResponse(res, 400, { success: false, errors: result.errors }); } } catch (error) { this.logger.error("Error processing webhook:", error); this.sendErrorResponse(res, 400, "Invalid JSON payload"); } } /** * Handle SSE connection */ private async handleSSEConnection(req: http.IncomingMessage, res: http.ServerResponse): Promise<void> { if (!this.options.enableSSE) { this.sendErrorResponse(res, 503, "SSE not enabled"); return; } const parsedUrl = url.parse(req.url || '', true); const clientId = parsedUrl.query.clientId as string || `client-${Date.now()}`; const lastEventId = req.headers['last-event-id'] as string; // Set SSE headers res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'Cache-Control' }); // Create SSE connection const connectionId = `sse-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; const connection: SSEConnection = { id: connectionId, response: res, lastEventId, clientId, createdAt: new Date().toISOString() }; this.sseConnections.set(connectionId, connection); // Send initial connection event this.sendSSEEvent(res, 'connected', { connectionId, clientId }); // Handle connection close req.on('close', () => { this.sseConnections.delete(connectionId); this.logger.debug(`SSE connection ${connectionId} closed`); }); // Send missed events if lastEventId is provided if (lastEventId) { try { const missedEvents = await this.eventStore.getEventsFromTimestamp(lastEventId, 100); for (const event of missedEvents) { this.sendSSEEvent(res, 'event', event); } } catch (error) { this.logger.error("Error sending missed events:", error); } } this.logger.info(`SSE connection established: ${connectionId} for client ${clientId}`); } /** * Handle create subscription */ private async handleCreateSubscription(req: http.IncomingMessage, res: http.ServerResponse): Promise<void> { const body = await this.readRequestBody(req); try { const subscriptionData = JSON.parse(body); // Validate required fields if (!subscriptionData.clientId || !subscriptionData.transport) { this.sendErrorResponse(res, 400, "Missing required fields: clientId, transport"); return; } // Create subscription const subscriptionId = this.subscriptionManager.subscribe({ clientId: subscriptionData.clientId, filters: subscriptionData.filters || [], transport: subscriptionData.transport, endpoint: subscriptionData.endpoint, expiresAt: subscriptionData.expiresAt, metadata: subscriptionData.metadata }); this.sendJSONResponse(res, 201, { subscriptionId, success: true }); } catch (error) { this.logger.error("Error creating subscription:", error); this.sendErrorResponse(res, 400, "Invalid subscription data"); } } /** * Handle delete subscription */ private async handleDeleteSubscription(req: http.IncomingMessage, res: http.ServerResponse, pathname: string): Promise<void> { const subscriptionId = pathname.split('/').pop(); if (!subscriptionId) { this.sendErrorResponse(res, 400, "Missing subscription ID"); return; } const success = this.subscriptionManager.unsubscribe(subscriptionId); if (success) { this.sendJSONResponse(res, 200, { success: true }); } else { this.sendErrorResponse(res, 404, "Subscription not found"); } } /** * Handle event replay */ private async handleEventReplay(req: http.IncomingMessage, res: http.ServerResponse, pathname: string): Promise<void> { const timestamp = pathname.split('/').pop(); if (!timestamp) { this.sendErrorResponse(res, 400, "Missing timestamp"); return; } try { const events = await this.eventStore.getEventsFromTimestamp(timestamp, 1000); this.sendJSONResponse(res, 200, { events }); } catch (error) { this.logger.error("Error replaying events:", error); this.sendErrorResponse(res, 500, "Error replaying events"); } } /** * Handle recent events */ private async handleRecentEvents(req: http.IncomingMessage, res: http.ServerResponse, query: any): Promise<void> { const limit = parseInt(query.limit as string) || 100; try { const events = await this.eventStore.getRecentEvents(limit); this.sendJSONResponse(res, 200, { events }); } catch (error) { this.logger.error("Error getting recent events:", error); this.sendErrorResponse(res, 500, "Error getting recent events"); } } /** * Handle health check */ private async handleHealthCheck(req: http.IncomingMessage, res: http.ServerResponse): Promise<void> { const stats = await this.eventStore.getStats(); const subscriptionStats = this.subscriptionManager.getStats(); this.sendJSONResponse(res, 200, { status: 'healthy', server: this.getStatus(), events: stats, subscriptions: subscriptionStats }); } /** * Setup event listeners */ private setupEventListeners(): void { // Listen for SSE events this.subscriptionManager.on('sseEvent', ({ subscriptions, event }) => { this.handleSSEEventNotification(subscriptions, event); }); } /** * Handle SSE event notification */ private handleSSEEventNotification(subscriptions: EventSubscription[], event: ResourceEvent): void { for (const subscription of subscriptions) { // Find SSE connections for this subscription for (const connection of this.sseConnections.values()) { if (connection.clientId === subscription.clientId) { this.sendSSEEvent(connection.response, 'event', event); } } } } /** * Send SSE event */ private sendSSEEvent(res: http.ServerResponse, type: string, data: any): void { try { const eventData = `event: ${type}\ndata: ${JSON.stringify(data)}\nid: ${data.id || Date.now()}\n\n`; res.write(eventData); } catch (error) { this.logger.error("Error sending SSE event:", error); } } /** * Read request body */ private async readRequestBody(req: http.IncomingMessage): Promise<string> { return new Promise((resolve, reject) => { let body = ''; req.on('data', chunk => { body += chunk.toString(); }); req.on('end', () => { resolve(body); }); req.on('error', reject); }); } /** * Set CORS headers */ private setCORSHeaders(res: http.ServerResponse): void { res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Methods', 'GET, POST, DELETE, OPTIONS'); res.setHeader('Access-Control-Allow-Headers', 'Content-Type, X-Hub-Signature-256, X-GitHub-Event, X-GitHub-Delivery'); } /** * Send JSON response */ private sendJSONResponse(res: http.ServerResponse, statusCode: number, data: any): void { res.writeHead(statusCode, { 'Content-Type': 'application/json' }); res.end(JSON.stringify(data)); } /** * Send error response */ private sendErrorResponse(res: http.ServerResponse, statusCode: number, message: string): void { res.writeHead(statusCode, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: message })); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kunwarVivek/mcp-github-project-manager'

If you have feedback or need assistance with the MCP directory API, please join our Discord server