/**
* Agent Worker Base — shared utilities for subagent.ts and teammate.ts
*
* Extracts the common plumbing:
* - API calling (auth, build request, call proxy, collect stream, convert response)
* - Tool execution with loop detection, truncation, and result formatting
* - Token tracking
*
* Both subagent and teammate keep their own types, system prompts, and lifecycle logic.
* This module provides composable functions, NOT a class hierarchy.
*/
import type Anthropic from "@anthropic-ai/sdk";
import { LoopDetector } from "../../shared/agent-core.js";
import { getProxyUrl, resolveConfig } from "./config-store.js";
import { getValidToken } from "./auth-service.js";
import { parseSSEStream, collectStreamResult } from "../../shared/sse-parser.js";
import {
callServerProxy,
buildAPIRequest,
buildSystemBlocks,
} from "../../shared/api-client.js";
import {
isLocalTool,
executeLocalTool,
} from "./local-tools.js";
import {
isServerTool,
executeServerTool,
} from "./server-tools.js";
import type { ContextProfile, StreamResult } from "../../shared/types.js";
// ============================================================================
// TYPES
// ============================================================================
/** Unified response from an API call — used by both subagent and teammate */
export interface AgentAPIResponse {
content: Anthropic.ContentBlock[];
usage: { input_tokens: number; output_tokens: number };
stop_reason: string;
}
/** Options for callAgentAPI */
export interface AgentAPIOptions {
modelId: string;
contextProfile: ContextProfile;
systemPrompt: string;
messages: Anthropic.MessageParam[];
tools: Anthropic.Tool[];
thinkingEnabled?: boolean;
maxOutputTokens?: number;
timeoutMs?: number;
/** If true, add cache_control to last tool */
cacheLastTool?: boolean;
}
/** Result of executing a single tool */
export interface ToolExecResult {
success: boolean;
output: string;
}
/** Callbacks for tool execution progress */
export interface ToolExecCallbacks {
onToolStart?: (toolName: string, input: Record<string, unknown>) => void;
onToolEnd?: (toolName: string, success: boolean, durationMs: number) => void;
}
/** Options for executeToolBlocks */
export interface ExecuteToolBlocksOptions {
toolBlocks: Anthropic.ToolUseBlock[];
loopDetector: LoopDetector;
callbacks?: ToolExecCallbacks;
/** Additional tool executor for domain-specific tools (e.g., team tools) */
customExecutor?: (name: string, input: Record<string, unknown>) => Promise<ToolExecResult | null>;
/** Max chars per tool result (default 30_000) */
maxToolResultChars?: number;
/** Timeout per tool call in ms (no timeout if not set) */
toolTimeoutMs?: number;
}
/** Result of executing all tool blocks in a turn */
export interface ToolBlocksResult {
toolResults: Anthropic.ToolResultBlockParam[];
toolsUsed: string[];
/** Set by custom executor if a domain event occurred (e.g., team task completed) */
customSignals: Record<string, unknown>;
}
// ============================================================================
// API CLIENT — unified proxy caller
// ============================================================================
/**
* Convert a StreamResult (from SSE collector) to the AgentAPIResponse format
* that both subagent and teammate expect.
*
* Subagent includes thinking blocks; teammate does not (thinkingEnabled=false).
*/
export function streamResultToResponse(
result: StreamResult,
includeThinking: boolean,
): AgentAPIResponse {
const content: Anthropic.ContentBlock[] = [];
if (includeThinking) {
for (const tb of result.thinkingBlocks) {
content.push({
type: "thinking",
thinking: tb.thinking,
signature: tb.signature,
} as any);
}
}
if (result.text) {
content.push({ type: "text", text: result.text } as Anthropic.TextBlock);
}
for (const tu of result.toolUseBlocks) {
content.push({
type: "tool_use",
id: tu.id,
name: tu.name,
input: tu.input,
} as Anthropic.ToolUseBlock);
}
return {
content,
usage: {
input_tokens: result.usage.inputTokens,
output_tokens: result.usage.outputTokens,
},
stop_reason: result.stopReason,
};
}
/**
* Call the server proxy API, collect the SSE stream, and return a typed response.
* Handles auth, request building, caching setup, and stream collection.
*/
export async function callAgentAPI(opts: AgentAPIOptions): Promise<AgentAPIResponse> {
const token = await getValidToken();
if (!token) throw new Error("No API key available. Run `whale login`.");
const apiConfig = buildAPIRequest({
model: opts.modelId,
contextProfile: opts.contextProfile,
thinkingEnabled: opts.thinkingEnabled ?? false,
maxOutputTokens: opts.maxOutputTokens,
});
// Optionally add cache_control to last tool
let tools: Array<Record<string, unknown>>;
if (opts.cacheLastTool && opts.tools.length > 0) {
tools = [
...opts.tools.slice(0, -1),
{ ...opts.tools[opts.tools.length - 1], cache_control: { type: "ephemeral" } },
] as any;
} else {
tools = [...opts.tools] as any;
}
const system = buildSystemBlocks(opts.systemPrompt);
const { storeId } = resolveConfig();
const stream = await callServerProxy({
proxyUrl: getProxyUrl(),
token,
model: opts.modelId,
system,
messages: opts.messages as any,
tools,
apiConfig,
storeId: storeId || undefined,
...(opts.timeoutMs ? { timeoutMs: opts.timeoutMs } : {}),
});
const result = await collectStreamResult(parseSSEStream(stream));
return streamResultToResponse(result, opts.thinkingEnabled ?? false);
}
// ============================================================================
// TOOL EXECUTION — shared loop with detection, routing, and truncation
// ============================================================================
/**
* Execute an array of tool use blocks, handling:
* - Loop detection (via LoopDetector)
* - Routing to local, server, or custom executor
* - Result truncation to maxToolResultChars
* - Optional per-tool timeout
*
* Returns tool results ready to append to messages, plus metadata.
*/
export async function executeToolBlocks(
opts: ExecuteToolBlocksOptions,
): Promise<ToolBlocksResult> {
const {
toolBlocks,
loopDetector,
callbacks,
customExecutor,
maxToolResultChars = 30_000,
toolTimeoutMs,
} = opts;
const toolResults: Anthropic.ToolResultBlockParam[] = [];
const toolsUsed: string[] = [];
const customSignals: Record<string, unknown> = {};
for (const tu of toolBlocks) {
// Circuit breaker check
const loopCheck = loopDetector.recordCall(tu.name, tu.input as Record<string, unknown>);
if (loopCheck.blocked) {
toolResults.push({
type: "tool_result",
tool_use_id: tu.id,
content: JSON.stringify({ error: loopCheck.reason }),
});
continue;
}
callbacks?.onToolStart?.(tu.name, tu.input as Record<string, unknown>);
const toolStart = Date.now();
let result: ToolExecResult;
try {
// Wrap execution with optional timeout
const executeOne = async (): Promise<ToolExecResult> => {
// Try custom executor first (team tools, etc.)
if (customExecutor) {
const customResult = await customExecutor(tu.name, tu.input as Record<string, unknown>);
if (customResult !== null) return customResult;
}
// Standard tool routing
if (isLocalTool(tu.name)) {
return await executeLocalTool(tu.name, tu.input as Record<string, unknown>);
} else if (isServerTool(tu.name)) {
return await executeServerTool(tu.name, tu.input as Record<string, unknown>);
} else {
return { success: false, output: `Unknown tool: ${tu.name}` };
}
};
if (toolTimeoutMs) {
result = await Promise.race([
executeOne(),
new Promise<ToolExecResult>((_, reject) =>
setTimeout(
() => reject(new Error(`Tool ${tu.name} timed out after ${toolTimeoutMs / 1000}s`)),
toolTimeoutMs,
),
),
]);
} else {
result = await executeOne();
}
} catch (toolErr: any) {
result = { success: false, output: `Tool execution error: ${toolErr.message || String(toolErr)}` };
}
const toolDuration = Date.now() - toolStart;
loopDetector.recordResult(tu.name, result.success, tu.input as Record<string, unknown>);
callbacks?.onToolEnd?.(tu.name, result.success, toolDuration);
// Track tool usage
if (!toolsUsed.includes(tu.name)) {
toolsUsed.push(tu.name);
}
// Truncate and format result
let contentStr = JSON.stringify(result.success ? result.output : { error: result.output });
if (contentStr.length > maxToolResultChars) {
contentStr =
contentStr.slice(0, maxToolResultChars) +
`\n\n... (truncated — ${contentStr.length.toLocaleString()} chars total)`;
}
toolResults.push({
type: "tool_result",
tool_use_id: tu.id,
content: contentStr,
});
}
return { toolResults, toolsUsed, customSignals };
}
// ============================================================================
// CONTENT BLOCK HELPERS
// ============================================================================
/** Extract text blocks from a response */
export function extractTextBlocks(content: Anthropic.ContentBlock[]): Anthropic.TextBlock[] {
return content.filter((b): b is Anthropic.TextBlock => b.type === "text");
}
/** Extract tool use blocks from a response */
export function extractToolUseBlocks(content: Anthropic.ContentBlock[]): Anthropic.ToolUseBlock[] {
return content.filter((b): b is Anthropic.ToolUseBlock => b.type === "tool_use");
}
/** Get combined text from all text blocks */
export function getResponseText(content: Anthropic.ContentBlock[]): string {
return extractTextBlocks(content).map((b) => b.text).join("\n");
}
// ============================================================================
// YIELD HELPERS — prevent blocking the event loop during long agent loops
// ============================================================================
/** Yield to event loop to prevent blocking (microtask resolution) */
export function yieldToEventLoop(): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, 0));
}
/** Longer yield for UI-critical moments (before long API calls, ~1 frame at 60fps) */
export function yieldForRender(): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, 16));
}