Skip to main content
Glama

meMCP - Memory-Enhanced Model Context Protocol

MIT License
23
2
StreamingManager.js7.51 kB
/**
 * Manages chunked ("streaming") delivery of in-memory fact lists to consumers.
 *
 * Streams live in a Map keyed by a generated stream id. Each stream tracks its
 * read position, lifecycle status ('active' | 'paused' | 'completed' |
 * 'cancelled'), and timing metadata used for progress/ETA reporting.
 */
export class StreamingManager {
  constructor() {
    this.activeStreams = new Map();
    this.streamCounter = 0;
    this.chunkSize = 10; // Default number of facts per chunk
    this.maxConcurrentStreams = 5;
  }

  /** Generates a unique stream id from a timestamp plus a monotonic counter. */
  generateStreamId() {
    return `stream_${Date.now()}_${++this.streamCounter}`;
  }

  /**
   * Completion percentage for a stream position.
   * Defined as 100 for empty streams — avoids the 0/0 => NaN the naive
   * division produces when a query matches no facts.
   * @param {number} current - Facts delivered so far.
   * @param {number} total - Total facts in the stream.
   * @returns {number} Integer percentage in [0, 100].
   */
  calculatePercentage(current, total) {
    return total === 0 ? 100 : Math.round((current / total) * 100);
  }

  /**
   * Creates a new stream over the given facts.
   * @param {Array<object>} facts - Facts to stream, in delivery order.
   * @param {object} [options] - { chunkSize, includeProgress, query, type, domain }.
   * @returns {Promise<string>} The new stream's id.
   * @throws {Error} If the concurrent-stream limit is already reached.
   */
  async createStream(facts, options = {}) {
    if (this.activeStreams.size >= this.maxConcurrentStreams) {
      throw new Error('Maximum concurrent streams exceeded');
    }

    const streamId = this.generateStreamId();
    // `||` (not `??`) is intentional: a chunkSize of 0 would never advance
    // the stream, so all falsy values fall back to the default.
    const chunkSize = options.chunkSize || this.chunkSize;
    const includeProgress = options.includeProgress !== false;

    const stream = {
      id: streamId,
      facts,
      totalFacts: facts.length,
      chunkSize,
      currentIndex: 0,
      includeProgress,
      startTime: Date.now(),
      status: 'active',
      metadata: {
        query: options.query || '',
        type: options.type || 'all',
        domain: options.domain || 'all',
      },
    };

    this.activeStreams.set(streamId, stream);
    return streamId;
  }

  /**
   * Returns the next chunk of facts and advances the stream cursor.
   * Marks the stream 'completed' when the last chunk is delivered.
   * @param {string} streamId
   * @returns {Promise<object>} Chunk data: { streamId, chunkIndex, isLastChunk,
   *   facts, progress (omitted if includeProgress was false), metadata }.
   * @throws {Error} If the stream is unknown or not in the 'active' state.
   */
  async getNextChunk(streamId) {
    const stream = this.activeStreams.get(streamId);
    if (!stream) {
      throw new Error(`Stream ${streamId} not found`);
    }
    if (stream.status !== 'active') {
      throw new Error(`Stream ${streamId} is not active`);
    }

    const startIndex = stream.currentIndex;
    const endIndex = Math.min(startIndex + stream.chunkSize, stream.totalFacts);
    const chunk = stream.facts.slice(startIndex, endIndex);

    stream.currentIndex = endIndex;
    const isLastChunk = endIndex >= stream.totalFacts;
    if (isLastChunk) {
      stream.status = 'completed';
      stream.endTime = Date.now();
    }

    const progress = {
      current: endIndex,
      total: stream.totalFacts,
      percentage: this.calculatePercentage(endIndex, stream.totalFacts),
      remainingFacts: stream.totalFacts - endIndex,
      estimatedTimeRemaining: this.calculateEstimatedTime(stream, endIndex),
    };

    return {
      streamId,
      chunkIndex: Math.floor(startIndex / stream.chunkSize),
      isLastChunk,
      facts: chunk.map((fact) => this.formatFactForStreaming(fact)),
      progress: stream.includeProgress ? progress : undefined,
      metadata: stream.metadata,
    };
  }

  /**
   * Linear ETA estimate in whole seconds, based on average time per fact so
   * far. Returns null before any facts have been delivered.
   * @param {object} stream - Internal stream record.
   * @param {number} currentIndex - Facts delivered so far.
   * @returns {?number} Estimated seconds remaining, or null.
   */
  calculateEstimatedTime(stream, currentIndex) {
    if (currentIndex === 0) return null;
    const elapsedTime = Date.now() - stream.startTime;
    const avgTimePerFact = elapsedTime / currentIndex;
    const remainingFacts = stream.totalFacts - currentIndex;
    return Math.round((remainingFacts * avgTimePerFact) / 1000); // seconds
  }

  /**
   * Projects a stored fact into the wire shape sent to consumers, adding a
   * truncated 150-character summary. Tolerates facts with missing content
   * (the original `fact.content.length` threw a TypeError on null/undefined).
   * @param {object} fact
   * @returns {object} Streaming-friendly representation of the fact.
   */
  formatFactForStreaming(fact) {
    const content = fact.content || '';
    return {
      id: fact.id,
      content: fact.content,
      type: fact.type,
      domain: fact.domain,
      qualityScore: fact.qualityScore,
      tags: fact.tags || [],
      timestamp: fact.timestamp,
      relevanceScore: fact.relevanceScore || 0,
      summary: content.length > 150 ? content.substring(0, 150) + '...' : content,
    };
  }

  /**
   * Returns a status snapshot for a stream without advancing it.
   * @param {string} streamId
   * @returns {Promise<object>} { id, status, progress, metadata, startTime,
   *   endTime, duration } — duration is live for still-running streams.
   * @throws {Error} If the stream is unknown.
   */
  async getStreamStatus(streamId) {
    const stream = this.activeStreams.get(streamId);
    if (!stream) {
      throw new Error(`Stream ${streamId} not found`);
    }

    return {
      id: streamId,
      status: stream.status,
      progress: {
        current: stream.currentIndex,
        total: stream.totalFacts,
        percentage: this.calculatePercentage(stream.currentIndex, stream.totalFacts),
      },
      metadata: stream.metadata,
      startTime: stream.startTime,
      endTime: stream.endTime || null,
      duration: stream.endTime
        ? stream.endTime - stream.startTime
        : Date.now() - stream.startTime,
    };
  }

  /**
   * Cancels a stream. The record is retained (for status queries) until
   * cleanupCompletedStreams() evicts it.
   * @param {string} streamId
   * @returns {Promise<object>} { streamId, status, processed, total }.
   * @throws {Error} If the stream is unknown.
   */
  async cancelStream(streamId) {
    const stream = this.activeStreams.get(streamId);
    if (!stream) {
      throw new Error(`Stream ${streamId} not found`);
    }

    stream.status = 'cancelled';
    stream.endTime = Date.now();

    return {
      streamId,
      status: 'cancelled',
      processed: stream.currentIndex,
      total: stream.totalFacts,
    };
  }

  /**
   * Evicts completed/cancelled streams that finished more than 30 minutes ago,
   * bounding memory held by the stream registry.
   */
  async cleanupCompletedStreams() {
    const cutoffTime = Date.now() - 30 * 60 * 1000; // 30 minutes
    for (const [streamId, stream] of this.activeStreams.entries()) {
      if (
        (stream.status === 'completed' || stream.status === 'cancelled') &&
        (stream.endTime || stream.startTime) < cutoffTime
      ) {
        this.activeStreams.delete(streamId);
      }
    }
  }

  /**
   * Returns status snapshots for every registered stream.
   * @returns {Promise<Array<object>>}
   */
  async getAllStreams() {
    const streams = [];
    for (const streamId of this.activeStreams.keys()) {
      streams.push(await this.getStreamStatus(streamId));
    }
    return streams;
  }

  /**
   * Pauses an active stream; getNextChunk() will throw until it is resumed.
   * @param {string} streamId
   * @returns {Promise<object>} { streamId, status, progress }.
   * @throws {Error} If the stream is unknown or not 'active'.
   */
  async pauseStream(streamId) {
    const stream = this.activeStreams.get(streamId);
    if (!stream) {
      throw new Error(`Stream ${streamId} not found`);
    }
    if (stream.status !== 'active') {
      throw new Error(`Stream ${streamId} is not active`);
    }

    stream.status = 'paused';
    stream.pausedAt = Date.now();

    return {
      streamId,
      status: 'paused',
      progress: {
        current: stream.currentIndex,
        total: stream.totalFacts,
      },
    };
  }

  /**
   * Resumes a paused stream. startTime is shifted forward by the pause
   * duration so the ETA calculation ignores time spent paused.
   * @param {string} streamId
   * @returns {Promise<object>} { streamId, status, progress }.
   * @throws {Error} If the stream is unknown or not 'paused'.
   */
  async resumeStream(streamId) {
    const stream = this.activeStreams.get(streamId);
    if (!stream) {
      throw new Error(`Stream ${streamId} not found`);
    }
    if (stream.status !== 'paused') {
      throw new Error(`Stream ${streamId} is not paused`);
    }

    stream.status = 'active';
    if (stream.pausedAt) {
      // Exclude the paused interval from elapsed-time accounting.
      const pauseDuration = Date.now() - stream.pausedAt;
      stream.startTime += pauseDuration;
      delete stream.pausedAt;
    }

    return {
      streamId,
      status: 'active',
      progress: {
        current: stream.currentIndex,
        total: stream.totalFacts,
      },
    };
  }

  /**
   * Batch-fetches facts from a store (paging via limit/offset) and opens a
   * stream over the collected results. Intended for very large queries.
   * @param {object} queryParams - Query passed through to factStore.queryFacts.
   * @param {object} factStore - Must expose async queryFacts({ ...params, limit, offset }).
   * @param {object} [options] - { batchSize, maxResults } plus createStream options.
   * @returns {Promise<string>} The new stream's id.
   */
  async createBatchStream(queryParams, factStore, options = {}) {
    const batchSize = options.batchSize || 100;
    const maxResults = options.maxResults || 1000;

    let allFacts = [];
    let offset = 0;
    let hasMore = true;

    while (hasMore && allFacts.length < maxResults) {
      const batchParams = {
        ...queryParams,
        limit: Math.min(batchSize, maxResults - allFacts.length),
        offset,
      };

      const result = await factStore.queryFacts(batchParams);
      const facts = result.facts || [];

      if (facts.length === 0) {
        hasMore = false;
      } else {
        allFacts = allFacts.concat(facts);
        offset += facts.length;
        // A short page means the store has no more rows.
        if (facts.length < batchSize) {
          hasMore = false;
        }
      }
    }

    return await this.createStream(allFacts, {
      ...options,
      query: queryParams.query || '',
      type: queryParams.type || 'all',
      domain: queryParams.domain || 'all',
    });
  }

  /**
   * Aggregate counters across all registered streams, bucketed by status.
   * @returns {object}
   */
  getStats() {
    const streams = Array.from(this.activeStreams.values());
    return {
      totalStreams: streams.length,
      activeStreams: streams.filter((s) => s.status === 'active').length,
      pausedStreams: streams.filter((s) => s.status === 'paused').length,
      completedStreams: streams.filter((s) => s.status === 'completed').length,
      cancelledStreams: streams.filter((s) => s.status === 'cancelled').length,
      totalFactsStreaming: streams.reduce((sum, s) => sum + s.totalFacts, 0),
      totalFactsProcessed: streams.reduce((sum, s) => sum + s.currentIndex, 0),
    };
  }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mixelpixx/meMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.