/**
* Reasoning Routes
*
* REST API endpoints for thinking and reasoning operations.
* Requirements: 3.1, 3.2, 3.3, 4.1, 4.4
*/
import { Router, type Request, type Response } from "express";
import { z } from "zod";
import type { ConfidenceAssessment } from "../../confidence/types.js";
import type { ReasoningStream } from "../../reasoning/stream.types.js";
import { AnalyticalReasoningStream } from "../../reasoning/streams/analytical-stream.js";
import { CreativeReasoningStream } from "../../reasoning/streams/creative-stream.js";
import { CriticalReasoningStream } from "../../reasoning/streams/critical-stream.js";
import { SyntheticReasoningStream } from "../../reasoning/streams/synthetic-stream.js";
import type {
AttributedInsight,
Conflict,
Problem,
ReasoningContext,
StreamType,
SynthesizedResult,
} from "../../reasoning/types.js";
import type { CognitiveCore } from "../cognitive-core.js";
import { asyncHandler, NotFoundError, ValidationApiError } from "../middleware/error-handler.js";
import { buildSuccessResponse } from "../types/api-response.js";
/**
* Think session status
* Requirements: 3.2
*/
export type ThinkSessionStatus = "processing" | "complete" | "error";
/**
* Think session data structure
* Requirements: 3.2
*/
export interface ThinkSession {
sessionId: string;
status: ThinkSessionStatus;
progress: number;
currentStage: string;
activeStreams: string[];
mode: string;
startedAt: Date;
completedAt?: Date;
error?: string;
}
/**
* In-memory store for think sessions
* Requirements: 3.2
*/
export class ThinkSessionStore {
private sessions: Map<string, ThinkSession> = new Map();
/**
* Create a new think session
*/
createSession(mode: string, activeStreams: string[]): ThinkSession {
const sessionId = `think-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`;
const session: ThinkSession = {
sessionId,
status: "processing",
progress: 0,
currentStage: "initializing",
activeStreams,
mode,
startedAt: new Date(),
};
this.sessions.set(sessionId, session);
return session;
}
/**
* Get a session by ID
*/
getSession(sessionId: string): ThinkSession | undefined {
return this.sessions.get(sessionId);
}
/**
* Update a session
*/
updateSession(sessionId: string, updates: Partial<ThinkSession>): void {
const session = this.sessions.get(sessionId);
if (session) {
Object.assign(session, updates);
if (updates.status === "complete" || updates.status === "error") {
session.completedAt = new Date();
}
}
}
/**
* Delete a session
*/
deleteSession(sessionId: string): void {
this.sessions.delete(sessionId);
}
/**
* Cleanup old sessions
*/
cleanupOldSessions(maxAgeMs: number): void {
const now = Date.now();
for (const [sessionId, session] of this.sessions) {
if (now - session.startedAt.getTime() > maxAgeMs) {
this.sessions.delete(sessionId);
}
}
}
}
// Global session store instance
const thinkSessionStore = new ThinkSessionStore();
/**
* Valid thinking modes for the think endpoint
* Requirements: 3.1
*/
const VALID_MODES = ["intuitive", "deliberative", "balanced", "creative", "analytical"] as const;
type ThinkMode = (typeof VALID_MODES)[number];
/**
* Zod schema for think request validation
* Requirements: 3.1, 3.3
*/
const thinkRequestSchema = z.object({
input: z
.string()
.min(1, "input is required")
.max(10000, "input must be at most 10,000 characters"),
mode: z.enum(VALID_MODES, {
errorMap: () => ({
message: `mode must be one of: ${VALID_MODES.join(", ")}`,
}),
}),
context: z.string().max(5000, "context must be at most 5,000 characters").optional(),
userId: z.string().min(1, "userId must be non-empty if provided").optional(),
});
/**
* Metacognitive assessment in think response
* Requirements: 3.1
*/
interface MetacognitiveAssessmentResponse {
overallConfidence: number;
evidenceQuality: number;
reasoningCoherence: number;
completeness: number;
uncertaintyLevel: number;
uncertaintyType: string;
factors: Array<{
dimension: string;
score: number;
weight: number;
explanation: string;
}>;
}
/**
* Thought item in think response
* Requirements: 3.1
*/
interface ThoughtItem {
content: string;
sources: string[];
confidence: number;
importance: number;
}
/**
* Response type for think endpoint
* Requirements: 3.1
*/
interface ThinkResponse {
thoughts: ThoughtItem[];
confidence: number;
modeUsed: ThinkMode;
processingTimeMs: number;
metacognitiveAssessment: MetacognitiveAssessmentResponse;
conclusion: string;
recommendations: Array<{
description: string;
priority: number;
confidence: number;
}>;
}
/**
* Helper to extract request ID from request
*/
function getRequestId(req: Request): string | undefined {
return (req as Request & { requestId?: string }).requestId;
}
/**
* Helper to parse Zod validation errors into field errors
*/
function parseZodErrors(error: z.ZodError): Record<string, string> {
const fieldErrors: Record<string, string> = {};
for (const issue of error.issues) {
const path = issue.path.join(".") || "request";
fieldErrors[path] = issue.message;
}
return fieldErrors;
}
/**
* Map think mode to stream types
* Requirements: 3.1
*/
function getStreamsForMode(mode: ThinkMode): StreamType[] {
switch (mode) {
case "intuitive":
// Fast, pattern-based thinking - creative and synthetic
return ["creative", "synthetic"] as StreamType[];
case "deliberative":
// Slow, analytical thinking - analytical and critical
return ["analytical", "critical"] as StreamType[];
case "balanced":
// All streams for comprehensive analysis
return ["analytical", "creative", "critical", "synthetic"] as StreamType[];
case "creative":
// Focus on creative and synthetic streams
return ["creative", "synthetic"] as StreamType[];
case "analytical":
// Focus on analytical and critical streams
return ["analytical", "critical"] as StreamType[];
}
}
/**
* Create reasoning streams based on selected types
*/
function createStreams(streamTypes: StreamType[]): ReasoningStream[] {
const streams: ReasoningStream[] = [];
for (const type of streamTypes) {
switch (type) {
case "analytical":
streams.push(new AnalyticalReasoningStream());
break;
case "creative":
streams.push(new CreativeReasoningStream());
break;
case "critical":
streams.push(new CriticalReasoningStream());
break;
case "synthetic":
streams.push(new SyntheticReasoningStream());
break;
}
}
return streams;
}
/**
* Convert synthesized result to think response format
*/
function convertToThinkResponse(
result: SynthesizedResult,
mode: ThinkMode,
processingTimeMs: number,
metacognitiveAssessment: ConfidenceAssessment
): ThinkResponse {
// Convert insights to thoughts
const thoughts: ThoughtItem[] = result.insights.map((insight: AttributedInsight) => ({
content: insight.content,
sources: insight.sources,
confidence: insight.confidence,
importance: insight.importance,
}));
// Convert metacognitive assessment
const metacognitiveResponse: MetacognitiveAssessmentResponse = {
overallConfidence: metacognitiveAssessment.overallConfidence,
evidenceQuality: metacognitiveAssessment.evidenceQuality,
reasoningCoherence: metacognitiveAssessment.reasoningCoherence,
completeness: metacognitiveAssessment.completeness,
uncertaintyLevel: metacognitiveAssessment.uncertaintyLevel,
uncertaintyType: metacognitiveAssessment.uncertaintyType,
factors: metacognitiveAssessment.factors.map((factor) => ({
dimension: factor.dimension,
score: factor.score,
weight: factor.weight,
explanation: factor.explanation,
})),
};
// Convert recommendations
const recommendations = result.recommendations.map((rec) => ({
description: rec.description,
priority: rec.priority,
confidence: rec.confidence,
}));
return {
thoughts,
confidence: result.confidence,
modeUsed: mode,
processingTimeMs,
metacognitiveAssessment: metacognitiveResponse,
conclusion: result.conclusion,
recommendations,
};
}
/**
* Handler for POST /api/v1/think
* Requirements: 3.1, 3.3
*
* Initiates reasoning with the specified mode and returns thoughts,
* confidence, mode used, processing time, and metacognitive assessment.
*/
function createThinkHandler(
cognitiveCore: CognitiveCore
): (req: Request, res: Response, next: import("express").NextFunction) => void {
return asyncHandler(async (req: Request, res: Response): Promise<void> => {
const startTime = Date.now();
const requestId = getRequestId(req);
// Validate request body
const parseResult = thinkRequestSchema.safeParse(req.body);
if (!parseResult.success) {
throw new ValidationApiError(parseZodErrors(parseResult.error));
}
const { input, mode, context, userId } = parseResult.data;
// Build problem description with optional context
let problemDescription = input;
let problemContext = "";
// If userId provided, augment with memory context
if (userId && context) {
// Use provided context directly
problemContext = context;
} else if (userId) {
// Retrieve memory context for the user
const augmentedContext = await cognitiveCore.memoryAugmentedReasoning.augmentProblemContext(
input,
userId
);
if (augmentedContext.hasMemoryContext) {
problemDescription = augmentedContext.augmentedProblem;
problemContext = augmentedContext.memoryBackground;
}
} else if (context) {
// Use provided context without memory augmentation
problemContext = context;
}
// Create problem object
const problem: Problem = {
id: `think-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`,
description: problemDescription,
context: problemContext,
complexity:
mode === "deliberative" ? "complex" : mode === "intuitive" ? "simple" : "moderate",
};
// Get streams for the selected mode
const streamTypes = getStreamsForMode(mode);
const streams = createStreams(streamTypes);
// Execute parallel reasoning
const synthesizedResult = await cognitiveCore.reasoningOrchestrator.executeStreams(
problem,
streams,
mode === "intuitive" ? 10000 : 30000 // Shorter timeout for intuitive mode
);
// Build reasoning context for metacognitive assessment
const reasoningContext: ReasoningContext = {
problem,
evidence: synthesizedResult.insights.map((i) => i.content),
constraints: problem.constraints ?? [],
goals: problem.goals ?? [],
framework: mode,
};
// Perform metacognitive assessment
const metacognitiveAssessment =
await cognitiveCore.confidenceAssessor.assessConfidence(reasoningContext);
const processingTimeMs = Date.now() - startTime;
// Convert to response format
const responseData = convertToThinkResponse(
synthesizedResult,
mode,
processingTimeMs,
metacognitiveAssessment
);
res.status(200).json(buildSuccessResponse(responseData, { requestId, startTime }));
});
}
/**
* Response type for think status endpoint
* Requirements: 3.2
*/
interface ThinkStatusResponse {
status: ThinkSessionStatus;
progress: number;
currentStage: string;
activeStreams?: string[];
}
/**
* Handler for GET /api/v1/think/status/:sessionId
* Requirements: 3.2
*
* Returns current processing status, progress percentage, current stage,
* and active streams for a think session.
*/
function createThinkStatusHandler(): (
req: Request,
res: Response,
next: import("express").NextFunction
) => void {
return asyncHandler(async (req: Request, res: Response): Promise<void> => {
const requestId = getRequestId(req);
const startTime = Date.now();
const { sessionId } = req.params;
// Validate sessionId parameter
if (!sessionId || typeof sessionId !== "string" || sessionId.trim() === "") {
throw new ValidationApiError({ sessionId: "sessionId is required" });
}
// Get session from store
const session = thinkSessionStore.getSession(sessionId);
if (!session) {
throw new NotFoundError("ThinkSession", sessionId);
}
// Build response
const responseData: ThinkStatusResponse = {
status: session.status,
progress: session.progress,
currentStage: session.currentStage,
};
// Include active streams only if processing
if (session.status === "processing") {
responseData.activeStreams = session.activeStreams;
}
res.status(200).json(buildSuccessResponse(responseData, { requestId, startTime }));
});
}
/**
* Decision point in reasoning chain
* Requirements: 4.3
*/
export interface DecisionPoint {
/** Decision point identifier */
id: string;
/** Description of the decision */
description: string;
/** Options that were considered */
options: string[];
/** The option that was selected */
selectedOption: string;
/** Rationale for the selection */
rationale: string;
/** Confidence at this decision point (0-1) */
confidence: number;
/** Timestamp of the decision */
timestamp: string;
}
/**
* Reasoning step in chain response
* Requirements: 4.3
*/
export interface ReasoningStepResponse {
/** Step identifier */
id: string;
/** Step content */
content: string;
/** Type of reasoning step */
type: "hypothesis" | "evidence" | "inference" | "conclusion" | "assumption";
/** Confidence in this step (0-1) */
confidence: number;
/** Supporting evidence for this step */
evidence: string[];
/** Timestamp of step */
timestamp: string;
}
/**
* Reasoning branch in chain response
* Requirements: 4.3
*/
export interface ReasoningBranchResponse {
/** Branch identifier */
id: string;
/** Branch description */
description: string;
/** Steps in this branch */
steps: ReasoningStepResponse[];
/** Whether this branch was selected */
selected: boolean;
/** Reason for selection/rejection */
rationale: string;
}
/**
* Response type for reasoning chain endpoint
* Requirements: 4.3
*/
export interface ReasoningChainResponse {
/** Chain identifier (same as session ID) */
chainId: string;
/** Sequential reasoning steps */
steps: ReasoningStepResponse[];
/** Alternative reasoning branches */
branches: ReasoningBranchResponse[];
/** Confidence evolution over time (array of confidence values) */
confidenceEvolution: number[];
/** Decision points in the reasoning process */
decisionPoints: DecisionPoint[];
}
/**
* Valid stream types for parallel reasoning
* Requirements: 4.1
*/
const VALID_STREAM_TYPES = ["analytical", "creative", "critical", "synthetic"] as const;
/**
* Zod schema for parallel reasoning request validation
* Requirements: 4.1
*/
const parallelReasoningRequestSchema = z.object({
problem: z
.string()
.min(1, "problem is required")
.max(10000, "problem must be at most 10,000 characters"),
streams: z
.array(
z.enum(VALID_STREAM_TYPES, {
errorMap: () => ({
message: `stream type must be one of: ${VALID_STREAM_TYPES.join(", ")}`,
}),
})
)
.min(1, "at least one stream is required")
.max(4, "at most 4 streams allowed"),
coordinationStrategy: z
.string()
.max(100, "coordinationStrategy must be at most 100 characters")
.optional(),
userId: z.string().min(1, "userId must be non-empty if provided").optional(),
context: z.string().max(5000, "context must be at most 5,000 characters").optional(),
timeout: z.number().min(1000).max(60000).optional(),
});
/**
* Stream output in parallel reasoning response
* Requirements: 4.1
*/
interface StreamOutputResponse {
streamId: string;
streamType: string;
conclusion: string;
reasoning: string[];
insights: Array<{
content: string;
confidence: number;
importance: number;
}>;
confidence: number;
processingTime: number;
status: string;
}
/**
* Conflict resolution in parallel reasoning response
* Requirements: 4.1
*/
interface ConflictResolutionResponse {
id: string;
type: string;
severity: string;
description: string;
sourceStreams: string[];
resolution?: {
approach: string;
recommendedAction: string;
};
}
/**
* Coordination metrics in parallel reasoning response
* Requirements: 4.4
*/
interface CoordinationMetricsResponse {
sync25: number;
sync50: number;
sync75: number;
totalCoordinationTime: number;
overheadPercentage: number;
}
/**
* Synthesis in parallel reasoning response
* Requirements: 4.1
*/
interface SynthesisResponse {
conclusion: string;
insights: Array<{
content: string;
sources: string[];
confidence: number;
importance: number;
}>;
recommendations: Array<{
description: string;
priority: number;
confidence: number;
}>;
confidence: number;
quality: {
overallScore: number;
coherence: number;
completeness: number;
consistency: number;
};
}
/**
* Response type for parallel reasoning endpoint
* Requirements: 4.1, 4.4
*/
interface ParallelReasoningResponse {
sessionId: string;
streams: StreamOutputResponse[];
synthesis: SynthesisResponse;
conflictsResolved: ConflictResolutionResponse[];
coordinationMetrics: CoordinationMetricsResponse;
}
/**
* Parallel reasoning session data structure
* Requirements: 4.1
*/
export interface ParallelReasoningSession {
sessionId: string;
status: "processing" | "complete" | "error";
progress: number;
currentStage: string;
activeStreams: string[];
startedAt: Date;
completedAt?: Date;
result?: ParallelReasoningResponse;
error?: string;
syncCheckpoints: {
sync25: number;
sync50: number;
sync75: number;
};
}
/**
* In-memory store for parallel reasoning sessions
* Requirements: 4.1
*/
export class ParallelReasoningSessionStore {
private sessions: Map<string, ParallelReasoningSession> = new Map();
/**
* Create a new parallel reasoning session
*/
createSession(activeStreams: string[]): ParallelReasoningSession {
const sessionId = `parallel-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`;
const session: ParallelReasoningSession = {
sessionId,
status: "processing",
progress: 0,
currentStage: "initializing",
activeStreams,
startedAt: new Date(),
syncCheckpoints: {
sync25: 0,
sync50: 0,
sync75: 0,
},
};
this.sessions.set(sessionId, session);
return session;
}
/**
* Get a session by ID
*/
getSession(sessionId: string): ParallelReasoningSession | undefined {
return this.sessions.get(sessionId);
}
/**
* Update a session
*/
updateSession(sessionId: string, updates: Partial<ParallelReasoningSession>): void {
const session = this.sessions.get(sessionId);
if (session) {
Object.assign(session, updates);
if (updates.status === "complete" || updates.status === "error") {
session.completedAt = new Date();
}
}
}
/**
* Delete a session
*/
deleteSession(sessionId: string): void {
this.sessions.delete(sessionId);
}
/**
* Cleanup old sessions
*/
cleanupOldSessions(maxAgeMs: number): void {
const now = Date.now();
for (const [sessionId, session] of this.sessions) {
if (now - session.startedAt.getTime() > maxAgeMs) {
this.sessions.delete(sessionId);
}
}
}
}
// Global parallel reasoning session store instance
const parallelReasoningSessionStore = new ParallelReasoningSessionStore();
/**
* SSE Event types for parallel reasoning streams
* Requirements: 4.2
*/
export type SSEEventType =
| "stream_started"
| "stream_progress"
| "stream_insight"
| "stream_completed"
| "sync_checkpoint"
| "synthesis_started"
| "synthesis_completed"
| "session_completed"
| "session_error"
| "heartbeat";
/**
* SSE Event data structure
* Requirements: 4.2
*/
export interface SSEEvent {
type: SSEEventType;
timestamp: string;
data: {
sessionId: string;
streamType?: string;
progress?: number;
checkpoint?: number;
insight?: {
content: string;
confidence: number;
importance: number;
};
message?: string;
error?: string;
};
}
/**
* SSE client connection for a session
* Requirements: 4.2
*/
interface SSEClient {
res: Response;
sessionId: string;
connectedAt: Date;
}
/**
* SSE Connection Manager for parallel reasoning sessions
* Requirements: 4.2
*
* Manages Server-Sent Events connections for real-time streaming
* of parallel reasoning updates to connected clients.
*/
export class SSEConnectionManager {
private clients: Map<string, Set<SSEClient>> = new Map();
private heartbeatIntervals: Map<string, NodeJS.Timeout> = new Map();
private readonly heartbeatIntervalMs: number = 15000; // 15 seconds
/**
* Add a client connection for a session
*/
addClient(sessionId: string, res: Response): SSEClient {
const client: SSEClient = {
res,
sessionId,
connectedAt: new Date(),
};
if (!this.clients.has(sessionId)) {
this.clients.set(sessionId, new Set());
this.startHeartbeat(sessionId);
}
const sessionClients = this.clients.get(sessionId);
if (sessionClients) {
sessionClients.add(client);
}
return client;
}
/**
* Remove a client connection
*/
removeClient(sessionId: string, client: SSEClient): void {
const sessionClients = this.clients.get(sessionId);
if (sessionClients) {
sessionClients.delete(client);
if (sessionClients.size === 0) {
this.clients.delete(sessionId);
this.stopHeartbeat(sessionId);
}
}
}
/**
* Get all clients for a session
*/
getClients(sessionId: string): Set<SSEClient> | undefined {
return this.clients.get(sessionId);
}
/**
* Check if session has connected clients
*/
hasClients(sessionId: string): boolean {
const clients = this.clients.get(sessionId);
return clients !== undefined && clients.size > 0;
}
/**
* Get client count for a session
*/
getClientCount(sessionId: string): number {
return this.clients.get(sessionId)?.size ?? 0;
}
/**
* Broadcast an event to all clients of a session
*/
broadcast(sessionId: string, event: SSEEvent): void {
const sessionClients = this.clients.get(sessionId);
if (!sessionClients) return;
const eventData = `data: ${JSON.stringify(event)}\n\n`;
for (const client of sessionClients) {
try {
if (!client.res.writableEnded) {
client.res.write(eventData);
}
} catch {
// Client disconnected, will be cleaned up
this.removeClient(sessionId, client);
}
}
}
/**
* Start heartbeat for a session to keep connections alive
*/
private startHeartbeat(sessionId: string): void {
const interval = setInterval(() => {
const event: SSEEvent = {
type: "heartbeat",
timestamp: new Date().toISOString(),
data: {
sessionId,
message: "keepalive",
},
};
this.broadcast(sessionId, event);
}, this.heartbeatIntervalMs);
this.heartbeatIntervals.set(sessionId, interval);
}
/**
* Stop heartbeat for a session
*/
private stopHeartbeat(sessionId: string): void {
const interval = this.heartbeatIntervals.get(sessionId);
if (interval) {
clearInterval(interval);
this.heartbeatIntervals.delete(sessionId);
}
}
/**
* Cleanup all connections for a session
*/
cleanupSession(sessionId: string): void {
const sessionClients = this.clients.get(sessionId);
if (sessionClients) {
for (const client of sessionClients) {
try {
if (!client.res.writableEnded) {
client.res.end();
}
} catch {
// Ignore errors during cleanup
}
}
this.clients.delete(sessionId);
}
this.stopHeartbeat(sessionId);
}
/**
* Cleanup all connections
*/
cleanupAll(): void {
for (const sessionId of this.clients.keys()) {
this.cleanupSession(sessionId);
}
}
}
// Global SSE connection manager instance
const sseConnectionManager = new SSEConnectionManager();
/**
* Convert conflict to response format
*/
function convertConflictToResponse(conflict: Conflict): ConflictResolutionResponse {
const response: ConflictResolutionResponse = {
id: conflict.id,
type: conflict.type,
severity: conflict.severity,
description: conflict.description,
sourceStreams: conflict.sourceStreams,
};
if (conflict.resolutionFramework) {
response.resolution = {
approach: conflict.resolutionFramework.approach,
recommendedAction: conflict.resolutionFramework.recommendedAction,
};
}
return response;
}
/**
* Convert synthesized result to synthesis response format
*/
function convertToSynthesisResponse(result: SynthesizedResult): SynthesisResponse {
return {
conclusion: result.conclusion,
insights: result.insights.map((insight) => ({
content: insight.content,
sources: insight.sources,
confidence: insight.confidence,
importance: insight.importance,
})),
recommendations: result.recommendations.map((rec) => ({
description: rec.description,
priority: rec.priority,
confidence: rec.confidence,
})),
confidence: result.confidence,
quality: {
overallScore: result.quality.overallScore,
coherence: result.quality.coherence,
completeness: result.quality.completeness,
consistency: result.quality.consistency,
},
};
}
/**
* Build problem context from user input or memory augmentation
* Requirements: 4.1
*/
async function buildProblemContext(
cognitiveCore: CognitiveCore,
problem: string,
userId?: string,
context?: string
): Promise<{ description: string; context: string }> {
if (userId && context) {
return { description: problem, context };
}
if (userId) {
const augmented = await cognitiveCore.memoryAugmentedReasoning.augmentProblemContext(
problem,
userId
);
if (augmented.hasMemoryContext) {
return { description: augmented.augmentedProblem, context: augmented.memoryBackground };
}
}
return { description: problem, context: context ?? "" };
}
/**
* Build stream outputs from synthesized result
* Requirements: 4.1
*/
function buildStreamOutputs(
sessionId: string,
synthesizedResult: SynthesizedResult
): StreamOutputResponse[] {
return synthesizedResult.metadata.streamsUsed.map((streamType, index) => ({
streamId: `${sessionId}-${streamType}-${index}`,
streamType,
conclusion: synthesizedResult.conclusion,
reasoning: [],
insights: synthesizedResult.insights
.filter((i) => i.sources.includes(streamType))
.map((i) => ({ content: i.content, confidence: i.confidence, importance: i.importance })),
confidence: synthesizedResult.confidence,
processingTime: synthesizedResult.metadata.synthesisTime,
status: "completed",
}));
}
/**
* Handler for POST /api/v1/reasoning/parallel
* Requirements: 4.1, 4.4
*
* Initiates parallel reasoning with selected streams and returns
* session ID, stream outputs, synthesis, conflict resolutions,
* and coordination metrics.
*/
function createParallelReasoningHandler(
cognitiveCore: CognitiveCore
): (req: Request, res: Response, next: import("express").NextFunction) => void {
return asyncHandler(async (req: Request, res: Response): Promise<void> => {
const startTime = Date.now();
const requestId = getRequestId(req);
// Validate request body
const parseResult = parallelReasoningRequestSchema.safeParse(req.body);
if (!parseResult.success) {
throw new ValidationApiError(parseZodErrors(parseResult.error));
}
const { problem, streams: requestedStreams, userId, context, timeout } = parseResult.data;
const session = parallelReasoningSessionStore.createSession(requestedStreams);
// Build problem context
const problemCtx = await buildProblemContext(cognitiveCore, problem, userId, context);
const problemObj: Problem = {
id: session.sessionId,
description: problemCtx.description,
context: problemCtx.context,
complexity: requestedStreams.length >= 3 ? "complex" : "moderate",
};
const streams = createStreams(requestedStreams as StreamType[]);
parallelReasoningSessionStore.updateSession(session.sessionId, {
currentStage: "executing_streams",
progress: 0.1,
});
const coordinationManager = cognitiveCore.reasoningOrchestrator.getCoordinationManager();
const totalTimeout = timeout ?? 30000;
const syncCheckpoints = { sync25: 0, sync50: 0, sync75: 0 };
// Track checkpoints concurrently with stream execution
const trackCheckpoint = async (
checkpoint: number,
key: keyof typeof syncCheckpoints
): Promise<void> => {
const checkpointStart = Date.now();
await coordinationManager.waitForCheckpoint(streams, checkpoint, totalTimeout * checkpoint);
syncCheckpoints[key] = Date.now() - checkpointStart;
parallelReasoningSessionStore.updateSession(session.sessionId, {
progress: checkpoint,
currentStage: `sync_${Math.round(checkpoint * 100)}`,
syncCheckpoints: { ...syncCheckpoints },
});
};
const checkpointPromises = [
trackCheckpoint(0.25, "sync25"),
trackCheckpoint(0.5, "sync50"),
trackCheckpoint(0.75, "sync75"),
];
const synthesizedResult = await cognitiveCore.reasoningOrchestrator.executeStreams(
problemObj,
streams,
totalTimeout
);
await Promise.race([
Promise.allSettled(checkpointPromises),
new Promise((resolve) => setTimeout(resolve, 1000)),
]);
const overheadMetrics = coordinationManager.getOverheadMetrics();
const processingTime = Date.now() - startTime;
const overheadPercentage = coordinationManager.measureCoordinationOverhead(processingTime);
const responseData: ParallelReasoningResponse = {
sessionId: session.sessionId,
streams: buildStreamOutputs(session.sessionId, synthesizedResult),
synthesis: convertToSynthesisResponse(synthesizedResult),
conflictsResolved: synthesizedResult.conflicts.map(convertConflictToResponse),
coordinationMetrics: {
sync25: syncCheckpoints.sync25,
sync50: syncCheckpoints.sync50,
sync75: syncCheckpoints.sync75,
totalCoordinationTime: overheadMetrics.totalCoordinationTime,
overheadPercentage: Math.round(overheadPercentage * 100) / 100,
},
};
parallelReasoningSessionStore.updateSession(session.sessionId, {
status: "complete",
progress: 1.0,
currentStage: "completed",
result: responseData,
});
res.status(200).json(buildSuccessResponse(responseData, { requestId, startTime }));
});
}
/**
* Create an SSE event object
* Requirements: 4.2
*/
function createSSEEvent(
type: SSEEventType,
sessionId: string,
data: Partial<SSEEvent["data"]> = {}
): SSEEvent {
return {
type,
timestamp: new Date().toISOString(),
data: { sessionId, ...data },
};
}
/**
* Write an SSE event to the response
* Requirements: 4.2
*/
function writeSSEEvent(res: Response, event: SSEEvent): void {
res.write(`data: ${JSON.stringify(event)}\n\n`);
}
/**
* Set up SSE headers on the response
* Requirements: 4.2
*/
function setupSSEHeaders(res: Response): void {
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
}
/**
* Check and send checkpoint events
* Requirements: 4.2
*/
function checkAndSendCheckpoints(
res: Response,
session: ParallelReasoningSession,
sentCheckpoints: { sync25: boolean; sync50: boolean; sync75: boolean }
): void {
const checkpoints = [
{ key: "sync25" as const, value: 25, threshold: 0.25 },
{ key: "sync50" as const, value: 50, threshold: 0.5 },
{ key: "sync75" as const, value: 75, threshold: 0.75 },
];
for (const { key, value, threshold } of checkpoints) {
if (
!sentCheckpoints[key] &&
session.syncCheckpoints[key] > 0 &&
session.progress >= threshold
) {
writeSSEEvent(
res,
createSSEEvent("sync_checkpoint", session.sessionId, { checkpoint: value })
);
sentCheckpoints[key] = true;
}
}
}
/**
* Handler for GET /api/v1/reasoning/parallel/:sessionId/stream
* Requirements: 4.2
*
* Sets up Server-Sent Events connection for real-time updates
* from parallel reasoning streams.
*/
function createSSEStreamHandler(): (
req: Request,
res: Response,
next: import("express").NextFunction
) => void {
return asyncHandler(async (req: Request, res: Response): Promise<void> => {
const { sessionId } = req.params;
if (!sessionId || typeof sessionId !== "string" || sessionId.trim() === "") {
throw new ValidationApiError({ sessionId: "sessionId is required" });
}
const session = parallelReasoningSessionStore.getSession(sessionId);
if (!session) {
throw new NotFoundError("ParallelReasoningSession", sessionId);
}
setupSSEHeaders(res);
writeSSEEvent(
res,
createSSEEvent("stream_started", session.sessionId, {
progress: session.progress,
message: "Connected to SSE stream",
})
);
const client = sseConnectionManager.addClient(sessionId, res);
if (session.status === "complete") {
writeSSEEvent(
res,
createSSEEvent("session_completed", session.sessionId, {
progress: 1.0,
message: "Session completed",
})
);
sseConnectionManager.removeClient(sessionId, client);
res.end();
return;
}
if (session.status === "error") {
writeSSEEvent(
res,
createSSEEvent("session_error", session.sessionId, {
error: session.error ?? "Unknown error",
})
);
sseConnectionManager.removeClient(sessionId, client);
res.end();
return;
}
const sentCheckpoints = { sync25: false, sync50: false, sync75: false };
const cleanup = (): void => {
clearInterval(pollInterval);
sseConnectionManager.removeClient(sessionId, client);
if (!res.writableEnded) res.end();
};
const pollInterval = setInterval(() => {
const currentSession = parallelReasoningSessionStore.getSession(sessionId);
if (!currentSession) {
writeSSEEvent(
res,
createSSEEvent("session_error", sessionId, {
error: "Session no longer exists",
})
);
cleanup();
return;
}
writeSSEEvent(
res,
createSSEEvent("stream_progress", currentSession.sessionId, {
progress: currentSession.progress,
message: currentSession.currentStage,
})
);
checkAndSendCheckpoints(res, currentSession, sentCheckpoints);
if (currentSession.status === "complete") {
writeSSEEvent(
res,
createSSEEvent("session_completed", currentSession.sessionId, {
progress: 1.0,
message: "Session completed",
})
);
cleanup();
return;
}
if (currentSession.status === "error") {
writeSSEEvent(
res,
createSSEEvent("session_error", currentSession.sessionId, {
error: currentSession.error ?? "Unknown error",
})
);
cleanup();
return;
}
}, 500);
req.on("close", cleanup);
req.on("error", cleanup);
});
}
/**
* Build reasoning steps from session result
* Requirements: 4.3
*/
function buildReasoningSteps(
sessionId: string,
session: ParallelReasoningSession,
result: ParallelReasoningResponse
): { steps: ReasoningStepResponse[]; confidenceEvolution: number[] } {
const steps: ReasoningStepResponse[] = [];
const confidenceEvolution: number[] = [];
let stepIndex = 0;
// Initial hypothesis step
steps.push({
id: `${sessionId}-step-${stepIndex++}`,
content: `Problem analysis initiated with ${session.activeStreams.length} parallel streams`,
type: "hypothesis",
confidence: 0.5,
evidence: [],
timestamp: session.startedAt.toISOString(),
});
confidenceEvolution.push(0.5);
// Steps from stream insights
for (const stream of result.streams) {
for (const insight of stream.insights) {
steps.push({
id: `${sessionId}-step-${stepIndex++}`,
content: insight.content,
type: "inference",
confidence: insight.confidence,
evidence: [stream.streamType],
timestamp: new Date().toISOString(),
});
confidenceEvolution.push(insight.confidence);
}
}
// Synthesis insights as evidence steps
for (const insight of result.synthesis.insights) {
steps.push({
id: `${sessionId}-step-${stepIndex++}`,
content: insight.content,
type: "evidence",
confidence: insight.confidence,
evidence: insight.sources,
timestamp: new Date().toISOString(),
});
confidenceEvolution.push(insight.confidence);
}
// Conclusion step
steps.push({
id: `${sessionId}-step-${stepIndex++}`,
content: result.synthesis.conclusion,
type: "conclusion",
confidence: result.synthesis.confidence,
evidence: result.streams.map((s) => s.streamType),
timestamp: session.completedAt?.toISOString() ?? new Date().toISOString(),
});
confidenceEvolution.push(result.synthesis.confidence);
return { steps, confidenceEvolution };
}
/**
* Build reasoning branches from stream outputs
* Requirements: 4.3
*/
function buildReasoningBranches(
sessionId: string,
result: ParallelReasoningResponse
): ReasoningBranchResponse[] {
const branches: ReasoningBranchResponse[] = [];
let branchIndex = 0;
for (const stream of result.streams) {
if (stream.insights.length > 0) {
branches.push({
id: `${sessionId}-branch-${branchIndex++}`,
description: `${stream.streamType} reasoning path`,
steps: stream.insights.map((insight, idx) => ({
id: `${sessionId}-branch-${branchIndex - 1}-step-${idx}`,
content: insight.content,
type: "inference" as const,
confidence: insight.confidence,
evidence: [stream.streamType],
timestamp: new Date().toISOString(),
})),
selected: true,
rationale: `Contributed to synthesis with confidence ${stream.confidence}`,
});
}
}
return branches;
}
/**
* Build decision points from session and result
* Requirements: 4.3
*/
function buildDecisionPoints(
sessionId: string,
session: ParallelReasoningSession,
result: ParallelReasoningResponse
): DecisionPoint[] {
const decisionPoints: DecisionPoint[] = [];
const checkpointTimestamp = new Date().toISOString();
// Stream selection decision
decisionPoints.push({
id: `${sessionId}-decision-0`,
description: "Parallel stream selection",
options: ["analytical", "creative", "critical", "synthetic"],
selectedOption: session.activeStreams.join(", "),
rationale: `Selected ${session.activeStreams.length} streams for comprehensive analysis`,
confidence: 0.9,
timestamp: session.startedAt.toISOString(),
});
// Sync checkpoint decisions
const checkpoints = [
{ sync: result.coordinationMetrics.sync25, id: 1, pct: "25%", conf: 0.75 },
{ sync: result.coordinationMetrics.sync50, id: 2, pct: "50%", conf: 0.85 },
{ sync: result.coordinationMetrics.sync75, id: 3, pct: "75%", conf: 0.9 },
];
for (const cp of checkpoints) {
if (cp.sync > 0) {
decisionPoints.push({
id: `${sessionId}-decision-${cp.id}`,
description: `${cp.pct} synchronization checkpoint`,
options: ["continue", "adjust", "abort"],
selectedOption: "continue",
rationale: `Streams synchronized at ${cp.pct} in ${cp.sync}ms`,
confidence: cp.conf,
timestamp: checkpointTimestamp,
});
}
}
// Synthesis decision
decisionPoints.push({
id: `${sessionId}-decision-final`,
description: "Synthesis approach selection",
options: ["weighted_merge", "conflict_resolution", "consensus"],
selectedOption: result.conflictsResolved.length > 0 ? "conflict_resolution" : "weighted_merge",
rationale:
result.conflictsResolved.length > 0
? `Resolved ${result.conflictsResolved.length} conflicts during synthesis`
: "Merged stream outputs using weighted confidence scores",
confidence: result.synthesis.confidence,
timestamp: session.completedAt?.toISOString() ?? new Date().toISOString(),
});
return decisionPoints;
}
/**
* Handler for GET /api/v1/reasoning/chain/:sessionId
* Requirements: 4.3
*
* Returns the complete reasoning chain with steps, branches,
* confidence evolution, and decision points for a completed session.
*/
function createReasoningChainHandler(): (
req: Request,
res: Response,
next: import("express").NextFunction
) => void {
return asyncHandler(async (req: Request, res: Response): Promise<void> => {
const requestId = getRequestId(req);
const startTime = Date.now();
const { sessionId } = req.params;
if (!sessionId || typeof sessionId !== "string" || sessionId.trim() === "") {
throw new ValidationApiError({ sessionId: "sessionId is required" });
}
const session = parallelReasoningSessionStore.getSession(sessionId);
if (!session) {
throw new NotFoundError("ParallelReasoningSession", sessionId);
}
let steps: ReasoningStepResponse[] = [];
let confidenceEvolution: number[] = [];
let branches: ReasoningBranchResponse[] = [];
let decisionPoints: DecisionPoint[] = [];
if (session.result) {
const result = session.result;
const stepsData = buildReasoningSteps(sessionId, session, result);
steps = stepsData.steps;
confidenceEvolution = stepsData.confidenceEvolution;
branches = buildReasoningBranches(sessionId, result);
decisionPoints = buildDecisionPoints(sessionId, session, result);
} else {
// Session still processing - return minimal chain
steps.push({
id: `${sessionId}-step-0`,
content: `Session ${session.status}: ${session.currentStage}`,
type: "hypothesis",
confidence: session.progress,
evidence: session.activeStreams,
timestamp: session.startedAt.toISOString(),
});
confidenceEvolution.push(session.progress);
decisionPoints.push({
id: `${sessionId}-decision-0`,
description: "Parallel stream selection",
options: ["analytical", "creative", "critical", "synthetic"],
selectedOption: session.activeStreams.join(", "),
rationale: `Selected ${session.activeStreams.length} streams for analysis`,
confidence: 0.9,
timestamp: session.startedAt.toISOString(),
});
}
const responseData: ReasoningChainResponse = {
chainId: sessionId,
steps,
branches,
confidenceEvolution,
decisionPoints,
};
res.status(200).json(buildSuccessResponse(responseData, { requestId, startTime }));
});
}
/**
* Create reasoning routes
*
* @param cognitiveCore - Shared cognitive core instance
* @returns Express router with reasoning endpoints
*/
export function createReasoningRoutes(cognitiveCore: CognitiveCore): Router {
const router = Router();
// POST /api/v1/think - Initiate thinking
// Requirements: 3.1, 3.3
router.post("/", createThinkHandler(cognitiveCore));
// GET /api/v1/think/status/:sessionId - Get thinking status
// Requirements: 3.2
router.get("/status/:sessionId", createThinkStatusHandler());
// POST /api/v1/reasoning/parallel - Parallel reasoning
// Requirements: 4.1, 4.4
router.post("/parallel", createParallelReasoningHandler(cognitiveCore));
// GET /api/v1/reasoning/parallel/:sessionId/stream - SSE stream
// Requirements: 4.2
router.get("/parallel/:sessionId/stream", createSSEStreamHandler());
// GET /api/v1/reasoning/chain/:sessionId - Get reasoning chain
// Requirements: 4.3
router.get("/chain/:sessionId", createReasoningChainHandler());
return router;
}
// Export session stores and connection manager for testing
export { parallelReasoningSessionStore, sseConnectionManager, thinkSessionStore };