Skip to main content
Glama
ParallelReasoningProcessor.ts (20.7 kB)
/**
 * Parallel Reasoning Processor
 *
 * Coordinates multiple reasoning streams to process problems concurrently
 */

import {
  AlternativePerspective,
  ConflictDetection,
  ConflictResolution,
  ConsensusBuilding,
  InformationSharing,
  IParallelReasoningProcessor,
  ParallelReasoningResult,
  ReasoningStream,
  StreamCoordination,
  StreamResult,
  SynchronizationPoint,
} from "../interfaces/parallel-reasoning.js";
import { Problem } from "../interfaces/systematic-thinking.js";
import { Context } from "../types/core.js";
import { AnalyticalReasoningStream } from "./streams/AnalyticalReasoningStream.js";
import { CreativeReasoningStream } from "./streams/CreativeReasoningStream.js";
import { CriticalReasoningStream } from "./streams/CriticalReasoningStream.js";
import { SyntheticReasoningStream } from "./streams/SyntheticReasoningStream.js";

/**
 * Runs a problem through every registered reasoning stream in parallel, then
 * detects conflicts between the stream results, resolves them, builds a
 * consensus summary and synthesizes a single combined result.
 */
export class ParallelReasoningProcessor implements IParallelReasoningProcessor {
  /**
   * Keyword pairs treated as contradictory when they show up in the
   * conclusions of the processed streams. Each tuple is
   * `[positive, negative]`; note that every negative phrase contains its
   * positive counterpart as a substring.
   */
  private static readonly CONTRADICTORY_PAIRS: ReadonlyArray<
    readonly [string, string]
  > = [
    ["should", "should not"],
    ["recommend", "not recommend"],
    ["feasible", "infeasible"],
    ["possible", "impossible"],
  ];

  private streams: Map<string, ReasoningStream> = new Map();
  private initialized: boolean = false;

  /**
   * Creates one stream of each type (analytical, creative, critical,
   * synthetic), registers them by id and initializes them concurrently.
   * Subsequent calls return immediately once initialization has succeeded.
   */
  async initialize(): Promise<void> {
    if (this.initialized) {
      return;
    }

    // Initialize all stream types
    const analyticalStream = new AnalyticalReasoningStream();
    const creativeStream = new CreativeReasoningStream();
    const criticalStream = new CriticalReasoningStream();
    const syntheticStream = new SyntheticReasoningStream();

    // Store streams
    this.streams.set(analyticalStream.id, analyticalStream);
    this.streams.set(creativeStream.id, creativeStream);
    this.streams.set(criticalStream.id, criticalStream);
    this.streams.set(syntheticStream.id, syntheticStream);

    // Initialize all streams
    await Promise.all(
      Array.from(this.streams.values()).map((stream) => stream.initialize())
    );

    // Only flagged after every stream initialized without rejecting.
    this.initialized = true;
  }

  /**
   * Processes a problem across all active streams and returns the combined
   * result.
   *
   * @param problem - The problem handed to every stream.
   * @param context - Optional context forwarded unchanged to each stream.
   * @returns The synthesized result, including per-stream results,
   * coordination data and the elapsed wall-clock time in milliseconds.
   */
  async processParallel(
    problem: Problem,
    context?: Context
  ): Promise<ParallelReasoningResult> {
    if (!this.initialized) {
      await this.initialize();
    }

    const startTime = Date.now();

    // Step 1: Initialize streams for this problem
    const activeStreams = await this.initializeStreams(problem);

    // Step 2: Process problem in parallel across all streams.
    // Each stream is wrapped so a failure yields a fallback result instead of
    // rejecting the whole Promise.all.
    const streamPromises = activeStreams.map((stream) =>
      this.processStreamWithErrorHandling(stream, problem, context)
    );
    const streamResults = await Promise.all(streamPromises);

    // Step 3: Coordinate streams and handle synchronization
    const coordination = await this.coordinateStreams(activeStreams, problem);

    // Step 4: Detect and resolve conflicts
    const conflicts = this.detectConflicts(streamResults);
    const conflictResolutions = await this.resolveConflicts(conflicts);

    // Step 5: Build consensus
    const consensus = await this.buildConsensus(streamResults);

    // Step 6: Update coordination with conflict resolutions and consensus
    const finalCoordination: StreamCoordination = {
      ...coordination,
      conflict_resolutions: conflictResolutions,
      consensus_building: consensus,
    };

    // Step 7: Synthesize final result
    const processingTime = Date.now() - startTime;
    return this.synthesizeResults(
      streamResults,
      finalCoordination,
      problem,
      processingTime
    );
  }

  /**
   * Selects the streams that take part in processing.
   *
   * @param _problem - Currently unused; every active stream is selected
   * regardless of the problem.
   * @returns All streams whose status reports `active`, or every registered
   * stream when none report active.
   */
  async initializeStreams(_problem: Problem): Promise<ReasoningStream[]> {
    // Always include all stream types for comprehensive analysis
    const activeStreams = Array.from(this.streams.values()).filter(
      (stream) => stream.getStatus().active
    );

    // If no streams are active, reinitialize.
    // NOTE(review): initialize() returns immediately once `initialized` is
    // set, so this call only does work when the processor has never been
    // initialized; otherwise the existing streams are used as they are.
    if (activeStreams.length === 0) {
      await this.initialize();
      activeStreams.push(...Array.from(this.streams.values()));
    }

    return activeStreams;
  }

  /**
   * Builds the initial coordination structure for a set of streams.
   *
   * @param streams - Streams participating in this run.
   * @param _problem - Currently unused.
   * @returns Coordination data with one synchronization point, an empty
   * consensus placeholder and empty conflict/information-sharing lists.
   */
  async coordinateStreams(
    streams: ReasoningStream[],
    _problem: Problem
  ): Promise<StreamCoordination> {
    // Create synchronization points
    const synchronizationPoints = await this.synchronizeStreams(streams);

    // Initialize information sharing (nothing in this class populates it)
    const informationSharing: InformationSharing[] = [];

    // Create initial coordination structure
    return {
      synchronization_points: synchronizationPoints,
      conflict_resolutions: [], // Filled in by processParallel
      consensus_building: {
        participating_streams: streams.map((s) => s.id),
        consensus_points: [],
        disagreement_points: [],
        final_consensus: "",
        consensus_confidence: 0,
      },
      information_sharing: informationSharing,
    };
  }

  /**
   * Creates the synchronization points for a run.
   *
   * @param streams - Streams participating in this run.
   * @returns A single "information_exchange" point stamped with the current
   * time and listing every given stream id.
   */
  async synchronizeStreams(
    streams: ReasoningStream[]
  ): Promise<SynchronizationPoint[]> {
    // Create initial synchronization point
    return [
      {
        timestamp: Date.now(),
        participating_streams: streams.map((s) => s.id),
        shared_insights: [
          "All streams initialized for parallel processing",
          "Problem analysis distributed across reasoning approaches",
        ],
        coordination_type: "information_exchange",
      },
    ];
  }

  /**
   * Detects conflicts between stream results.
   *
   * @param results - Results produced by the streams.
   * @returns Conclusion conflicts, then confidence conflicts, then
   * assumption conflicts, in that order.
   */
  detectConflicts(results: StreamResult[]): ConflictDetection[] {
    const conclusions = results.flatMap((r) => r.conclusions);

    return [
      // Check for conclusion conflicts
      ...this.findConclusionConflicts(results, conclusions),
      // Check for confidence conflicts
      ...this.findConfidenceConflicts(results),
      // Check for assumption conflicts
      ...this.findAssumptionConflicts(results),
    ];
  }

  /**
   * Produces one resolution per detected conflict.
   *
   * @param conflicts - Conflicts returned by {@link detectConflicts}.
   * @returns Resolutions in the same order as the input conflicts.
   */
  async resolveConflicts(
    conflicts: ConflictDetection[]
  ): Promise<ConflictResolution[]> {
    // Resolutions are independent of each other; Promise.all keeps input order.
    return Promise.all(
      conflicts.map((conflict) => this.resolveIndividualConflict(conflict))
    );
  }

  /**
   * Summarizes where the streams agree and disagree.
   *
   * @param results - Results produced by the streams.
   * @returns Consensus points, disagreement points, a textual summary and a
   * confidence value capped at 1.0.
   */
  async buildConsensus(results: StreamResult[]): Promise<ConsensusBuilding> {
    const participating_streams = results.map((r) => r.stream_id);

    // Find consensus points
    const consensus_points = this.findConsensusPoints(results);

    // Find disagreement points
    const disagreement_points = this.findDisagreementPoints(results);

    // Build final consensus
    const final_consensus = this.buildFinalConsensus(results, consensus_points);

    // Calculate consensus confidence
    const consensus_confidence = this.calculateConsensusConfidence(
      results,
      consensus_points
    );

    return {
      participating_streams,
      consensus_points,
      disagreement_points,
      final_consensus,
      consensus_confidence,
    };
  }

  /**
   * Combines stream results and coordination data into the final result.
   *
   * @param results - Results produced by the streams.
   * @param coordination - Coordination data including conflict resolutions
   * and consensus.
   * @param originalProblem - The problem that was processed; when omitted a
   * generic placeholder problem is used instead.
   * @param processingTime - Elapsed time in milliseconds; defaults to 0.
   */
  synthesizeResults(
    results: StreamResult[],
    coordination: StreamCoordination,
    originalProblem?: Problem,
    processingTime?: number
  ): ParallelReasoningResult {
    // Use original problem if provided, otherwise fall back to a placeholder
    const problem = originalProblem ?? this.extractProblemFromResults(results);

    // Synthesize conclusion from all streams
    const synthesized_conclusion = this.synthesizeConclusion(
      results,
      coordination
    );

    // Calculate overall confidence
    const confidence = this.calculateOverallConfidence(results, coordination);

    // Gather insights from all streams
    const insights = this.gatherInsights(results);

    // Generate recommendations
    const recommendations = this.generateRecommendations(results, coordination);

    // Create alternative perspectives
    const alternative_perspectives =
      this.createAlternativePerspectives(results);

    return {
      problem,
      stream_results: results,
      coordination,
      synthesized_conclusion,
      confidence,
      processing_time_ms: processingTime ?? 0,
      insights,
      recommendations,
      alternative_perspectives,
    };
  }

  /** Calls `reset()` on every registered stream. */
  reset(): void {
    for (const stream of this.streams.values()) {
      stream.reset();
    }
  }

  /**
   * Runs a single stream and converts any thrown error into a low-confidence
   * fallback result so one failing stream does not fail the whole run.
   */
  private async processStreamWithErrorHandling(
    stream: ReasoningStream,
    problem: Problem,
    context?: Context
  ): Promise<StreamResult> {
    try {
      return await stream.process(problem, context);
    } catch {
      // Deliberate best-effort: the error itself is discarded and replaced by
      // a fallback result that names the failing stream type.
      return {
        stream_id: stream.id,
        stream_type: stream.type,
        reasoning_steps: [],
        conclusions: [`Stream ${stream.type} encountered an error`],
        confidence: 0.1,
        processing_time_ms: 0,
        insights: [],
        evidence: [],
        assumptions: [],
      };
    }
  }

  /**
   * Arithmetic mean of the stream confidences.
   *
   * @returns 0 for an empty result list, so callers never see `NaN` from a
   * division by zero.
   */
  private averageConfidence(results: StreamResult[]): number {
    if (results.length === 0) {
      return 0;
    }
    return results.reduce((sum, r) => sum + r.confidence, 0) / results.length;
  }

  /**
   * Keyword-based detection of contradictory conclusions.
   *
   * A conflict is reported for a keyword pair when at least one stream has a
   * conclusion using the positive keyword and at least one stream has a
   * conclusion using the negative phrase. Matching is case-insensitive.
   *
   * @param results - Results produced by the streams.
   * @param _conclusions - Unused; kept so the signature stays unchanged.
   */
  private findConclusionConflicts(
    results: StreamResult[],
    _conclusions: string[]
  ): ConflictDetection[] {
    const conflicts: ConflictDetection[] = [];

    for (const [
      positive,
      negative,
    ] of ParallelReasoningProcessor.CONTRADICTORY_PAIRS) {
      // The negative phrase contains the positive keyword ("infeasible"
      // includes "feasible"), so negative occurrences are masked out before
      // looking for the positive keyword. Without this, a lone negative
      // conclusion would count as both sides of a conflict.
      const affirms = (conclusion: string): boolean =>
        conclusion.toLowerCase().split(negative).join(" ").includes(positive);
      const negates = (conclusion: string): boolean =>
        conclusion.toLowerCase().includes(negative);

      const positiveStreams = results.filter((r) =>
        r.conclusions.some(affirms)
      );
      const negativeStreams = results.filter((r) =>
        r.conclusions.some(negates)
      );

      if (positiveStreams.length > 0 && negativeStreams.length > 0) {
        conflicts.push({
          // A stream can sit on both sides; list each id only once.
          stream_ids: [
            ...new Set([
              ...positiveStreams.map((s) => s.stream_id),
              ...negativeStreams.map((s) => s.stream_id),
            ]),
          ],
          conflict_type: "conclusion",
          description: `Contradictory conclusions regarding ${positive}/${negative}`,
          severity: 0.7,
        });
      }
    }

    return conflicts;
  }

  /**
   * Reports a conflict when the confidence spread exceeds 0.5 and there is at
   * least one stream above 0.7 and one below 0.4. The conflict is reported
   * under the "conclusion" conflict type.
   */
  private findConfidenceConflicts(
    results: StreamResult[]
  ): ConflictDetection[] {
    // Nothing to compare; also avoids Math.max()/Math.min() on an empty list.
    if (results.length === 0) {
      return [];
    }

    // Check for significant confidence variations
    const confidences = results.map((r) => r.confidence);
    const spread = Math.max(...confidences) - Math.min(...confidences);
    if (spread <= 0.5) {
      return [];
    }

    const highConfidenceStreams = results.filter((r) => r.confidence > 0.7);
    const lowConfidenceStreams = results.filter((r) => r.confidence < 0.4);
    if (highConfidenceStreams.length === 0 || lowConfidenceStreams.length === 0) {
      return [];
    }

    return [
      {
        stream_ids: [
          ...highConfidenceStreams.map((s) => s.stream_id),
          ...lowConfidenceStreams.map((s) => s.stream_id),
        ],
        conflict_type: "conclusion",
        description: "Significant confidence variation between streams",
        severity: 0.5,
      },
    ];
  }

  /**
   * Reports a single conflict when any assumption appears negated (as
   * "not <assumption>" or "<assumption> is false") inside another assumption.
   * The conflict lists every stream id and names up to two of the
   * contradicted assumptions.
   */
  private findAssumptionConflicts(
    results: StreamResult[]
  ): ConflictDetection[] {
    // Check for conflicting assumptions
    const allAssumptions = results.flatMap((r) => r.assumptions);

    // A Set (rather than a plain object used as a dictionary) keeps
    // assumptions such as "constructor" or "__proto__" from colliding with
    // Object.prototype members.
    const uniqueAssumptions = [...new Set(allAssumptions)];

    // Look for assumptions that appear in some streams but are contradicted in others
    const contradictoryAssumptions = uniqueAssumptions.filter((assumption) => {
      const negations = [`not ${assumption}`, `${assumption} is false`];
      return allAssumptions.some((a) =>
        negations.some((neg) => a.includes(neg))
      );
    });

    if (contradictoryAssumptions.length === 0) {
      return [];
    }

    return [
      {
        stream_ids: results.map((r) => r.stream_id),
        conflict_type: "assumption",
        description: `Contradictory assumptions: ${contradictoryAssumptions
          .slice(0, 2)
          .join(", ")}`,
        severity: 0.6,
      },
    ];
  }

  /**
   * Maps a conflict to a fixed resolution strategy, conclusion text and
   * confidence chosen by its `conflict_type`; unknown types get a generic
   * mediation entry.
   */
  private async resolveIndividualConflict(
    conflict: ConflictDetection
  ): Promise<ConflictResolution> {
    let resolution_strategy: string;
    let resolved_conclusion: string;
    let confidence: number;

    switch (conflict.conflict_type) {
      case "conclusion":
        resolution_strategy = "Weighted consensus based on stream confidence";
        resolved_conclusion = "Balanced approach considering all perspectives";
        confidence = 0.6;
        break;
      case "evidence":
        resolution_strategy = "Evidence quality assessment and validation";
        resolved_conclusion = "Prioritize higher quality evidence sources";
        confidence = 0.7;
        break;
      case "assumption":
        resolution_strategy = "Assumption validation and risk assessment";
        resolved_conclusion =
          "Identify critical assumptions requiring validation";
        confidence = 0.5;
        break;
      default:
        resolution_strategy = "General conflict mediation";
        resolved_conclusion = "Seek common ground and compromise";
        confidence = 0.4;
    }

    return {
      conflicting_streams: conflict.stream_ids,
      conflict_description: conflict.description,
      resolution_strategy,
      resolved_conclusion,
      confidence,
    };
  }

  /**
   * Collects the points the streams agree on: insights reported by more than
   * one stream, plus two fixed themes when their keywords occur somewhere in
   * the conclusions.
   */
  private findConsensusPoints(results: StreamResult[]): string[] {
    // Count in how many streams each insight occurs. Insights are de-duplicated
    // per stream first so a stream repeating itself does not look like
    // agreement, and a Map avoids Object.prototype key collisions.
    const streamsPerInsight = new Map<string, number>();
    for (const result of results) {
      for (const insight of new Set(result.insights)) {
        streamsPerInsight.set(
          insight,
          (streamsPerInsight.get(insight) ?? 0) + 1
        );
      }
    }

    // Insights that appear in multiple streams
    const consensus_points = Array.from(streamsPerInsight.entries())
      .filter(([, streamCount]) => streamCount > 1)
      .map(([insight]) => insight);

    // Find common conclusion themes (case-sensitive keyword checks; the two
    // keywords of a theme may come from different conclusions)
    const allConclusions = results.flatMap((r) => r.conclusions);
    const mentioned = (keyword: string): boolean =>
      allConclusions.some((c) => c.includes(keyword));

    if (mentioned("systematic") && mentioned("approach")) {
      consensus_points.push("Systematic approach is beneficial");
    }

    if (mentioned("multiple") && mentioned("perspective")) {
      consensus_points.push("Multiple perspectives add value");
    }

    return consensus_points;
  }

  /**
   * Derives disagreement points from which stream types are present and, for
   * the analytical/creative pair, from keywords in their conclusions.
   */
  private findDisagreementPoints(results: StreamResult[]): string[] {
    const disagreement_points: string[] = [];

    // Find areas where streams disagree
    const streamTypes = results.map((r) => r.stream_type);

    if (
      streamTypes.includes("analytical") &&
      streamTypes.includes("creative")
    ) {
      // Only the first result of each type is inspected.
      const analyticalConclusions =
        results.find((r) => r.stream_type === "analytical")?.conclusions ?? [];
      const creativeConclusions =
        results.find((r) => r.stream_type === "creative")?.conclusions ?? [];

      if (
        analyticalConclusions.some((c) => c.includes("systematic")) &&
        creativeConclusions.some((c) => c.includes("innovative"))
      ) {
        disagreement_points.push(
          "Balance between systematic vs. innovative approaches"
        );
      }
    }

    // Added whenever both stream types are present, regardless of content.
    if (streamTypes.includes("critical") && streamTypes.includes("creative")) {
      disagreement_points.push("Risk tolerance vs. innovation appetite");
    }

    return disagreement_points;
  }

  /**
   * Builds the textual consensus summary, quoting at most the first three
   * consensus points.
   */
  private buildFinalConsensus(
    results: StreamResult[],
    consensus_points: string[]
  ): string {
    const streamCount = results.length;
    const consensusCount = consensus_points.length;

    if (consensusCount === 0) {
      return `${streamCount} reasoning streams provide diverse perspectives requiring integration`;
    }

    return `${streamCount} reasoning streams achieve consensus on ${consensusCount} key points: ${consensus_points
      .slice(0, 3)
      .join(
        ", "
      )}. Integration of diverse perspectives provides comprehensive solution approach.`;
  }

  /**
   * Average stream confidence plus 0.05 per consensus point (bonus capped at
   * 0.2), with the total capped at 1.0.
   */
  private calculateConsensusConfidence(
    results: StreamResult[],
    consensus_points: string[]
  ): number {
    const avgConfidence = this.averageConfidence(results);
    const consensusBonus = Math.min(0.2, consensus_points.length * 0.05);

    return Math.min(1.0, avgConfidence + consensusBonus);
  }

  /**
   * Returns a generic placeholder problem. The results are not inspected; the
   * original problem is not recoverable from them here.
   */
  private extractProblemFromResults(_results: StreamResult[]): Problem {
    // This is a simplified version - callers should pass the original problem
    return {
      description: "Parallel reasoning problem analysis",
      domain: "general",
      complexity: 0.7,
      uncertainty: 0.5,
      constraints: [],
      stakeholders: [],
      time_sensitivity: 0.5,
      resource_requirements: [],
    };
  }

  /**
   * Builds the one-sentence summary of the run from counts of conclusions,
   * consensus points and resolved conflicts.
   */
  private synthesizeConclusion(
    results: StreamResult[],
    coordination: StreamCoordination
  ): string {
    const allConclusions = results.flatMap((r) => r.conclusions);
    const streamTypes = results.map((r) => r.stream_type);
    const consensusPoints = coordination.consensus_building.consensus_points;
    const conflictCount = coordination.conflict_resolutions.length;

    return `Parallel reasoning across ${streamTypes.join(
      ", "
    )} streams yields ${allConclusions.length} conclusions with ${
      consensusPoints.length
    } consensus points and ${conflictCount} resolved conflicts. Integrated analysis provides comprehensive understanding with balanced perspectives.`;
  }

  /**
   * Average stream confidence, plus 20% of the consensus confidence, minus
   * 0.05 per resolved conflict, clamped to the range [0.1, 1.0].
   */
  private calculateOverallConfidence(
    results: StreamResult[],
    coordination: StreamCoordination
  ): number {
    // Base confidence from stream average
    const avgStreamConfidence = this.averageConfidence(results);

    // Consensus bonus
    const consensusBonus =
      coordination.consensus_building.consensus_confidence * 0.2;

    // Conflict penalty
    const conflictPenalty = coordination.conflict_resolutions.length * 0.05;

    const confidence = avgStreamConfidence + consensusBonus - conflictPenalty;

    return Math.max(0.1, Math.min(1.0, confidence));
  }

  /**
   * Returns the de-duplicated stream insights followed by two fixed insights
   * about parallel processing.
   */
  private gatherInsights(results: StreamResult[]): string[] {
    const allInsights = results.flatMap((r) => r.insights);

    // Remove duplicates, then append the synthesis insights
    return [
      ...new Set(allInsights),
      "Parallel reasoning reveals multiple valid perspectives",
      "Stream coordination enables comprehensive analysis",
    ];
  }

  /**
   * Produces recommendations from the amount of consensus, the presence of
   * resolved conflicts, the number of stream results and the average
   * confidence.
   */
  private generateRecommendations(
    results: StreamResult[],
    coordination: StreamCoordination
  ): string[] {
    const recommendations: string[] = [];

    // Based on consensus
    if (coordination.consensus_building.consensus_points.length > 2) {
      recommendations.push(
        "Leverage areas of consensus for implementation foundation"
      );
    }

    // Based on conflicts
    if (coordination.conflict_resolutions.length > 0) {
      recommendations.push(
        "Address identified conflicts through stakeholder engagement"
      );
    }

    // Based on stream diversity (counts results, not distinct stream types)
    const streamTypes = results.map((r) => r.stream_type);
    if (streamTypes.length > 3) {
      recommendations.push(
        "Maintain diverse perspective integration throughout implementation"
      );
    }

    // Based on confidence levels
    if (this.averageConfidence(results) > 0.7) {
      recommendations.push("High confidence enables decisive action");
    } else {
      recommendations.push(
        "Moderate confidence suggests iterative validation approach"
      );
    }

    return recommendations;
  }

  /**
   * Turns every stream result into an alternative perspective whose text is
   * the stream's conclusions joined with "; ".
   */
  private createAlternativePerspectives(
    results: StreamResult[]
  ): AlternativePerspective[] {
    return results.map((result) => ({
      stream_type: result.stream_type,
      perspective: result.conclusions.join("; "),
      supporting_evidence: result.evidence,
      confidence: result.confidence,
    }));
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keyurgolani/ThoughtMcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.