Skip to main content
Glama

mcp-github-project-manager

EventSubscriptionManager.ts13.1 kB
import { ResourceType } from '../../domain/resource-types'; import { ResourceEvent } from './GitHubWebhookHandler'; import { Logger } from '../logger/index'; import { EventEmitter } from 'events'; export interface EventFilter { resourceType?: ResourceType; eventType?: ResourceEvent['type']; resourceId?: string; tags?: string[]; source?: ResourceEvent['source']; } export interface EventSubscription { id: string; clientId: string; filters: EventFilter[]; transport: 'sse' | 'webhook' | 'internal'; endpoint?: string; lastEventId?: string; createdAt: string; expiresAt?: string; active: boolean; metadata?: Record<string, any>; } export interface SubscriptionStats { totalSubscriptions: number; activeSubscriptions: number; subscriptionsByTransport: Record<string, number>; subscriptionsByResourceType: Record<string, number>; } export class EventSubscriptionManager extends EventEmitter { private readonly logger = Logger.getInstance(); private subscriptions = new Map<string, EventSubscription>(); private clientSubscriptions = new Map<string, Set<string>>(); private resourceTypeIndex = new Map<ResourceType, Set<string>>(); private eventTypeIndex = new Map<ResourceEvent['type'], Set<string>>(); constructor() { super(); this.setMaxListeners(1000); // Allow many event listeners } /** * Create a new event subscription */ subscribe(subscription: Omit<EventSubscription, 'id' | 'createdAt' | 'active'>): string { const subscriptionId = this.generateSubscriptionId(); const fullSubscription: EventSubscription = { ...subscription, id: subscriptionId, createdAt: new Date().toISOString(), active: true }; // Store subscription this.subscriptions.set(subscriptionId, fullSubscription); // Index by client if (!this.clientSubscriptions.has(subscription.clientId)) { this.clientSubscriptions.set(subscription.clientId, new Set()); } this.clientSubscriptions.get(subscription.clientId)!.add(subscriptionId); // Index by resource types and event types in filters this.indexSubscription(subscriptionId, fullSubscription); this.logger.info(`Created subscription ${subscriptionId} for client ${subscription.clientId}`); this.emit('subscriptionCreated', fullSubscription); return subscriptionId; } /** * Remove a subscription */ unsubscribe(subscriptionId: string): boolean { const subscription = this.subscriptions.get(subscriptionId); if (!subscription) { return false; } // Remove from indices this.removeFromIndices(subscriptionId, subscription); // Remove from client index const clientSubs = this.clientSubscriptions.get(subscription.clientId); if (clientSubs) { clientSubs.delete(subscriptionId); if (clientSubs.size === 0) { this.clientSubscriptions.delete(subscription.clientId); } } // Remove subscription this.subscriptions.delete(subscriptionId); this.logger.info(`Removed subscription ${subscriptionId} for client ${subscription.clientId}`); this.emit('subscriptionRemoved', subscription); return true; } /** * Unsubscribe all subscriptions for a client */ unsubscribeClient(clientId: string): number { const clientSubs = this.clientSubscriptions.get(clientId); if (!clientSubs) { return 0; } const subscriptionIds = Array.from(clientSubs); let removedCount = 0; for (const subscriptionId of subscriptionIds) { if (this.unsubscribe(subscriptionId)) { removedCount++; } } this.logger.info(`Removed ${removedCount} subscriptions for client ${clientId}`); return removedCount; } /** * Get subscription by ID */ getSubscription(subscriptionId: string): EventSubscription | null { return this.subscriptions.get(subscriptionId) || null; } /** * Get all subscriptions for a client */ getClientSubscriptions(clientId: string): EventSubscription[] { const subscriptionIds = this.clientSubscriptions.get(clientId); if (!subscriptionIds) { return []; } const subscriptions: EventSubscription[] = []; for (const id of subscriptionIds) { const subscription = this.subscriptions.get(id); if (subscription) { subscriptions.push(subscription); } } return subscriptions; } /** * Notify subscribers of a new event */ async notifySubscribers(event: ResourceEvent): Promise<void> { const matchingSubscriptions = this.getSubscriptionsForEvent(event); if (matchingSubscriptions.length === 0) { this.logger.debug(`No subscriptions found for event ${event.id}`); return; } this.logger.debug(`Notifying ${matchingSubscriptions.length} subscribers for event ${event.id}`); // Group subscriptions by transport type for efficient processing const subscriptionsByTransport = this.groupSubscriptionsByTransport(matchingSubscriptions); // Process each transport type for (const [transport, subscriptions] of subscriptionsByTransport.entries()) { try { await this.notifySubscriptionsByTransport(transport, subscriptions, event); } catch (error) { this.logger.error(`Failed to notify ${transport} subscribers:`, error); } } } /** * Get subscriptions that match an event */ getSubscriptionsForEvent(event: ResourceEvent): EventSubscription[] { const matchingSubscriptions: EventSubscription[] = []; // Get potential matches from indices const potentialMatches = new Set<string>(); // Add subscriptions that match resource type const resourceTypeMatches = this.resourceTypeIndex.get(event.resourceType); if (resourceTypeMatches) { resourceTypeMatches.forEach(id => potentialMatches.add(id)); } // Add subscriptions that match event type const eventTypeMatches = this.eventTypeIndex.get(event.type); if (eventTypeMatches) { eventTypeMatches.forEach(id => potentialMatches.add(id)); } // If no index matches, check all subscriptions (for subscriptions with no filters) if (potentialMatches.size === 0) { this.subscriptions.forEach((_, id) => potentialMatches.add(id)); } // Filter potential matches for (const subscriptionId of potentialMatches) { const subscription = this.subscriptions.get(subscriptionId); if (subscription && subscription.active && this.eventMatchesSubscription(event, subscription)) { matchingSubscriptions.push(subscription); } } return matchingSubscriptions; } /** * Update subscription's last event ID */ updateLastEventId(subscriptionId: string, eventId: string): void { const subscription = this.subscriptions.get(subscriptionId); if (subscription) { subscription.lastEventId = eventId; this.subscriptions.set(subscriptionId, subscription); } } /** * Deactivate expired subscriptions */ cleanupExpiredSubscriptions(): number { const now = new Date().toISOString(); const expiredSubscriptions: string[] = []; for (const [id, subscription] of this.subscriptions.entries()) { if (subscription.expiresAt && subscription.expiresAt < now) { expiredSubscriptions.push(id); } } let removedCount = 0; for (const id of expiredSubscriptions) { if (this.unsubscribe(id)) { removedCount++; } } if (removedCount > 0) { this.logger.info(`Cleaned up ${removedCount} expired subscriptions`); } return removedCount; } /** * Get subscription statistics */ getStats(): SubscriptionStats { const stats: SubscriptionStats = { totalSubscriptions: this.subscriptions.size, activeSubscriptions: 0, subscriptionsByTransport: {}, subscriptionsByResourceType: {} }; for (const subscription of this.subscriptions.values()) { if (subscription.active) { stats.activeSubscriptions++; } // Count by transport const transport = subscription.transport; stats.subscriptionsByTransport[transport] = (stats.subscriptionsByTransport[transport] || 0) + 1; // Count by resource type (from filters) for (const filter of subscription.filters) { if (filter.resourceType) { const resourceType = filter.resourceType; stats.subscriptionsByResourceType[resourceType] = (stats.subscriptionsByResourceType[resourceType] || 0) + 1; } } } return stats; } /** * Generate unique subscription ID */ private generateSubscriptionId(): string { return `sub-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; } /** * Index subscription for efficient lookup */ private indexSubscription(subscriptionId: string, subscription: EventSubscription): void { for (const filter of subscription.filters) { // Index by resource type if (filter.resourceType) { if (!this.resourceTypeIndex.has(filter.resourceType)) { this.resourceTypeIndex.set(filter.resourceType, new Set()); } this.resourceTypeIndex.get(filter.resourceType)!.add(subscriptionId); } // Index by event type if (filter.eventType) { if (!this.eventTypeIndex.has(filter.eventType)) { this.eventTypeIndex.set(filter.eventType, new Set()); } this.eventTypeIndex.get(filter.eventType)!.add(subscriptionId); } } } /** * Remove subscription from indices */ private removeFromIndices(subscriptionId: string, subscription: EventSubscription): void { for (const filter of subscription.filters) { // Remove from resource type index if (filter.resourceType) { const resourceTypeSet = this.resourceTypeIndex.get(filter.resourceType); if (resourceTypeSet) { resourceTypeSet.delete(subscriptionId); if (resourceTypeSet.size === 0) { this.resourceTypeIndex.delete(filter.resourceType); } } } // Remove from event type index if (filter.eventType) { const eventTypeSet = this.eventTypeIndex.get(filter.eventType); if (eventTypeSet) { eventTypeSet.delete(subscriptionId); if (eventTypeSet.size === 0) { this.eventTypeIndex.delete(filter.eventType); } } } } } /** * Check if event matches subscription filters */ private eventMatchesSubscription(event: ResourceEvent, subscription: EventSubscription): boolean { // If no filters, match all events if (subscription.filters.length === 0) { return true; } // Event must match at least one filter return subscription.filters.some(filter => this.eventMatchesFilter(event, filter)); } /** * Check if event matches a specific filter */ private eventMatchesFilter(event: ResourceEvent, filter: EventFilter): boolean { // Check resource type if (filter.resourceType && filter.resourceType !== event.resourceType) { return false; } // Check event type if (filter.eventType && filter.eventType !== event.type) { return false; } // Check resource ID if (filter.resourceId && filter.resourceId !== event.resourceId) { return false; } // Check source if (filter.source && filter.source !== event.source) { return false; } // Check tags (if event has metadata with tags) if (filter.tags && filter.tags.length > 0) { const eventTags = event.metadata?.tags || []; const hasMatchingTag = filter.tags.some(tag => eventTags.includes(tag)); if (!hasMatchingTag) { return false; } } return true; } /** * Group subscriptions by transport type */ private groupSubscriptionsByTransport(subscriptions: EventSubscription[]): Map<string, EventSubscription[]> { const grouped = new Map<string, EventSubscription[]>(); for (const subscription of subscriptions) { const transport = subscription.transport; if (!grouped.has(transport)) { grouped.set(transport, []); } grouped.get(transport)!.push(subscription); } return grouped; } /** * Notify subscriptions by transport type */ private async notifySubscriptionsByTransport( transport: string, subscriptions: EventSubscription[], event: ResourceEvent ): Promise<void> { switch (transport) { case 'sse': this.emit('sseEvent', { subscriptions, event }); break; case 'webhook': this.emit('webhookEvent', { subscriptions, event }); break; case 'internal': this.emit('internalEvent', { subscriptions, event }); break; default: this.logger.warn(`Unknown transport type: ${transport}`); } // Update last event ID for all notified subscriptions for (const subscription of subscriptions) { this.updateLastEventId(subscription.id, event.id); } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kunwarVivek/mcp-github-project-manager'

If you have feedback or need assistance with the MCP directory API, please join our Discord server