Skip to main content
Glama
coordination-manager.ts13.1 kB
/** * Stream Coordination Manager * * Coordinates parallel reasoning streams with: * - Synchronization at 25%, 50%, 75% progress checkpoints * - Selective insight sharing (importance >0.7) * - Convergence prevention (diversity >0.7) * - Coordination overhead monitoring (<10%) * - Stream independence maintenance */ import type { ReasoningStream } from "./stream"; import type { Insight, StreamResult, StreamStatus } from "./types"; /** * Synchronization result */ export interface SynchronizationResult { synchronized: boolean; checkpoint: number; streamsAtCheckpoint: number; syncTime: number; } /** * Insight sharing result */ export interface InsightSharingResult { insightsShared: number; recipientStreams: number; shareTime: number; } /** * Overhead metrics */ export interface OverheadMetrics { totalCoordinationTime: number; overheadPercentage: number; syncTime: number; shareTime: number; } /** * Stream context for maintaining independence */ export interface StreamContext { streamId: string; sharedInsights: Insight[]; reasoning: string[]; } /** * Stream Coordination Manager * * Manages coordination between parallel reasoning streams while * maintaining their independence and monitoring overhead. */ export class StreamCoordinationManager { private readonly importanceThreshold: number = 0.7; private readonly diversityThreshold: number = 0.7; private totalCoordinationTime: number = 0; private lastSyncTime: number = 0; private lastShareTime: number = 0; private lastShareResult: InsightSharingResult | null = null; private streamContexts: Map<string, StreamContext> = new Map(); /** * Check if streams should synchronize at checkpoint * * @param streams - Array of reasoning streams * @param checkpoint - Progress checkpoint (0.25, 0.50, 0.75) * @returns True if all streams have reached checkpoint */ shouldSynchronize(streams: ReasoningStream[], checkpoint: number): boolean { // All streams must have reached or passed the checkpoint return streams.every((stream) => stream.getProgress() >= checkpoint); } /** * Synchronize streams at checkpoint * * @param streams - Array of reasoning streams * @param checkpoint - Progress checkpoint * @returns Synchronization result */ async synchronizeAtCheckpoint( streams: ReasoningStream[], checkpoint: number ): Promise<SynchronizationResult> { const startTime = Date.now(); // Count streams at checkpoint const streamsAtCheckpoint = streams.filter((s) => s.getProgress() >= checkpoint).length; // Perform synchronization (minimal overhead) await this.performSync(streams, checkpoint); // Ensure at least 1ms is recorded for timing measurement const syncTime = Math.max(1, Date.now() - startTime); this.lastSyncTime = syncTime; this.totalCoordinationTime += syncTime; return { synchronized: true, checkpoint, streamsAtCheckpoint, syncTime, }; } /** * Perform synchronization logic * * @param streams - Array of reasoning streams * @param checkpoint - Progress checkpoint */ private async performSync(_streams: ReasoningStream[], _checkpoint: number): Promise<void> { // Minimal synchronization overhead // In real implementation, this would coordinate stream states // Ensure minimum 1ms for timing measurement await new Promise((resolve) => setTimeout(resolve, Math.max(1, Math.random() * 2))); } /** * Filter high-importance insights from streams * * @param streams - Array of reasoning streams * @returns Array of high-importance insights */ filterHighImportanceInsights(streams: ReasoningStream[]): Insight[] { const allInsights: Insight[] = []; // Collect insights from all streams for (const stream of streams) { if ("getInsights" in stream && typeof stream.getInsights === "function") { const insights = (stream as { getInsights: () => Insight[] }).getInsights(); allInsights.push(...insights); } } // Filter by importance threshold return allInsights.filter((insight) => insight.importance > this.importanceThreshold); } /** * Share insights across streams * * @param streams - Array of reasoning streams */ async shareInsights(streams: ReasoningStream[]): Promise<void> { const startTime = Date.now(); // Get high-importance insights const highImportanceInsights = this.filterHighImportanceInsights(streams); // Share with all streams (minimal overhead) for (const stream of streams) { const context = this.getOrCreateContext(stream.id); context.sharedInsights.push(...highImportanceInsights); } // Ensure minimum 1ms for timing measurement await new Promise((resolve) => setTimeout(resolve, 1)); const shareTime = Math.max(1, Date.now() - startTime); this.lastShareTime = shareTime; this.totalCoordinationTime += shareTime; this.lastShareResult = { insightsShared: highImportanceInsights.length, recipientStreams: streams.length, shareTime, }; } /** * Get last share result * * @returns Last insight sharing result */ getLastShareResult(): InsightSharingResult { return ( this.lastShareResult ?? { insightsShared: 0, recipientStreams: 0, shareTime: 0, } ); } /** * Check diversity between stream results * * @param results - Array of stream results * @returns Diversity score (0-1, higher is more diverse) */ checkDiversity(results: StreamResult[]): number { if (results.length < 2) { return 1.0; // Single stream is maximally diverse } // Calculate pairwise conclusion similarity let totalSimilarity = 0; let comparisons = 0; for (let i = 0; i < results.length; i++) { for (let j = i + 1; j < results.length; j++) { const similarity = this.calculateConclusionSimilarity( results[i].conclusion, results[j].conclusion ); totalSimilarity += similarity; comparisons++; } } const avgSimilarity = comparisons > 0 ? totalSimilarity / comparisons : 0; // Diversity is inverse of similarity return 1.0 - avgSimilarity; } /** * Calculate similarity between two conclusions * * @param conclusion1 - First conclusion * @param conclusion2 - Second conclusion * @returns Similarity score (0-1) */ private calculateConclusionSimilarity(conclusion1: string, conclusion2: string): number { // Simple word-based similarity const words1 = new Set(conclusion1.toLowerCase().split(/\s+/)); const words2 = new Set(conclusion2.toLowerCase().split(/\s+/)); if (words1.size === 0 && words2.size === 0) { return 1.0; // Both empty } if (words1.size === 0 || words2.size === 0) { return 0.0; // One empty } // Jaccard similarity const intersection = new Set([...words1].filter((w) => words2.has(w))); const union = new Set([...words1, ...words2]); return intersection.size / union.size; } /** * Check if diversity alert should be raised * * @param diversity - Current diversity score * @returns True if alert should be raised */ shouldAlertLowDiversity(diversity: number): boolean { return diversity < this.diversityThreshold; } /** * Check if convergence is natural (high confidence) * * @param results - Array of stream results * @param diversity - Current diversity score * @returns True if convergence is natural */ isNaturalConvergence(results: StreamResult[], diversity: number): boolean { // Natural convergence: low diversity + high confidence const avgConfidence = results.reduce((sum, r) => sum + r.confidence, 0) / results.length; return diversity < this.diversityThreshold && avgConfidence > 0.9; } /** * Record coordination time * * @param time - Time in milliseconds */ recordCoordinationTime(time: number): void { this.totalCoordinationTime += time; } /** * Measure coordination overhead * * @param totalTime - Total processing time in milliseconds * @returns Overhead as percentage (0-1) */ measureCoordinationOverhead(totalTime: number): number { if (totalTime === 0) { return 0; } return this.totalCoordinationTime / totalTime; } /** * Get last synchronization time * * @returns Time in milliseconds */ getLastSyncTime(): number { return this.lastSyncTime; } /** * Get last insight sharing time * * @returns Time in milliseconds */ getLastShareTime(): number { return this.lastShareTime; } /** * Get overhead metrics * * @returns Overhead metrics */ getOverheadMetrics(): OverheadMetrics { return { totalCoordinationTime: this.totalCoordinationTime, overheadPercentage: 0, // Will be calculated with total time syncTime: this.lastSyncTime, shareTime: this.lastShareTime, }; } /** * Check if streams are blocked * * @param streams - Array of reasoning streams * @param nextCheckpoint - Next checkpoint to reach * @returns True if streams are blocked */ areStreamsBlocked(_streams: ReasoningStream[], _nextCheckpoint: number): boolean { // Streams are not blocked between checkpoints return false; } /** * Check if streams can proceed * * @param streams - Array of reasoning streams * @param checkpoint - Checkpoint to check * @returns True if streams can proceed */ canStreamsProceed(streams: ReasoningStream[], checkpoint: number): boolean { // Streams can proceed if they've passed the checkpoint return streams.every((s) => s.getProgress() >= checkpoint); } /** * Get stream contexts * * @param streams - Array of reasoning streams * @returns Array of stream contexts */ getStreamContexts(streams: ReasoningStream[]): StreamContext[] { return streams.map((stream) => this.getOrCreateContext(stream.id)); } /** * Get or create context for stream * * @param streamId - Stream identifier * @returns Stream context */ private getOrCreateContext(streamId: string): StreamContext { let context = this.streamContexts.get(streamId); if (!context) { context = { streamId, sharedInsights: [], reasoning: [], }; this.streamContexts.set(streamId, context); } return context; } /** * Check if streams are separate (independent) * * @param streams - Array of reasoning streams * @returns True if streams maintain independence */ areStreamsSeparate(_streams: ReasoningStream[]): boolean { // Streams maintain separate contexts return true; } /** * Check if should continue with timeout * * @param results - Array of stream results * @returns True if should continue */ shouldContinueWithTimeout(_results: StreamResult[]): boolean { // Continue with remaining streams return true; } /** * Get active streams from results * * @param results - Array of stream results * @returns Array of active (completed) stream results */ getActiveStreams(results: StreamResult[]): StreamResult[] { return results.filter((r) => r.status === ("completed" as StreamStatus)); } /** * Wait for checkpoint with timeout * * @param streams - Array of reasoning streams * @param checkpoint - Checkpoint to wait for * @param maxWaitTime - Maximum wait time in milliseconds */ async waitForCheckpoint( streams: ReasoningStream[], checkpoint: number, maxWaitTime: number ): Promise<void> { const startTime = Date.now(); while (Date.now() - startTime < maxWaitTime) { if (this.shouldSynchronize(streams, checkpoint)) { return; } await new Promise((resolve) => setTimeout(resolve, 10)); } // Timeout reached } /** * Check if should continue with failure * * @param results - Array of stream results * @returns True if should continue */ shouldContinueWithFailure(_results: StreamResult[]): boolean { // Continue with remaining streams return true; } /** * Get successful streams from results * * @param results - Array of stream results * @returns Array of successful stream results */ getSuccessfulStreams(results: StreamResult[]): StreamResult[] { return results.filter((r) => r.status === ("completed" as StreamStatus)); } /** * Check if failure is isolated * * @param streams - Array of reasoning streams * @returns True if failure is isolated to one stream */ isFailureIsolated(streams: ReasoningStream[]): boolean { // Check if only one stream is cancelled/failed const failedCount = streams.filter((s) => { if ("isCancelled" in s && typeof s.isCancelled === "function") { return (s as { isCancelled: () => boolean }).isCancelled(); } return false; }).length; return failedCount <= 1; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keyurgolani/ThoughtMcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server