/**
* Unified Server Agent Loop — single source of truth for server-side agentic reasoning.
*
* Used by:
* - handleAgentChat (SSE streaming to clients)
* - setAgentExecutor (workflow "agent" step type)
*
* Consolidates: streaming, prompt caching, context management betas, compaction,
* loop detection, parallel tool execution, subagent delegation, retry, cost tracking.
*/
import type Anthropic from "@anthropic-ai/sdk";
import type { SupabaseClient } from "@supabase/supabase-js";
import type {
BetaTextBlockParam,
BetaMessageParam,
BetaToolUnion,
BetaRawMessageStreamEvent,
BetaMessage,
MessageCreateParamsStreaming,
MessageCreateParamsNonStreaming,
} from "@anthropic-ai/sdk/resources/beta/messages/messages";
import {
LoopDetector,
getContextManagement,
getMaxOutputTokens,
addPromptCaching,
estimateCostUsd,
isRetryableError,
sanitizeError,
} from "../../shared/agent-core.js";
import { processStreamWithCallbacks } from "../../shared/sse-parser.js";
import type { BetaStreamEvent } from "../../shared/anthropic-types.js";
import { dispatchTools, buildAssistantContent, type ToolExecutor } from "../../shared/tool-dispatch.js";
import {
DELEGATE_TASK_TOOL_DEF,
runServerSubagent,
type SubagentProgressCallback,
type SubagentProgressEvent,
} from "./server-subagent.js";
// ============================================================================
// TYPES
// ============================================================================
/** Minimal tool definition forwarded to the Anthropic API (cast to BetaToolUnion). */
export interface ToolDef {
  /** Tool name the model references in tool_use blocks. */
  name: string;
  /** Natural-language description shown to the model. */
  description: string;
  /** JSON-Schema-shaped description of the tool's arguments. */
  input_schema: Record<string, unknown>;
}
/** Outcome of one injected tool execution: `data` on success, `error` message otherwise. */
export type ToolResult = { success: boolean; data?: unknown; error?: string };
/**
 * Options for runServerAgentLoop.
 *
 * `executeTool` is injected by the caller instead of imported here, which
 * keeps this module free of circular dependencies on tool implementations.
 */
export interface ServerAgentLoopOptions {
  // Core
  /** Anthropic SDK client used for every model call in the loop. */
  anthropic: Anthropic;
  /** Supabase client; used to write subagent delegation rows to audit_logs. */
  supabase: SupabaseClient;
  /** Parent-loop model ID (subagents always run on Haiku/Sonnet). */
  model: string;
  /** System prompt; sent as a stable cached block when prompt caching is on. */
  systemPrompt: string;
  /** Conversation history — mutated in place (assistant/tool-result turns are appended). */
  messages: Anthropic.MessageParam[];
  /** Tools exposed to the model (delegate_task may be auto-injected). */
  tools: ToolDef[];
  // Behavior
  /** Hard cap on assistant turns before the loop stops. */
  maxTurns: number;
  temperature: number;
  /** Max output tokens per call; defaults to getMaxOutputTokens(model). */
  maxTokens?: number;
  /** Cap on tool-result size, passed to dispatchTools as maxResultChars. */
  maxToolResultChars: number;
  // Context (audit + subagent)
  storeId?: string;
  traceId?: string;
  userId?: string | null;
  userEmail?: string | null;
  source?: string;
  conversationId?: string;
  agentId?: string;
  // Tool execution — injected, avoids circular deps
  executeTool: (
    toolName: string,
    args: Record<string, unknown>,
    sourceOverride?: string,
  ) => Promise<ToolResult>;
  // Feature flags
  enableDelegation?: boolean; // Default: true — auto-injects delegate_task if absent
  enablePromptCaching?: boolean; // Default: true
  enableStreaming?: boolean; // Default: true
  maxConcurrentTools?: number; // Default: 7 (DEFAULT_MAX_CONCURRENT_TOOLS)
  // Callbacks (all optional)
  /** Called with streamed text chunks (streaming) or whole text blocks (non-streaming). */
  onText?: (text: string) => void;
  /** Called when the model requests a tool, before execution. */
  onToolStart?: (name: string) => void;
  /** Called after each regular tool finishes (not fired for delegate_task). */
  onToolResult?: (name: string, success: boolean, result: unknown) => void;
  /** Forwarded subagent lifecycle/progress events. */
  onSubagentProgress?: SubagentProgressCallback;
  // Abort control
  /** Shared flag; set `.value = true` to stop the loop at the next turn boundary. */
  clientDisconnected?: { value: boolean };
  /** Epoch ms the request started; defaults to Date.now() at loop entry. */
  startedAt?: number;
  /** Wall-clock budget for the whole run; defaults to 5 minutes. */
  maxDurationMs?: number;
}
/** Aggregated outcome of one runServerAgentLoop invocation. */
export interface ServerAgentLoopResult {
  /** All per-turn text responses joined with blank lines (falls back to the last turn's text). */
  finalText: string;
  /** Assistant text per turn, in order; empty-text turns are skipped. */
  allTextResponses: string[];
  /** Number of assistant turns actually taken. */
  turnCount: number;
  /** Total tool_use blocks executed across all turns. */
  toolCallCount: number;
  /** Deduplicated names of every tool invoked (includes tools run by subagents). */
  toolsUsed: string[];
  /** Session-wide token usage. */
  tokens: {
    input: number;
    output: number;
    cacheCreation: number;
    cacheRead: number;
  };
  /** Estimated session cost in USD (via estimateCostUsd, plus subagent cost). */
  costUsd: number;
  /** Stats reported by LoopDetector.getSessionStats(). */
  loopDetectorStats: {
    totalErrors: number;
    failedStrategies: number;
    consecutiveFailedTurns: number;
  };
}
// Re-export for consumers
export type { SubagentProgressCallback, SubagentProgressEvent };
// ============================================================================
// CONSTANTS
// ============================================================================
// Max additional attempts after the first failed API call (retryable errors only,
// so up to MAX_RETRIES + 1 calls total).
const MAX_RETRIES = 3;
// Exponential backoff base: delay = RETRY_BASE_DELAY_MS * 2^attempt.
const RETRY_BASE_DELAY_MS = 1000;
// Default cap on tools dispatched concurrently per turn (overridable via options).
const DEFAULT_MAX_CONCURRENT_TOOLS = 7;
// ============================================================================
// UNIFIED AGENT LOOP
// ============================================================================
/**
 * Runs the unified server-side agent loop: repeatedly calls the model,
 * executes any requested tools (including subagent delegation), feeds the
 * results back, and stops when the model returns no tool calls, a turn or
 * wall-clock limit is hit, or the client disconnects.
 *
 * Cost accounting: subagent tokens are priced by the subagent's own model
 * inside runServerSubagent, so they are kept OUT of the parent-model
 * estimateCostUsd call and their dollar cost is added separately. (Folding
 * them into the parent totals would double-count them at the parent rate,
 * and a per-turn cost add-on would be dropped by the next turn's recompute.)
 * Reported token totals still include subagent usage.
 *
 * @param opts - Configuration, injected tool executor, and optional callbacks.
 * @returns Aggregated text output, token/cost accounting, and loop stats.
 * @throws Rethrows non-retryable API errors; retryable ones are retried with
 *   exponential backoff up to MAX_RETRIES.
 */
export async function runServerAgentLoop(
  opts: ServerAgentLoopOptions,
): Promise<ServerAgentLoopResult> {
  const {
    anthropic,
    model,
    systemPrompt,
    messages,
    tools: inputTools,
    maxTurns,
    temperature,
    maxToolResultChars,
    enableDelegation = true,
    enablePromptCaching = true,
    enableStreaming = true,
    maxConcurrentTools = DEFAULT_MAX_CONCURRENT_TOOLS,
    onText,
    onToolStart,
    clientDisconnected = { value: false },
    startedAt = Date.now(),
    maxDurationMs = 5 * 60 * 1000,
  } = opts;

  // Auto-inject delegate_task for all models (subagents always use Claude Haiku/Sonnet)
  const tools = [...inputTools];
  if (enableDelegation && !tools.some((t) => t.name === "delegate_task")) {
    tools.push(DELEGATE_TASK_TOOL_DEF as ToolDef);
  }

  const maxTokens = opts.maxTokens ?? getMaxOutputTokens(model);
  const ctxMgmt = getContextManagement(model);
  const loopDetector = new LoopDetector();

  // Accumulators. Parent tokens (this loop's own API calls) are tracked
  // separately from subagent tokens — see the cost-accounting note above.
  let turnCount = 0;
  let toolCallCount = 0;
  let parentInputTokens = 0;
  let parentOutputTokens = 0;
  let subagentInputTokens = 0;
  let subagentOutputTokens = 0;
  let subagentCostUsd = 0;
  let cacheCreationTokens = 0;
  let cacheReadTokens = 0;
  let sessionCostUsd = 0;
  let finalResponse = "";
  const allTextResponses: string[] = [];
  const allToolNames: string[] = [];

  // Session cost = parent-model cost (incl. cache tokens) + subagent cost.
  const recomputeSessionCost = (): void => {
    sessionCostUsd =
      estimateCostUsd(parentInputTokens, parentOutputTokens, model, 0, cacheReadTokens, cacheCreationTokens) +
      subagentCostUsd;
  };

  while (turnCount < maxTurns) {
    // Abort checks: client gone, or wall-clock budget exhausted.
    if (clientDisconnected.value) {
      console.log("[agent-loop] Client disconnected, stopping");
      break;
    }
    if (Date.now() - startedAt > maxDurationMs) {
      onText?.("[Request timeout exceeded]");
      break;
    }
    turnCount++;
    loopDetector.resetTurn();

    // Prepare tool definitions
    const toolDefs = tools.map((t) => ({
      name: t.name,
      description: t.description,
      input_schema: t.input_schema,
    }));

    // Prompt caching: cache-control markers on tools + turn boundary
    let finalToolDefs: Array<Record<string, unknown>> = toolDefs;
    let finalMessages: Array<Record<string, unknown>> = messages as unknown as Array<Record<string, unknown>>;
    if (enablePromptCaching) {
      const cached = addPromptCaching(
        toolDefs as Array<Record<string, unknown>>,
        messages as unknown as Array<Record<string, unknown>>,
      );
      finalToolDefs = cached.tools;
      finalMessages = cached.messages;
    }

    // System prompt: stable cached block + small dynamic cost-context block
    const costContext = `Session cost: $${sessionCostUsd.toFixed(2)}`;
    const system: BetaTextBlockParam[] = enablePromptCaching
      ? [
          { type: "text", text: systemPrompt, cache_control: { type: "ephemeral" } },
          { type: "text", text: costContext },
        ]
      : [
          { type: "text", text: systemPrompt },
          { type: "text", text: costContext },
        ];

    // API call with retry
    if (enableStreaming) {
      // ---- STREAMING PATH ----
      let stream: AsyncIterable<BetaRawMessageStreamEvent> | null = null;
      for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
        try {
          // Cast through unknown: context_management.edits is Record<string, unknown>[]
          // from getContextManagement() but SDK expects specific edit union types
          stream = await anthropic.beta.messages.create({
            model,
            max_tokens: maxTokens,
            temperature,
            system,
            tools: finalToolDefs as unknown as BetaToolUnion[],
            messages: finalMessages as unknown as BetaMessageParam[],
            stream: true,
            betas: ctxMgmt.betas,
            context_management: ctxMgmt.config,
          } as unknown as MessageCreateParamsStreaming);
          break;
        } catch (err) {
          if (attempt < MAX_RETRIES && isRetryableError(err)) {
            const delay = RETRY_BASE_DELAY_MS * Math.pow(2, attempt);
            console.log(`[agent-loop] Retry ${attempt + 1}/${MAX_RETRIES}, delay ${delay}ms: ${sanitizeError(err)}`);
            await new Promise((resolve) => setTimeout(resolve, delay));
            continue;
          }
          throw err;
        }
      }
      if (!stream) throw new Error("Failed to get response after retries");

      // Process stream events via unified parser
      const streamResult = await processStreamWithCallbacks(
        stream as unknown as AsyncIterable<BetaStreamEvent>,
        { onText, onToolStart },
      );
      const currentText = streamResult.text;
      const toolUseBlocks = streamResult.toolUseBlocks;
      const compactionContent = streamResult.compactionContent;

      // Accumulate this turn's parent-call tokens, then refresh session cost
      parentInputTokens += streamResult.usage.inputTokens;
      parentOutputTokens += streamResult.usage.outputTokens;
      cacheCreationTokens += streamResult.usage.cacheCreationTokens;
      cacheReadTokens += streamResult.usage.cacheReadTokens;
      recomputeSessionCost();

      if (currentText) allTextResponses.push(currentText);

      // No tool calls → done
      if (toolUseBlocks.length === 0) {
        finalResponse = currentText;
        break;
      }

      // Execute tools and build messages for next turn
      const turnSubagentTokens = { input: 0, output: 0, costUsd: 0 };
      const executor = makeToolExecutor(opts, tools, allToolNames, turnSubagentTokens);
      const { results: toolResults } = await dispatchTools(toolUseBlocks, executor, {
        loopDetector,
        maxConcurrent: maxConcurrentTools,
        maxResultChars: maxToolResultChars,
      });
      toolCallCount += toolUseBlocks.length;

      // Fold this turn's subagent usage into the session-wide accumulators
      // (so cost survives across turns instead of being re-based each turn)
      subagentInputTokens += turnSubagentTokens.input;
      subagentOutputTokens += turnSubagentTokens.output;
      subagentCostUsd += turnSubagentTokens.costUsd;
      recomputeSessionCost();

      const assistantContent = buildAssistantContent({ text: currentText, toolUseBlocks, compactionContent });
      messages.push({ role: "assistant", content: assistantContent } as unknown as Anthropic.MessageParam);
      messages.push({ role: "user", content: toolResults } as unknown as Anthropic.MessageParam);
    } else {
      // ---- NON-STREAMING PATH ----
      let response: BetaMessage | null = null;
      for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
        try {
          // Cast through unknown: context_management.edits is Record<string, unknown>[]
          // from getContextManagement() but SDK expects specific edit union types
          response = await anthropic.beta.messages.create({
            model,
            max_tokens: maxTokens,
            temperature,
            system,
            tools: finalToolDefs as unknown as BetaToolUnion[],
            messages: finalMessages as unknown as BetaMessageParam[],
            betas: ctxMgmt.betas,
            context_management: ctxMgmt.config,
          } as unknown as MessageCreateParamsNonStreaming);
          break;
        } catch (err) {
          if (attempt < MAX_RETRIES && isRetryableError(err)) {
            const delay = RETRY_BASE_DELAY_MS * Math.pow(2, attempt);
            console.log(`[agent-loop] Retry ${attempt + 1}/${MAX_RETRIES}, delay ${delay}ms: ${sanitizeError(err)}`);
            await new Promise((resolve) => setTimeout(resolve, delay));
            continue;
          }
          throw err;
        }
      }
      if (!response) throw new Error("Failed to get response after retries");

      // Track this turn's parent-call tokens
      parentInputTokens += response.usage?.input_tokens || 0;
      parentOutputTokens += response.usage?.output_tokens || 0;
      cacheCreationTokens += response.usage?.cache_creation_input_tokens ?? 0;
      cacheReadTokens += response.usage?.cache_read_input_tokens ?? 0;

      // Extract text and tool_use blocks
      let currentText = "";
      const toolUseBlocks: Array<{ id: string; name: string; input: Record<string, unknown> }> = [];
      for (const block of response.content) {
        if (block.type === "text") {
          currentText += block.text;
          onText?.(block.text);
        } else if (block.type === "tool_use") {
          toolUseBlocks.push({
            id: block.id,
            name: block.name,
            input: block.input as Record<string, unknown>,
          });
          onToolStart?.(block.name);
        }
      }
      recomputeSessionCost();

      if (currentText) allTextResponses.push(currentText);

      // No tool calls → done (don't check stop_reason — model may return end_turn WITH tool calls)
      if (toolUseBlocks.length === 0) {
        finalResponse = currentText;
        break;
      }

      // Execute tools
      const turnSubagentTokens = { input: 0, output: 0, costUsd: 0 };
      const nsExecutor = makeToolExecutor(opts, tools, allToolNames, turnSubagentTokens);
      const { results: toolResults } = await dispatchTools(toolUseBlocks, nsExecutor, {
        loopDetector,
        maxConcurrent: maxConcurrentTools,
        maxResultChars: maxToolResultChars,
      });
      toolCallCount += toolUseBlocks.length;

      // Fold this turn's subagent usage into the session-wide accumulators
      subagentInputTokens += turnSubagentTokens.input;
      subagentOutputTokens += turnSubagentTokens.output;
      subagentCostUsd += turnSubagentTokens.costUsd;
      recomputeSessionCost();

      const assistantContent = buildAssistantContent({ text: currentText, toolUseBlocks });
      messages.push({ role: "assistant", content: assistantContent } as unknown as Anthropic.MessageParam);
      messages.push({ role: "user", content: toolResults } as unknown as Anthropic.MessageParam);
    }
  }

  const fullText = allTextResponses.join("\n\n") || finalResponse;
  return {
    finalText: fullText,
    allTextResponses,
    turnCount,
    toolCallCount,
    toolsUsed: [...new Set(allToolNames)],
    tokens: {
      // Reported totals include subagent usage; costUsd prices them separately.
      input: parentInputTokens + subagentInputTokens,
      output: parentOutputTokens + subagentOutputTokens,
      cacheCreation: cacheCreationTokens,
      cacheRead: cacheReadTokens,
    },
    costUsd: sessionCostUsd,
    loopDetectorStats: loopDetector.getSessionStats(),
  };
}
// ============================================================================
// TOOL EXECUTOR FACTORY — creates executor for dispatchTools with delegation
// ============================================================================
/**
 * Builds the ToolExecutor handed to dispatchTools. Regular tools are routed
 * through the injected executeTool; delegate_task is intercepted and run as a
 * subagent whose token usage and cost are folded into `subagentTokens`.
 * Tool names (including those used by subagents) are pushed onto allToolNames.
 */
function makeToolExecutor(
  opts: ServerAgentLoopOptions,
  tools: ToolDef[],
  allToolNames: string[],
  subagentTokens: { input: number; output: number; costUsd: number },
): ToolExecutor {
  const {
    anthropic,
    supabase,
    storeId,
    traceId,
    userId,
    userEmail,
    conversationId,
    agentId,
    executeTool,
    onToolResult,
    onSubagentProgress,
    clientDisconnected = { value: false },
    startedAt = Date.now(),
    maxDurationMs = 5 * 60 * 1000,
  } = opts;

  // Runs one delegate_task call: spawn the subagent, mirror its progress
  // events, audit-log the run, and accumulate its usage into subagentTokens.
  const runDelegation = async (input: Record<string, unknown>) => {
    const taskPrompt = String(input.prompt || "");
    const tier: "haiku" | "sonnet" = input.model === "sonnet" ? "sonnet" : "haiku";
    const requestedTurns = Number(input.max_turns) || 6;
    const turnLimit = Math.min(Math.max(1, requestedTurns), 12); // clamp to [1, 12]
    const delegatedTools = tools.filter((t) => t.name !== "delegate_task"); // no recursive delegation
    const delegationId = `sub-${Date.now().toString(36)}`;

    onSubagentProgress?.({ subagentId: delegationId, event: "started", model: tier });
    const t0 = Date.now();
    const outcome = await runServerSubagent({
      anthropic,
      supabase,
      storeId,
      prompt: taskPrompt,
      model: tier,
      maxTurns: turnLimit,
      tools: delegatedTools,
      executeTool: async (toolName, args) => executeTool(toolName, args, "server_subagent"),
      onProgress: onSubagentProgress,
      clientDisconnected,
      startedAt,
      maxDurationMs,
    });
    onSubagentProgress?.({ subagentId: delegationId, event: "done", output: outcome.output });

    // Best-effort audit trail — a logging failure must not fail the delegation.
    const elapsedMs = Date.now() - t0;
    const resolvedModelId =
      tier === "sonnet" ? "claude-sonnet-4-20250514" : "claude-haiku-4-5-20251001";
    try {
      await supabase.from("audit_logs").insert({
        action: "chat.subagent_complete",
        severity: "info",
        store_id: storeId || null,
        resource_type: "chat_subagent",
        resource_id: agentId || null,
        request_id: traceId || null,
        conversation_id: conversationId || null,
        source: "server_subagent",
        user_id: userId || null,
        user_email: userEmail || null,
        input_tokens: outcome.tokensUsed.input,
        output_tokens: outcome.tokensUsed.output,
        total_cost: outcome.costUsd,
        model: resolvedModelId,
        duration_ms: elapsedMs,
        details: {
          subagent_model: tier,
          turn_count: outcome.turnCount,
          tool_calls: outcome.toolsUsed.length,
          tool_names: outcome.toolsUsed,
          cost_usd: outcome.costUsd,
          success: outcome.success,
          prompt_preview: taskPrompt.substring(0, 200),
        },
      });
    } catch (err) {
      console.error("[subagent-audit] Failed to log delegation:", (err as Error).message);
    }

    subagentTokens.input += outcome.tokensUsed.input;
    subagentTokens.output += outcome.tokensUsed.output;
    subagentTokens.costUsd += outcome.costUsd;
    allToolNames.push(...outcome.toolsUsed);
    return { success: outcome.success, output: outcome.output };
  };

  return async (name, input) => {
    allToolNames.push(name);

    // Subagent delegation is handled in-process rather than via executeTool.
    if (name === "delegate_task") return runDelegation(input);

    // Regular tool execution
    const result = await executeTool(name, input);
    onToolResult?.(name, result.success, result.success ? result.data : result.error);
    const payload = result.success ? result.data : { error: result.error };
    return { success: result.success, output: JSON.stringify(payload) };
  };
}