Skip to main content
Glama
orchestrator.ts6.51 kB
/** * Parallel Reasoning Orchestrator * * Coordinates the execution of four reasoning streams in parallel, * managing timeouts, resource allocation, and error handling. */ import { StreamCoordinationManager } from "./coordination-manager"; import type { ReasoningStream } from "./stream"; import { ResultSynthesisEngine } from "./synthesis-engine"; import type { Problem, StreamResult, StreamStatus, StreamType, SynthesizedResult } from "./types"; /** * Orchestrator for parallel reasoning streams * * Executes four reasoning streams (analytical, creative, critical, synthetic) * in parallel with proper timeout management and graceful degradation. */ export class ParallelReasoningOrchestrator { private readonly defaultTotalTimeout: number = 30000; // 30s total private readonly coordinationManager: StreamCoordinationManager; private readonly synthesisEngine: ResultSynthesisEngine; /** * Create a new parallel reasoning orchestrator */ constructor() { this.coordinationManager = new StreamCoordinationManager(); this.synthesisEngine = new ResultSynthesisEngine(); } /** * Get the coordination manager * * @returns Stream coordination manager instance */ getCoordinationManager(): StreamCoordinationManager { return this.coordinationManager; } /** * Execute reasoning streams in parallel * * @param problem - Problem to analyze * @param streams - Array of reasoning streams to execute * @param totalTimeout - Total timeout in milliseconds (default: 30000ms) * @returns Promise resolving to synthesized result */ async executeStreams( problem: Problem, streams: ReasoningStream[], totalTimeout: number = this.defaultTotalTimeout ): Promise<SynthesizedResult> { // Execute all streams in parallel with individual timeouts const streamPromises = streams.map((stream) => this.executeStreamWithTimeout(stream, problem, stream.timeout) ); // Share insights between streams during execution (optimization) // This happens asynchronously and doesn't block stream execution this.shareInsightsDuringExecution(streams).catch(() => { // Ignore errors in insight sharing to not block execution }); // Wait for all streams with total timeout const results = await this.executeWithTotalTimeout(streamPromises, totalTimeout); // Synthesize results from all streams const synthesizedResult = this.synthesisEngine.synthesizeResults(results); return synthesizedResult; } /** * Share insights between streams during execution * * @param streams - Array of reasoning streams */ private async shareInsightsDuringExecution(streams: ReasoningStream[]): Promise<void> { // Wait a bit for streams to generate initial insights await new Promise((resolve) => setTimeout(resolve, 50)); // Share high-importance insights await this.coordinationManager.shareInsights(streams); } /** * Execute a single stream with timeout * * @param stream - Reasoning stream to execute * @param problem - Problem to analyze * @param timeout - Timeout in milliseconds * @returns Promise resolving to stream result */ private async executeStreamWithTimeout( stream: ReasoningStream, problem: Problem, timeout: number ): Promise<StreamResult> { try { // Create timeout promise const timeoutPromise = new Promise<StreamResult>((_, reject) => { setTimeout(() => { stream.cancel(); reject(new Error(`Stream ${stream.type} timed out after ${timeout}ms`)); }, timeout); }); // Race between stream execution and timeout const result = await Promise.race([stream.process(problem), timeoutPromise]); return result; } catch (error) { // Handle timeout or execution error const isTimeout = error instanceof Error && error.message.includes("timed out"); return { streamId: stream.id, streamType: stream.type, conclusion: "", reasoning: [], insights: [], confidence: 0, processingTime: timeout, status: isTimeout ? ("timeout" as StreamStatus) : ("failed" as StreamStatus), error: error instanceof Error ? error : new Error(String(error)), }; } } /** * Execute promises with total timeout * * @param promises - Array of stream promises * @param totalTimeout - Total timeout in milliseconds * @returns Promise resolving to array of stream results */ private async executeWithTotalTimeout( promises: Promise<StreamResult>[], totalTimeout: number ): Promise<StreamResult[]> { try { // Create total timeout promise const timeoutPromise = new Promise<StreamResult[]>((_, reject) => { setTimeout(() => { reject(new Error(`Total execution timed out after ${totalTimeout}ms`)); }, totalTimeout); }); // Race between all streams and total timeout const results = await Promise.race([Promise.allSettled(promises), timeoutPromise]); // Extract results from settled promises return (results as PromiseSettledResult<StreamResult>[]).map((result) => { if (result.status === "fulfilled") { return result.value; } else { // Promise was rejected return { streamId: "unknown", streamType: "analytical" as StreamType, conclusion: "", reasoning: [], insights: [], confidence: 0, processingTime: 0, status: "failed" as StreamStatus, error: result.reason instanceof Error ? result.reason : new Error(String(result.reason)), }; } }); } catch { // Total timeout exceeded - return whatever we have const settledResults = await Promise.allSettled(promises); return settledResults.map((result) => { if (result.status === "fulfilled") { return result.value; } else { return { streamId: "unknown", streamType: "analytical" as StreamType, conclusion: "", reasoning: [], insights: [], confidence: 0, processingTime: 0, status: "timeout" as StreamStatus, error: result.reason instanceof Error ? result.reason : new Error(String(result.reason)), }; } }); } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keyurgolani/ThoughtMcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server