Skip to main content
Glama
StreamSynchronizationManager.ts13.2 kB
/** * Stream Synchronization Manager * * Manages real-time coordination and synchronization between reasoning streams */ import { ConflictDetection, ConflictResolution, CoordinationStatus, InformationSharing, IStreamSynchronizationManager, ReasoningStream, SynchronizationPoint, } from "../interfaces/parallel-reasoning.js"; export class StreamSynchronizationManager implements IStreamSynchronizationManager { private activeStreams: Map<string, ReasoningStream> = new Map(); private synchronizationInterval: number = 1000; // 1 second default private synchronizationTimer?: NodeJS.Timeout; private coordinationStatus: CoordinationStatus; private informationSharingLog: InformationSharing[] = []; private synchronizationPoints: SynchronizationPoint[] = []; private conflictsDetected: ConflictDetection[] = []; private conflictsResolved: ConflictResolution[] = []; private initialized: boolean = false; constructor() { this.coordinationStatus = { active_streams: 0, synchronization_points: 0, conflicts_detected: 0, conflicts_resolved: 0, last_synchronization: 0, coordination_efficiency: 0, }; } async initialize(): Promise<void> { if (this.initialized) { return; } // Initialize coordination status this.coordinationStatus = { active_streams: 0, synchronization_points: 0, conflicts_detected: 0, conflicts_resolved: 0, last_synchronization: Date.now(), coordination_efficiency: 1.0, }; this.initialized = true; } async scheduleSynchronization( streams: ReasoningStream[], interval_ms: number ): Promise<void> { if (!this.initialized) { await this.initialize(); } // Update active streams this.activeStreams.clear(); for (const stream of streams) { this.activeStreams.set(stream.id, stream); } this.synchronizationInterval = interval_ms; this.coordinationStatus.active_streams = streams.length; // Clear existing timer if (this.synchronizationTimer) { clearInterval(this.synchronizationTimer); } // Schedule periodic synchronization this.synchronizationTimer = setInterval(async () => { try { await this.executeSynchronization(streams); } catch { // Synchronization error occurred this.coordinationStatus.coordination_efficiency *= 0.9; // Reduce efficiency on errors } }, interval_ms); } async executeSynchronization( streams: ReasoningStream[] ): Promise<SynchronizationPoint> { const timestamp = Date.now(); const participating_streams = streams.map((s) => s.id); const shared_insights: string[] = []; // Gather insights from active streams for (const stream of streams) { const status = stream.getStatus(); if (status.active && !status.processing) { shared_insights.push( `${stream.type} stream: ${ status.last_activity > timestamp - this.synchronizationInterval ? "recently active" : "idle" }` ); } } // Detect any conflicts that need attention const conflicts = await this.monitorConflicts(streams); if (conflicts.length > 0) { shared_insights.push( `${conflicts.length} conflicts detected requiring resolution` ); } // Create synchronization point const synchronizationPoint: SynchronizationPoint = { timestamp, participating_streams, shared_insights, coordination_type: conflicts.length > 0 ? "conflict_resolution" : "information_exchange", }; // Update tracking this.synchronizationPoints.push(synchronizationPoint); this.coordinationStatus.synchronization_points = this.synchronizationPoints.length; this.coordinationStatus.last_synchronization = timestamp; // Update coordination efficiency this.updateCoordinationEfficiency(streams, conflicts); return synchronizationPoint; } async enableRealTimeCoordination(streams: ReasoningStream[]): Promise<void> { if (!this.initialized) { await this.initialize(); } // Enable real-time coordination with frequent synchronization await this.scheduleSynchronization(streams, 500); // 500ms intervals for real-time // Set up information sharing channels for (const stream of streams) { this.activeStreams.set(stream.id, stream); } // Real-time coordination enabled } async shareInformation( from_stream: string, to_stream: string, information: string ): Promise<void> { const timestamp = Date.now(); // Validate streams exist if ( !this.activeStreams.has(from_stream) || !this.activeStreams.has(to_stream) ) { throw new Error("Invalid stream IDs for information sharing"); } // Create information sharing record const sharing: InformationSharing = { from_stream, to_stream, shared_information: information, information_type: this.classifyInformation(information), timestamp, }; // Log the sharing this.informationSharingLog.push(sharing); // In a real implementation, this would actually pass information between streams // For now, we just log it // Information shared between streams } async monitorConflicts( streams: ReasoningStream[] ): Promise<ConflictDetection[]> { const conflicts: ConflictDetection[] = []; // Check for processing conflicts (streams taking too long) const processingStreams = streams.filter((s) => s.getStatus().processing); const longRunningStreams = processingStreams.filter((s) => { const status = s.getStatus(); return ( Date.now() - status.last_activity > this.synchronizationInterval * 5 ); // 5x interval threshold }); if (longRunningStreams.length > 0) { conflicts.push({ stream_ids: longRunningStreams.map((s) => s.id), conflict_type: "reasoning", description: "Streams exceeding expected processing time", severity: 0.6, }); } // Check for error conflicts const errorStreams = streams.filter((s) => s.getStatus().error); if (errorStreams.length > 0) { conflicts.push({ stream_ids: errorStreams.map((s) => s.id), conflict_type: "reasoning", description: "Streams reporting errors", severity: 0.8, }); } // Update tracking this.conflictsDetected.push(...conflicts); this.coordinationStatus.conflicts_detected = this.conflictsDetected.length; return conflicts; } async facilitateResolution( conflict: ConflictDetection, _streams: ReasoningStream[] ): Promise<ConflictResolution> { // Add to detected conflicts if not already present if ( !this.conflictsDetected.some( (c) => c.stream_ids.join(",") === conflict.stream_ids.join(",") && c.description === conflict.description ) ) { this.conflictsDetected.push(conflict); this.coordinationStatus.conflicts_detected = this.conflictsDetected.length; } let resolution_strategy: string; let resolved_conclusion: string; let confidence: number; switch (conflict.conflict_type) { case "reasoning": if (conflict.description.includes("processing time")) { resolution_strategy = "Timeout management and graceful degradation"; resolved_conclusion = "Continue with available results, mark slow streams as degraded"; confidence = 0.7; // Reset slow streams for (const streamId of conflict.stream_ids) { const stream = this.activeStreams.get(streamId); if (stream) { stream.reset(); } } } else if (conflict.description.includes("errors")) { resolution_strategy = "Error recovery and stream restart"; resolved_conclusion = "Restart error streams with fallback configuration"; confidence = 0.6; // Attempt to restart error streams for (const streamId of conflict.stream_ids) { const stream = this.activeStreams.get(streamId); if (stream) { try { stream.reset(); await stream.initialize(); } catch { // Failed to restart stream } } } } else { resolution_strategy = "General conflict mediation"; resolved_conclusion = "Apply standard conflict resolution protocols"; confidence = 0.5; } break; default: resolution_strategy = "Default resolution protocol"; resolved_conclusion = "Apply general conflict resolution approach"; confidence = 0.4; } const resolution: ConflictResolution = { conflicting_streams: conflict.stream_ids, conflict_description: conflict.description, resolution_strategy, resolved_conclusion, confidence, }; // Update tracking this.conflictsResolved.push(resolution); this.coordinationStatus.conflicts_resolved = this.conflictsResolved.length; return resolution; } getCoordinationStatus(): CoordinationStatus { return { ...this.coordinationStatus }; } async shutdown(): Promise<void> { // Clear synchronization timer if (this.synchronizationTimer) { clearInterval(this.synchronizationTimer); this.synchronizationTimer = undefined; } // Clear active streams this.activeStreams.clear(); // Reset status this.coordinationStatus = { active_streams: 0, synchronization_points: this.synchronizationPoints.length, conflicts_detected: this.conflictsDetected.length, conflicts_resolved: this.conflictsResolved.length, last_synchronization: this.coordinationStatus.last_synchronization, coordination_efficiency: this.coordinationStatus.coordination_efficiency, }; this.initialized = false; } // Additional utility methods getSynchronizationHistory(): SynchronizationPoint[] { return [...this.synchronizationPoints]; } getInformationSharingLog(): InformationSharing[] { return [...this.informationSharingLog]; } getConflictHistory(): { detected: ConflictDetection[]; resolved: ConflictResolution[]; } { return { detected: [...this.conflictsDetected], resolved: [...this.conflictsResolved], }; } private classifyInformation( information: string ): "insight" | "evidence" | "assumption" | "conclusion" { const lowerInfo = information.toLowerCase(); if (lowerInfo.includes("insight") ?? lowerInfo.includes("understanding")) { return "insight"; } else if (lowerInfo.includes("evidence") ?? lowerInfo.includes("data")) { return "evidence"; } else if (lowerInfo.includes("assume") ?? lowerInfo.includes("premise")) { return "assumption"; } else { return "conclusion"; } } private updateCoordinationEfficiency( streams: ReasoningStream[], conflicts: ConflictDetection[] ): void { const activeStreamCount = streams.filter( (s) => s.getStatus().active ).length; const totalStreamCount = streams.length; const conflictSeverity = conflicts.reduce((sum, c) => sum + c.severity, 0); // Calculate efficiency based on active streams and conflict severity const activeRatio = totalStreamCount > 0 ? activeStreamCount / totalStreamCount : 0; const conflictPenalty = Math.min(0.5, conflictSeverity * 0.1); const newEfficiency = activeRatio - conflictPenalty; // Smooth the efficiency change this.coordinationStatus.coordination_efficiency = this.coordinationStatus.coordination_efficiency * 0.8 + newEfficiency * 0.2; // Ensure efficiency stays within bounds this.coordinationStatus.coordination_efficiency = Math.max( 0, Math.min(1, this.coordinationStatus.coordination_efficiency) ); } // Performance monitoring methods getAverageResponseTime(): number { if (this.synchronizationPoints.length < 2) { return 0; } const intervals = []; for (let i = 1; i < this.synchronizationPoints.length; i++) { const interval = this.synchronizationPoints[i].timestamp - this.synchronizationPoints[i - 1].timestamp; intervals.push(interval); } return ( intervals.reduce((sum, interval) => sum + interval, 0) / intervals.length ); } getConflictResolutionRate(): number { if (this.conflictsDetected.length === 0) { return 1.0; // No conflicts is perfect resolution rate } return this.conflictsResolved.length / this.conflictsDetected.length; } getInformationSharingRate(): number { const timeWindow = 60000; // 1 minute const now = Date.now(); const recentSharing = this.informationSharingLog.filter( (sharing) => now - sharing.timestamp < timeWindow ); return recentSharing.length / (timeWindow / 1000); // Sharing per second } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keyurgolani/ThoughtMcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server