Skip to main content
Glama

mcp-github-project-manager

EventStore.ts14.9 kB
import * as fs from 'fs/promises'; import * as path from 'path'; import { ResourceEvent } from './GitHubWebhookHandler'; import { Logger } from '../logger/index'; import { EVENT_RETENTION_DAYS, MAX_EVENTS_IN_MEMORY, CACHE_DIRECTORY } from '../../env'; export interface EventStoreOptions { retentionDays: number; maxEventsInMemory: number; storageDirectory: string; enableCompression: boolean; } export interface EventQuery { resourceType?: string; resourceId?: string; eventType?: string; source?: string; fromTimestamp?: string; toTimestamp?: string; limit?: number; offset?: number; } export interface EventStoreStats { totalEvents: number; eventsInMemory: number; eventsOnDisk: number; oldestEvent?: string; newestEvent?: string; storageSize: number; } export class EventStore { private readonly logger = Logger.getInstance(); private readonly options: EventStoreOptions; private readonly eventsDirectory: string; private readonly indexFile: string; private directoryInitialized = false; // In-memory event buffer for fast access private memoryBuffer: ResourceEvent[] = []; private eventIndex = new Map<string, number>(); // eventId -> buffer index // File rotation tracking private currentFileDate: string = ''; private currentFileEvents: number = 0; private readonly maxEventsPerFile = 10000; constructor(options?: Partial<EventStoreOptions>) { this.options = { retentionDays: options?.retentionDays || EVENT_RETENTION_DAYS, maxEventsInMemory: options?.maxEventsInMemory || MAX_EVENTS_IN_MEMORY, storageDirectory: options?.storageDirectory || path.join(CACHE_DIRECTORY, 'events'), enableCompression: options?.enableCompression ?? true }; this.eventsDirectory = this.options.storageDirectory; this.indexFile = path.join(this.eventsDirectory, 'index.json'); } /** * Store a new event */ async storeEvent(event: ResourceEvent): Promise<void> { try { await this.ensureDirectoryExists(); // Add to memory buffer this.addToMemoryBuffer(event); // Persist to disk await this.persistEvent(event); this.logger.debug(`Stored event ${event.id} (${event.type} ${event.resourceType})`); } catch (error) { this.logger.error(`Failed to store event ${event.id}:`, error); throw error; } } /** * Store multiple events in batch */ async storeEvents(events: ResourceEvent[]): Promise<void> { if (events.length === 0) { return; } try { await this.ensureDirectoryExists(); // Add all to memory buffer for (const event of events) { this.addToMemoryBuffer(event); } // Persist all to disk await this.persistEvents(events); this.logger.debug(`Stored ${events.length} events in batch`); } catch (error) { this.logger.error(`Failed to store ${events.length} events in batch:`, error); throw error; } } /** * Get events by query */ async getEvents(query: EventQuery = {}): Promise<ResourceEvent[]> { try { // First check memory buffer let events = this.queryMemoryBuffer(query); // If we need more events or specific time range, check disk if (this.needsDiskQuery(query, events.length)) { const diskEvents = await this.queryDiskEvents(query); events = this.mergeAndDeduplicateEvents(events, diskEvents); } // Apply final filtering and sorting events = this.applyFinalFiltering(events, query); this.logger.debug(`Retrieved ${events.length} events for query`); return events; } catch (error) { this.logger.error("Failed to get events:", error); throw error; } } /** * Get events from a specific timestamp (for replay) */ async getEventsFromTimestamp(timestamp: string, limit?: number): Promise<ResourceEvent[]> { return this.getEvents({ fromTimestamp: timestamp, limit: limit || 1000 }); } /** * Get recent events */ async getRecentEvents(limit: number = 100): Promise<ResourceEvent[]> { const events = [...this.memoryBuffer] .sort((a, b) => b.timestamp.localeCompare(a.timestamp)) .slice(0, limit); return events; } /** * Clean up old events based on retention policy */ async cleanup(): Promise<void> { try { const cutoffDate = new Date(); cutoffDate.setDate(cutoffDate.getDate() - this.options.retentionDays); const cutoffTimestamp = cutoffDate.toISOString(); // Clean memory buffer const initialMemorySize = this.memoryBuffer.length; this.memoryBuffer = this.memoryBuffer.filter(event => event.timestamp >= cutoffTimestamp); this.rebuildEventIndex(); // Clean disk files const deletedFiles = await this.cleanupDiskFiles(cutoffDate); this.logger.info(`Cleanup completed: removed ${initialMemorySize - this.memoryBuffer.length} events from memory, ${deletedFiles} files from disk`); } catch (error) { this.logger.error("Failed to cleanup events:", error); } } /** * Get event store statistics */ async getStats(): Promise<EventStoreStats> { try { const diskStats = await this.getDiskStats(); const stats: EventStoreStats = { totalEvents: this.memoryBuffer.length + diskStats.eventCount, eventsInMemory: this.memoryBuffer.length, eventsOnDisk: diskStats.eventCount, storageSize: diskStats.totalSize }; if (this.memoryBuffer.length > 0) { const sortedEvents = [...this.memoryBuffer].sort((a, b) => a.timestamp.localeCompare(b.timestamp)); stats.oldestEvent = sortedEvents[0].timestamp; stats.newestEvent = sortedEvents[sortedEvents.length - 1].timestamp; } return stats; } catch (error) { this.logger.error("Failed to get event store stats:", error); throw error; } } /** * Add event to memory buffer */ private addToMemoryBuffer(event: ResourceEvent): void { // Check if event already exists if (this.eventIndex.has(event.id)) { return; // Duplicate event, skip } // Add to buffer this.memoryBuffer.push(event); this.eventIndex.set(event.id, this.memoryBuffer.length - 1); // Trim buffer if it exceeds max size if (this.memoryBuffer.length > this.options.maxEventsInMemory) { const eventsToRemove = this.memoryBuffer.length - this.options.maxEventsInMemory; const removedEvents = this.memoryBuffer.splice(0, eventsToRemove); // Update index this.rebuildEventIndex(); this.logger.debug(`Trimmed ${eventsToRemove} events from memory buffer`); } } /** * Rebuild event index after buffer modifications */ private rebuildEventIndex(): void { this.eventIndex.clear(); this.memoryBuffer.forEach((event, index) => { this.eventIndex.set(event.id, index); }); } /** * Persist event to disk */ private async persistEvent(event: ResourceEvent): Promise<void> { await this.persistEvents([event]); } /** * Persist multiple events to disk */ private async persistEvents(events: ResourceEvent[]): Promise<void> { if (events.length === 0) { return; } const today = new Date().toISOString().split('T')[0]; // YYYY-MM-DD // Check if we need a new file if (this.currentFileDate !== today || this.currentFileEvents >= this.maxEventsPerFile) { this.currentFileDate = today; this.currentFileEvents = 0; } const filename = this.getEventFileName(today, this.currentFileEvents); const filepath = path.join(this.eventsDirectory, filename); try { // Read existing events if file exists let existingEvents: ResourceEvent[] = []; try { const content = await fs.readFile(filepath, 'utf8'); existingEvents = JSON.parse(content); } catch { // File doesn't exist or is empty, start fresh } // Append new events existingEvents.push(...events); // Write back to file await fs.writeFile(filepath, JSON.stringify(existingEvents, null, 2)); this.currentFileEvents += events.length; } catch (error) { this.logger.error(`Failed to persist events to ${filepath}:`, error); throw error; } } /** * Query events from memory buffer */ private queryMemoryBuffer(query: EventQuery): ResourceEvent[] { let events = [...this.memoryBuffer]; // Apply filters if (query.resourceType) { events = events.filter(e => e.resourceType === query.resourceType); } if (query.resourceId) { events = events.filter(e => e.resourceId === query.resourceId); } if (query.eventType) { events = events.filter(e => e.type === query.eventType); } if (query.source) { events = events.filter(e => e.source === query.source); } if (query.fromTimestamp) { events = events.filter(e => e.timestamp >= query.fromTimestamp!); } if (query.toTimestamp) { events = events.filter(e => e.timestamp <= query.toTimestamp!); } return events; } /** * Query events from disk files */ private async queryDiskEvents(query: EventQuery): Promise<ResourceEvent[]> { const events: ResourceEvent[] = []; try { const files = await fs.readdir(this.eventsDirectory); const eventFiles = files.filter(f => f.startsWith('events-') && f.endsWith('.json')); for (const file of eventFiles) { try { const filepath = path.join(this.eventsDirectory, file); const content = await fs.readFile(filepath, 'utf8'); const fileEvents: ResourceEvent[] = JSON.parse(content); // Apply basic filtering const filteredEvents = fileEvents.filter(event => { if (query.resourceType && event.resourceType !== query.resourceType) return false; if (query.resourceId && event.resourceId !== query.resourceId) return false; if (query.eventType && event.type !== query.eventType) return false; if (query.source && event.source !== query.source) return false; if (query.fromTimestamp && event.timestamp < query.fromTimestamp) return false; if (query.toTimestamp && event.timestamp > query.toTimestamp) return false; return true; }); events.push(...filteredEvents); } catch (error) { this.logger.warn(`Failed to read event file ${file}:`, error); } } } catch (error) { this.logger.error("Failed to query disk events:", error); } return events; } /** * Check if we need to query disk based on query and memory results */ private needsDiskQuery(query: EventQuery, memoryResultCount: number): boolean { // If we have a specific time range that might extend beyond memory if (query.fromTimestamp || query.toTimestamp) { return true; } // If we need more results than what's in memory if (query.limit && query.limit > memoryResultCount) { return true; } return false; } /** * Merge and deduplicate events from memory and disk */ private mergeAndDeduplicateEvents(memoryEvents: ResourceEvent[], diskEvents: ResourceEvent[]): ResourceEvent[] { const eventMap = new Map<string, ResourceEvent>(); // Add memory events (they take precedence) for (const event of memoryEvents) { eventMap.set(event.id, event); } // Add disk events (only if not already in memory) for (const event of diskEvents) { if (!eventMap.has(event.id)) { eventMap.set(event.id, event); } } return Array.from(eventMap.values()); } /** * Apply final filtering and sorting */ private applyFinalFiltering(events: ResourceEvent[], query: EventQuery): ResourceEvent[] { // Sort by timestamp (newest first) events.sort((a, b) => b.timestamp.localeCompare(a.timestamp)); // Apply offset and limit if (query.offset) { events = events.slice(query.offset); } if (query.limit) { events = events.slice(0, query.limit); } return events; } /** * Get event file name */ private getEventFileName(date: string, sequence: number): string { const paddedSequence = sequence.toString().padStart(4, '0'); return `events-${date}-${paddedSequence}.json`; } /** * Clean up old disk files */ private async cleanupDiskFiles(cutoffDate: Date): Promise<number> { let deletedFiles = 0; try { const files = await fs.readdir(this.eventsDirectory); const eventFiles = files.filter(f => f.startsWith('events-') && f.endsWith('.json')); for (const file of eventFiles) { // Extract date from filename const dateMatch = file.match(/events-(\d{4}-\d{2}-\d{2})-/); if (dateMatch) { const fileDate = new Date(dateMatch[1]); if (fileDate < cutoffDate) { const filepath = path.join(this.eventsDirectory, file); await fs.unlink(filepath); deletedFiles++; this.logger.debug(`Deleted old event file: ${file}`); } } } } catch (error) { this.logger.error("Failed to cleanup disk files:", error); } return deletedFiles; } /** * Get disk storage statistics */ private async getDiskStats(): Promise<{ eventCount: number; totalSize: number }> { let eventCount = 0; let totalSize = 0; try { const files = await fs.readdir(this.eventsDirectory); const eventFiles = files.filter(f => f.startsWith('events-') && f.endsWith('.json')); for (const file of eventFiles) { try { const filepath = path.join(this.eventsDirectory, file); const stats = await fs.stat(filepath); totalSize += stats.size; // Count events in file const content = await fs.readFile(filepath, 'utf8'); const events = JSON.parse(content); eventCount += events.length; } catch (error) { this.logger.warn(`Failed to get stats for event file ${file}:`, error); } } } catch (error) { this.logger.error("Failed to get disk stats:", error); } return { eventCount, totalSize }; } /** * Ensure events directory exists */ private async ensureDirectoryExists(): Promise<void> { if (this.directoryInitialized) { return; } try { // Create parent directory first if it doesn't exist const parentDir = path.dirname(this.eventsDirectory); await fs.mkdir(parentDir, { recursive: true }); // Then create the events directory await fs.mkdir(this.eventsDirectory, { recursive: true }); this.directoryInitialized = true; } catch (error) { this.logger.error("Failed to create events directory:", error); throw error; } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kunwarVivek/mcp-github-project-manager'

If you have feedback or need assistance with the MCP directory API, please join our Discord server