// server/proxy-handlers.ts — Multi-provider API proxy handlers
// Extracted from index.ts. Routes CLI agent requests to Anthropic, Bedrock, or Gemini.
import http from "node:http";
import { randomUUID } from "node:crypto";
import { createClient } from "@supabase/supabase-js";
import Anthropic from "@anthropic-ai/sdk";
import { BedrockRuntimeClient, InvokeModelWithResponseStreamCommand } from "@aws-sdk/client-bedrock-runtime";
import { GoogleGenAI, type Part, type Content, type FunctionDeclaration } from "@google/genai";
import OpenAI from "openai";
import { sanitizeError } from "../shared/agent-core.js";
import { ALLOWED_MODELS, getProvider, isOpenAIReasoningModel } from "../shared/constants.js";
// ============================================================================
// HELPERS (passed in or env-based)
// ============================================================================
/** Write a JSON body with the given HTTP status and CORS headers, then close the response. */
function jsonResponse(res: http.ServerResponse, status: number, data: unknown, corsHeaders: Record<string, string>): void {
  const headers = { "Content-Type": "application/json", ...corsHeaders };
  res.writeHead(status, headers);
  res.end(JSON.stringify(data));
}
// ============================================================================
// CREDENTIAL RESOLUTION
// ============================================================================
// Per-provider caps on requested output tokens; handleProxy clamps the
// client-supplied max_tokens to these ceilings.
const ANTHROPIC_MAX_TOKENS = 16384;
const GEMINI_MAX_TOKENS = 65536;
const OPENAI_MAX_TOKENS = 128000;
// Lower ceiling applied when isOpenAIReasoningModel(model) is true.
const OPENAI_REASONING_MAX_TOKENS = 100000;
// Credential cache for non-Anthropic providers — keyed by `${provider}:${storeId}`
// (see resolveProviderCredentials), 5-minute TTL, capped at 50 entries.
// Only the field for the requested provider is populated on each entry.
const providerCredCacheMap = new Map<string, {
  google?: { apiKey: string };
  bedrock?: { accessKeyId: string; secretAccessKey: string; region: string };
  openai?: { apiKey: string };
  fetchedAt: number; // Date.now() at resolution time; drives TTL expiry
}>();
/**
 * Resolve API credentials for a non-Anthropic provider via the Supabase
 * `decrypt_secret` RPC, with a per-`provider:storeId` in-memory cache
 * (5-minute TTL, LRU eviction capped at 50 entries).
 *
 * Missing credentials are signaled by the absence of the provider's field on
 * the returned object (callers check e.g. `result.google` before use).
 * Lookup failures are logged, never thrown.
 *
 * @param provider - Which provider's secrets to fetch.
 * @param storeId  - Store scoping the secrets; undefined uses platform defaults.
 */
async function resolveProviderCredentials(
  provider: "google" | "bedrock" | "openai",
  storeId: string | undefined,
): Promise<{ google?: { apiKey: string }; bedrock?: { accessKeyId: string; secretAccessKey: string; region: string }; openai?: { apiKey: string } }> {
  // Cache keyed by provider+storeId to avoid cross-provider cache pollution
  const cacheKey = `${provider}:${storeId || "__default__"}`;
  const cached = providerCredCacheMap.get(cacheKey);
  if (cached && Date.now() - cached.fetchedAt < 300_000) {
    // Re-insert on hit so Map insertion order tracks recency — this makes the
    // size-capped eviction below a true LRU rather than FIFO.
    providerCredCacheMap.delete(cacheKey);
    providerCredCacheMap.set(cacheKey, cached);
    return cached;
  }
  const SUPABASE_URL = process.env.SUPABASE_URL!;
  const SUPABASE_SERVICE_ROLE_KEY = process.env.SUPABASE_SERVICE_ROLE_KEY!;
  const sb = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
  const result: {
    google?: { apiKey: string };
    bedrock?: { accessKeyId: string; secretAccessKey: string; region: string };
    openai?: { apiKey: string };
    fetchedAt: number;
  } = { fetchedAt: Date.now() };
  // Use decrypt_secret RPC (same pattern as llm-providers, image-gen, voice handlers)
  if (provider === "google") {
    try {
      const { data, error } = await sb.rpc("decrypt_secret", { p_name: "GOOGLE_AI_API_KEY", p_store_id: storeId || null });
      if (error) console.error(`[cred-resolve] Google key lookup failed for store=${storeId}:`, error.message);
      if (data) result.google = { apiKey: data as string };
      else console.error(`[cred-resolve] No Google key found for store=${storeId}`);
    } catch (err: any) {
      console.error(`[cred-resolve] Google key RPC error for store=${storeId}:`, err?.message);
    }
  }
  if (provider === "bedrock") {
    try {
      // Access key, secret key, and region are independent secrets — fetch in parallel
      const [akRes, skRes, rgRes] = await Promise.all([
        sb.rpc("decrypt_secret", { p_name: "AWS_ACCESS_KEY_ID", p_store_id: storeId || null }),
        sb.rpc("decrypt_secret", { p_name: "AWS_SECRET_ACCESS_KEY", p_store_id: storeId || null }),
        sb.rpc("decrypt_secret", { p_name: "AWS_REGION", p_store_id: storeId || null }),
      ]);
      if (akRes.data && skRes.data) {
        result.bedrock = {
          accessKeyId: akRes.data as string,
          secretAccessKey: skRes.data as string,
          region: (rgRes.data as string) || "us-east-1",
        };
      }
    } catch (err: any) {
      console.error(`[cred-resolve] Bedrock key RPC error for store=${storeId}:`, err?.message);
    }
  }
  if (provider === "openai") {
    try {
      const { data, error } = await sb.rpc("decrypt_secret", { p_name: "OPENAI_API_KEY", p_store_id: storeId || null });
      if (error) console.error(`[cred-resolve] OpenAI key lookup failed for store=${storeId}:`, error.message);
      if (data) result.openai = { apiKey: data as string };
      else console.error(`[cred-resolve] No OpenAI key found for store=${storeId}`);
    } catch (err: any) {
      console.error(`[cred-resolve] OpenAI key RPC error for store=${storeId}:`, err?.message);
    }
  }
  // Only cache successful resolutions. The previous version cached misses too,
  // which negatively cached a newly configured key for up to 5 minutes.
  if (result.google || result.bedrock || result.openai) {
    providerCredCacheMap.set(cacheKey, result);
    // Evict the least-recently-used entry once the cache exceeds 50 stores.
    if (providerCredCacheMap.size > 50) {
      const lruKey = providerCredCacheMap.keys().next().value;
      if (lruKey) providerCredCacheMap.delete(lruKey);
    }
  }
  return result;
}
// ============================================================================
// MAIN PROXY DISPATCHER
// ============================================================================
/**
 * Main proxy dispatcher: route a CLI-agent chat request to the provider that
 * owns the requested model (Anthropic direct, Bedrock, Gemini, or OpenAI).
 *
 * Normalizes the model (unknown models fall back to the default Sonnet),
 * clamps max_tokens to the provider's ceiling, and rejects requests without
 * a messages array with HTTP 400.
 */
export async function handleProxy(
  res: http.ServerResponse,
  body: any,
  corsHeaders: Record<string, string>
): Promise<void> {
  const {
    messages,
    system,
    tools,
    model: requestedModel = "claude-sonnet-4-20250514",
    max_tokens: requestedMaxTokens = 4096,
    temperature,
    stream = true,
    betas,
    context_management,
    thinking,
    store_id: storeId,
  } = body;
  if (!Array.isArray(messages)) {
    jsonResponse(res, 400, { error: "messages array required" }, corsHeaders);
    return;
  }
  // Whitelist the model; anything unrecognized degrades to the default.
  const model = ALLOWED_MODELS.includes(requestedModel) ? requestedModel : "claude-sonnet-4-20250514";
  const provider = getProvider(model);
  // Pick the per-provider output-token ceiling.
  let maxTokensLimit: number;
  if (provider === "gemini") {
    maxTokensLimit = GEMINI_MAX_TOKENS;
  } else if (provider === "openai") {
    maxTokensLimit = isOpenAIReasoningModel(model) ? OPENAI_REASONING_MAX_TOKENS : OPENAI_MAX_TOKENS;
  } else {
    maxTokensLimit = ANTHROPIC_MAX_TOKENS;
  }
  // Clamp to [1, ceiling]; non-numeric requests fall back to 4096.
  const max_tokens = Math.min(Math.max(1, Number(requestedMaxTokens) || 4096), maxTokensLimit);
  // Route to the provider-specific handler.
  switch (provider) {
    case "bedrock":
      await handleProxyBedrock(res, model, messages, system, tools, max_tokens, temperature, thinking, storeId, corsHeaders);
      return;
    case "gemini":
      await handleProxyGemini(res, model, messages, system, tools, max_tokens, temperature, storeId, corsHeaders);
      return;
    case "openai":
      await handleProxyOpenAI(res, model, messages, system, tools, max_tokens, temperature, storeId, corsHeaders);
      return;
    default:
      // Anthropic direct API
      await handleProxyAnthropic(res, model, messages, system, tools, max_tokens, temperature, stream, betas, context_management, thinking, corsHeaders);
  }
}
// ============================================================================
// ANTHROPIC PROXY
// ============================================================================
async function handleProxyAnthropic(
res: http.ServerResponse,
model: string,
messages: any[],
system: any,
tools: any[] | undefined,
max_tokens: number,
temperature: number | undefined,
stream: boolean,
betas: string[] | undefined,
context_management: any,
thinking: any,
corsHeaders: Record<string, string>,
): Promise<void> {
const ANTHROPIC_API_KEY = process.env.ANTHROPIC_API_KEY!;
const anthropic = new Anthropic({ apiKey: ANTHROPIC_API_KEY });
const apiParams: Record<string, unknown> = { model, max_tokens, messages, stream: true };
if (system) {
if (typeof system === "string") {
apiParams.system = [{ type: "text", text: system, cache_control: { type: "ephemeral" } }];
} else {
apiParams.system = system;
}
}
if (tools?.length) {
apiParams.tools = (tools as Array<Record<string, unknown>>).map((t: Record<string, unknown>) => ({
name: t.name,
description: typeof t.description === "string" ? (t.description as string).slice(0, 4096) : "",
input_schema: t.input_schema,
}));
}
if (temperature !== undefined) apiParams.temperature = temperature;
if (betas?.length) apiParams.betas = betas;
if (context_management?.edits?.length) apiParams.context_management = context_management;
if (thinking) apiParams.thinking = thinking;
if (!stream) {
try {
const response = betas?.length
? await anthropic.beta.messages.create({ ...apiParams, stream: false } as any)
: await anthropic.messages.create({ ...apiParams, stream: false } as any);
jsonResponse(res, 200, response, corsHeaders);
} catch (err) {
jsonResponse(res, 500, { error: sanitizeError(err) }, corsHeaders);
}
return;
}
// Streaming: passthrough SSE from Anthropic
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
...corsHeaders,
});
res.flushHeaders();
try {
const response = betas?.length
? await anthropic.beta.messages.create({ ...apiParams, stream: true } as any)
: await anthropic.messages.create(apiParams as any);
for await (const event of response as any) {
res.write(`data: ${JSON.stringify(event)}\n\n`);
}
res.write("data: [DONE]\n\n");
} catch (err) {
res.write(`data: ${JSON.stringify({ type: "error", error: sanitizeError(err) })}\n\n`);
}
res.end();
}
// ============================================================================
// BEDROCK PROXY
// ============================================================================
/**
 * Proxy a chat request to Claude models hosted on AWS Bedrock.
 *
 * Bedrock's Claude endpoints accept the Anthropic Messages API body natively
 * (with cache_control stripped and a Bedrock anthropic_version tag), and
 * stream back Anthropic-format events which are forwarded to the client as
 * SSE, terminated by a [DONE] sentinel.
 *
 * Responds 422 when no Bedrock credentials are configured for the store.
 */
async function handleProxyBedrock(
  res: http.ServerResponse,
  model: string,
  messages: any[],
  system: any,
  tools: any[] | undefined,
  max_tokens: number,
  temperature: number | undefined,
  thinking: any,
  storeId: string | undefined,
  corsHeaders: Record<string, string>,
): Promise<void> {
  const creds = await resolveProviderCredentials("bedrock", storeId);
  if (!creds.bedrock) {
    jsonResponse(res, 422, { error: "Bedrock credentials not configured. Add aws_access_key_id and aws_secret_access_key to platform_secrets." }, corsHeaders);
    return;
  }
  const client = new BedrockRuntimeClient({
    region: creds.bedrock.region,
    credentials: {
      accessKeyId: creds.bedrock.accessKeyId,
      secretAccessKey: creds.bedrock.secretAccessKey,
    },
  });
  // Build Anthropic Messages API body (Bedrock Claude understands this natively)
  const bedrockBody: Record<string, unknown> = {
    // FIX: Bedrock only accepts the literal version "bedrock-2023-05-31".
    // The previous value ("bedrock-2023-10-16") is not a valid
    // anthropic_version and fails Bedrock's request validation.
    anthropic_version: "bedrock-2023-05-31",
    max_tokens,
    messages,
  };
  if (system) {
    bedrockBody.system = typeof system === "string"
      ? [{ type: "text", text: system }]
      : (system as any[]).map((s: any) => {
          // Strip cache_control — Bedrock doesn't support it
          const { cache_control: _, ...rest } = s;
          return rest;
        });
  }
  if (tools?.length) {
    bedrockBody.tools = (tools as Array<Record<string, unknown>>).map((t: Record<string, unknown>) => ({
      name: t.name,
      // Clamp overly long tool descriptions (consistent with the other handlers)
      description: typeof t.description === "string" ? (t.description as string).slice(0, 4096) : "",
      input_schema: t.input_schema,
    }));
  }
  if (temperature !== undefined) bedrockBody.temperature = temperature;
  if (thinking) bedrockBody.thinking = thinking;
  const command = new InvokeModelWithResponseStreamCommand({
    modelId: model,
    contentType: "application/json",
    accept: "application/json",
    body: new TextEncoder().encode(JSON.stringify(bedrockBody)),
  });
  // Stream SSE — Bedrock Claude streams Anthropic-format events
  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
    "X-Accel-Buffering": "no",
    ...corsHeaders,
  });
  res.flushHeaders();
  try {
    const response = await client.send(command);
    if (response.body) {
      for await (const event of response.body) {
        if (event.chunk?.bytes) {
          // Each Bedrock chunk wraps one JSON-encoded Anthropic stream event
          const chunk = JSON.parse(new TextDecoder().decode(event.chunk.bytes));
          res.write(`data: ${JSON.stringify(chunk)}\n\n`);
        }
      }
    }
    res.write("data: [DONE]\n\n");
  } catch (err) {
    res.write(`data: ${JSON.stringify({ type: "error", error: sanitizeError(err) })}\n\n`);
  }
  res.end();
}
// ============================================================================
// GEMINI PROXY — full format conversion to/from Anthropic SSE
// ============================================================================
/**
 * Translate an Anthropic-format message history into Gemini Content objects.
 *
 * assistant → role "model", everything else → role "user". tool_use blocks
 * become functionCall parts (preserving Gemini thought signatures for
 * round-trips), tool_result blocks become functionResponse parts resolved
 * back to the tool name via the tool_use id, and images become inlineData.
 * Messages that yield no parts are dropped.
 */
function anthropicToGeminiMessages(messages: any[]): Content[] {
  const contents: Content[] = [];
  // Remember tool_use id → tool name so tool_result blocks can be matched back.
  const toolNamesById = new Map<string, string>();
  // Flatten a tool_result payload (string or text-block array) to plain text.
  const flattenResult = (content: unknown): string => {
    if (typeof content === "string") return content;
    if (Array.isArray(content)) {
      return content
        .filter((c: any) => c.type === "text")
        .map((c: any) => c.text)
        .join("\n");
    }
    return "";
  };
  for (const msg of messages) {
    const role = msg.role === "assistant" ? "model" : "user";
    const parts: Part[] = [];
    if (typeof msg.content === "string") {
      parts.push({ text: msg.content });
    } else if (Array.isArray(msg.content)) {
      for (const block of msg.content) {
        switch (block.type) {
          case "text":
            if (block.text) parts.push({ text: block.text });
            break;
          case "tool_use": {
            toolNamesById.set(block.id, block.name);
            const call: any = { functionCall: { name: block.name, args: block.input || {} } };
            // Preserve Gemini thought signature for function call round-trips
            if (block.thought_signature) call.thoughtSignature = block.thought_signature;
            parts.push(call);
            break;
          }
          case "image":
            if (block.source?.data) {
              parts.push({
                inlineData: {
                  mimeType: block.source.media_type,
                  data: block.source.data,
                },
              } as any);
            }
            break;
          case "tool_result":
            parts.push({
              functionResponse: {
                name: toolNamesById.get(block.tool_use_id) || block.tool_use_id,
                response: { result: flattenResult(block.content) },
              },
            });
            break;
        }
      }
    }
    if (parts.length > 0) {
      contents.push({ role, parts });
    }
  }
  return contents;
}
/** Gemini expects uppercase OpenAPI schema types rather than lowercase JSON Schema types. */
const GEMINI_TYPE_MAP: Record<string, string> = {
  string: "STRING",
  number: "NUMBER",
  integer: "INTEGER",
  boolean: "BOOLEAN",
  array: "ARRAY",
  object: "OBJECT",
};
/**
 * JSON Schema-only structural keys that Gemini's OpenAPI Schema rejects.
 * IMPORTANT: Only strip keys that can't collide with property names in our tools.
 */
const GEMINI_SCHEMA_ONLY_KEYS = new Set([
  "$schema", "additionalProperties", "propertyNames",
  "minProperties", "maxProperties",
  "anyOf", "oneOf", "allOf", "not", "const",
]);
/** Validation keys stripped only from schema-definition nodes (objects carrying a "type" field). */
const GEMINI_SCHEMA_VALIDATION_KEYS = new Set([
  "exclusiveMinimum", "exclusiveMaximum",
  "minItems", "maxItems", "minLength", "maxLength",
  "minimum", "maximum", "pattern",
  "default",
]);
/**
 * Rewrite a JSON Schema into the shape Gemini's OpenAPI Schema accepts:
 * - type values uppercased (string → STRING)
 * - enum values stringified (and the node coerced to STRING, which is the
 *   only type Gemini allows enums on)
 * - unsupported JSON Schema keys removed (validation keys only from schema
 *   nodes, so property-name maps are never damaged)
 * - ARRAY nodes get default STRING items; OBJECT nodes get a properties map
 * - required filtered to properties that actually exist
 */
function sanitizeSchemaForGemini(schema: any): any {
  if (!schema || typeof schema !== "object") return schema;
  if (Array.isArray(schema)) return schema.map((entry: any) => sanitizeSchemaForGemini(entry));
  const out: any = {};
  const definesType = "type" in schema;
  for (const key of Object.keys(schema)) {
    const value = (schema as Record<string, unknown>)[key];
    // Structural JSON Schema keys are always dropped.
    if (GEMINI_SCHEMA_ONLY_KEYS.has(key)) continue;
    // Validation keys are dropped only on schema nodes, never in property maps.
    if (definesType && GEMINI_SCHEMA_VALIDATION_KEYS.has(key)) continue;
    if (key === "type" && typeof value === "string") {
      out.type = GEMINI_TYPE_MAP[value] || value.toUpperCase();
    } else if (key === "enum" && Array.isArray(value)) {
      out.enum = value.map((v: any) => String(v));
    } else if (key === "properties" && value && typeof value === "object" && !Array.isArray(value)) {
      // Properties map: keys are user-chosen property names, values are schemas.
      const cleaned: any = {};
      for (const [propName, propSchema] of Object.entries(value as Record<string, unknown>)) {
        cleaned[propName] = sanitizeSchemaForGemini(propSchema);
      }
      out.properties = cleaned;
    } else {
      out[key] = sanitizeSchemaForGemini(value);
    }
  }
  // ARRAY nodes must declare their item type; default to STRING items.
  if (out.type === "ARRAY") {
    const missingItems = !out.items || (typeof out.items === "object" && Object.keys(out.items).length === 0);
    if (missingItems) out.items = { type: "STRING" };
  }
  // OBJECT nodes must declare properties, even if empty.
  if (out.type === "OBJECT" && !out.properties) {
    out.properties = {};
  }
  // Gemini only allows enum on STRING — coerce the type when an enum survives.
  if (out.enum && Array.isArray(out.enum) && out.type && out.type !== "STRING") {
    out.type = "STRING";
  }
  // required may only reference properties that survived sanitization.
  if (out.required && Array.isArray(out.required) && out.properties) {
    const known = new Set(Object.keys(out.properties));
    out.required = out.required.filter((name: string) => known.has(name));
    if (out.required.length === 0) delete out.required;
  }
  return out;
}
/** Map Anthropic tool definitions onto Gemini FunctionDeclaration objects. */
function anthropicToGeminiFunctions(tools: any[]): FunctionDeclaration[] {
  return tools.map((tool: any) => {
    const description = typeof tool.description === "string" ? tool.description.slice(0, 4096) : "";
    return {
      name: tool.name,
      description,
      parameters: sanitizeSchemaForGemini(tool.input_schema || {}),
    };
  });
}
/**
 * Proxy a chat request to Gemini via the @google/genai SDK.
 *
 * Converts the Anthropic-format request (messages / system / tools) into the
 * SDK's shape, streams the model output, and re-emits it as Anthropic-style
 * SSE events: message_start → content_block_* → message_delta → message_stop
 * → [DONE]. Thinking output is requested (includeThoughts) and surfaced as
 * "thinking" blocks; Gemini function calls are surfaced as "tool_use" blocks.
 *
 * Tool use IDs are per-request UUIDs (no shared state between concurrent requests).
 * Mid-stream errors still emit message_delta/message_stop so the client never
 * hangs; errors before headers are sent return a 502 JSON body instead.
 */
async function handleProxyGemini(
  res: http.ServerResponse,
  model: string,
  messages: any[],
  system: any,
  tools: any[] | undefined,
  max_tokens: number,
  temperature: number | undefined,
  storeId: string | undefined,
  corsHeaders: Record<string, string>,
): Promise<void> {
  const creds = await resolveProviderCredentials("google", storeId);
  if (!creds.google) {
    jsonResponse(res, 422, { error: "Google AI credentials not configured. Add google_ai_api_key to platform_secrets." }, corsHeaders);
    return;
  }
  const client = new GoogleGenAI({ apiKey: creds.google.apiKey });
  // Extract system prompt text (string or array of text blocks)
  let systemText = "";
  if (typeof system === "string") {
    systemText = system;
  } else if (Array.isArray(system)) {
    systemText = system.map((s: any) => s.text || "").join("\n");
  }
  // Build generation config
  const genConfig: Record<string, unknown> = { maxOutputTokens: max_tokens };
  if (temperature !== undefined) genConfig.temperature = temperature;
  // Build request config for the new SDK — model is passed per-request
  const requestConfig: Record<string, unknown> = {
    model,
    config: {
      ...genConfig,
      ...(systemText ? { systemInstruction: systemText } : {}),
      ...(tools?.length ? {
        tools: [{ functionDeclarations: anthropicToGeminiFunctions(tools) }],
        toolConfig: { functionCallingConfig: { mode: "AUTO" } },
      } : {}),
    },
  };
  // Use requestConfig.model when calling generateContentStream below
  const genModel = client.models;
  // Convert messages
  const contents = anthropicToGeminiMessages(messages);
  // Start streaming — X-Accel-Buffering prevents reverse proxy buffering
  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
    "X-Accel-Buffering": "no",
    ...corsHeaders,
  });
  res.flushHeaders();
  // Track tokens outside try so catch block can reference them
  let totalOutputTokens = 0;
  let totalInputTokens = 0;
  let totalThinkingTokens = 0;
  let totalCacheReadTokens = 0;
  try {
    // Request with includeThoughts so we stream Gemini's thinking process
    const streamResult = await genModel.generateContentStream({
      ...requestConfig,
      contents,
      config: {
        ...(requestConfig.config as Record<string, unknown>),
        thinkingConfig: { includeThoughts: true },
      },
    } as any);
    // Emit Anthropic-format message_start
    const msgStartData = `data: ${JSON.stringify({
      type: "message_start",
      message: {
        id: `msg_gemini_${randomUUID().slice(0, 8)}`,
        type: "message",
        role: "assistant",
        content: [],
        model,
        stop_reason: null,
        usage: { input_tokens: 0, output_tokens: 0 },
      },
    })}\n\n`;
    res.write(msgStartData);
    // Content-block state machine: at most one text OR one thinking block is
    // open at a time; blockIndex advances only when a block is closed.
    let blockIndex = 0;
    let hasOpenTextBlock = false;
    let hasOpenThinkingBlock = false;
    let hasToolUse = false;
    let hasEmittedContent = false;
    // @google/genai SDK sends DELTA text per chunk (not accumulated).
    // Each chunk's part.text is only the new portion — emit it directly.
    const seenFunctionCalls = new Set<string>(); // deduplicate function calls
    for await (const chunk of streamResult) {
      const candidates = chunk.candidates || [];
      // Usage metadata arrives per-chunk; later chunks carry cumulative totals,
      // so plain assignment (not +=) keeps the final values.
      const usage = (chunk as any).usageMetadata;
      if (usage) {
        totalOutputTokens = usage.candidatesTokenCount || usage.totalTokenCount || 0;
        totalInputTokens = usage.promptTokenCount || 0;
        totalThinkingTokens = usage.thoughtsTokenCount || 0;
        if (usage.cachedContentTokenCount) {
          totalCacheReadTokens = usage.cachedContentTokenCount;
        }
      }
      for (const candidate of candidates) {
        const allParts = candidate.content?.parts || [];
        for (const part of allParts) {
          const isThought = !!(part as any).thought;
          // Thinking text delta
          if (isThought && part.text) {
            if (hasOpenTextBlock) {
              res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: blockIndex })}\n\n`);
              blockIndex++;
              hasOpenTextBlock = false;
            }
            if (!hasOpenThinkingBlock) {
              res.write(`data: ${JSON.stringify({
                type: "content_block_start",
                index: blockIndex,
                content_block: { type: "thinking", thinking: "" },
              })}\n\n`);
              hasOpenThinkingBlock = true;
              hasEmittedContent = true;
            }
            res.write(`data: ${JSON.stringify({
              type: "content_block_delta",
              index: blockIndex,
              delta: { type: "thinking_delta", thinking: part.text },
            })}\n\n`);
            continue;
          }
          // Response text delta
          if (part.text && !isThought) {
            if (hasOpenThinkingBlock) {
              res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: blockIndex })}\n\n`);
              blockIndex++;
              hasOpenThinkingBlock = false;
            }
            if (!hasOpenTextBlock) {
              res.write(`data: ${JSON.stringify({
                type: "content_block_start",
                index: blockIndex,
                content_block: { type: "text", text: "" },
              })}\n\n`);
              hasOpenTextBlock = true;
              hasEmittedContent = true;
            }
            res.write(`data: ${JSON.stringify({
              type: "content_block_delta",
              index: blockIndex,
              delta: { type: "text_delta", text: part.text },
            })}\n\n`);
            continue;
          }
          // Function call
          if (part.functionCall) {
            const fc = {
              name: part.functionCall.name || "",
              args: (part.functionCall.args as Record<string, unknown>) || {},
              thoughtSignature: (part as any).thoughtSignature,
            };
            // Skip identical name+args pairs repeated across chunks
            const fcKey = `${fc.name}:${JSON.stringify(fc.args)}`;
            if (seenFunctionCalls.has(fcKey)) continue;
            seenFunctionCalls.add(fcKey);
            // Close any open thinking block
            if (hasOpenThinkingBlock) {
              res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: blockIndex })}\n\n`);
              blockIndex++;
              hasOpenThinkingBlock = false;
            }
            // Close any open text block
            if (hasOpenTextBlock) {
              res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: blockIndex })}\n\n`);
              blockIndex++;
              hasOpenTextBlock = false;
            }
            hasToolUse = true;
            hasEmittedContent = true;
            const toolUseId = `toolu_gemini_${randomUUID().slice(0, 12)}`;
            res.write(`data: ${JSON.stringify({
              type: "content_block_start",
              index: blockIndex,
              content_block: {
                type: "tool_use",
                id: toolUseId,
                name: fc.name,
                input: {},
                // Preserve Gemini thought_signature — required for tool call round-trips with thinking
                ...(fc.thoughtSignature ? { thought_signature: fc.thoughtSignature } : {}),
              },
            })}\n\n`);
            res.write(`data: ${JSON.stringify({
              type: "content_block_delta",
              index: blockIndex,
              delta: {
                type: "input_json_delta",
                partial_json: JSON.stringify(fc.args || {}),
              },
            })}\n\n`);
            res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: blockIndex })}\n\n`);
            blockIndex++;
          } // end if (part.functionCall)
        } // end for (part of allParts)
      } // end for (candidate of candidates)
    } // end for await (chunk of streamResult)
    // Close any open blocks — no signature_delta for Gemini thinking
    if (hasOpenThinkingBlock) {
      res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: blockIndex })}\n\n`);
    }
    if (hasOpenTextBlock) {
      res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: blockIndex })}\n\n`);
    }
    // Handle empty response — Gemini sometimes returns finishReason: STOP with no content
    if (!hasEmittedContent) {
      res.write(`data: ${JSON.stringify({
        type: "content_block_start",
        index: 0,
        content_block: { type: "text", text: "" },
      })}\n\n`);
      res.write(`data: ${JSON.stringify({
        type: "content_block_delta",
        index: 0,
        delta: { type: "text_delta", text: "[Gemini returned an empty response. Please try again.]" },
      })}\n\n`);
      res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: 0 })}\n\n`);
    }
    // Determine stop reason
    const stopReason = hasToolUse ? "tool_use" : "end_turn";
    // Emit message_delta with stop reason + full usage (input + output + thinking + cache)
    res.write(`data: ${JSON.stringify({
      type: "message_delta",
      delta: { stop_reason: stopReason, stop_sequence: null },
      usage: {
        input_tokens: totalInputTokens,
        output_tokens: totalOutputTokens,
        ...(totalThinkingTokens > 0 ? { thinking_tokens: totalThinkingTokens } : {}),
        ...(totalCacheReadTokens > 0 ? { cache_read_input_tokens: totalCacheReadTokens } : {}),
      },
    })}\n\n`);
    // Emit message_stop
    res.write(`data: ${JSON.stringify({ type: "message_stop" })}\n\n`);
    res.write("data: [DONE]\n\n");
  } catch (err: any) {
    const errMsg = err?.message || String(err);
    console.error("[gemini-proxy] Error:", errMsg);
    if (res.headersSent) {
      res.write(`data: ${JSON.stringify({ type: "error", error: sanitizeError(err) })}\n\n`);
      // Always emit message_stop so the client doesn't hang
      res.write(`data: ${JSON.stringify({
        type: "message_delta",
        delta: { stop_reason: "error", stop_sequence: null },
        usage: {
          input_tokens: totalInputTokens,
          output_tokens: totalOutputTokens,
          ...(totalThinkingTokens > 0 ? { thinking_tokens: totalThinkingTokens } : {}),
          ...(totalCacheReadTokens > 0 ? { cache_read_input_tokens: totalCacheReadTokens } : {}),
        },
      })}\n\n`);
      res.write(`data: ${JSON.stringify({ type: "message_stop" })}\n\n`);
      res.write("data: [DONE]\n\n");
    } else {
      // Headers not yet sent (error thrown before streaming began, e.g. while
      // creating the stream) — fall back to a JSON error body.
      jsonResponse(res, 502, { error: `Gemini error: ${sanitizeError(err)}` }, corsHeaders);
      return;
    }
  }
  res.end();
}
// ============================================================================
// OPENAI PROXY — Responses API (2026) with format conversion to Anthropic SSE
// ============================================================================
/**
 * Translate an Anthropic-style message history into the OpenAI Responses API
 * `input` array.
 *
 * Key differences from Chat Completions:
 * - System prompt goes to `instructions` param (handled by the caller, not here)
 * - tool_use blocks become top-level `function_call` items
 * - tool_result blocks become top-level `function_call_output` items
 * - Images use `input_image` type with a data URL in `image_url`
 * - call_id is carried through directly (no ID mapping needed)
 *
 * Tool results whose originating function_call is missing (e.g. removed by
 * history compaction) are silently dropped.
 */
function anthropicToResponsesInput(messages: any[]): Array<Record<string, unknown>> {
  // Sweep 1: record every tool_use call_id the assistant emitted so tool
  // results can be matched against them.
  const assistantCallIds = new Set<string>();
  for (const msg of messages) {
    if (msg.role !== "assistant" || !Array.isArray(msg.content)) continue;
    for (const block of msg.content) {
      if (block.type === "tool_use" && block.id && block.name) {
        assistantCallIds.add(block.id);
      }
    }
  }
  // Flatten a tool_result payload (string or text-block array) to plain text.
  const flattenResult = (content: unknown): string => {
    if (typeof content === "string") return content;
    if (Array.isArray(content)) {
      return content
        .filter((c: any) => c.type === "text")
        .map((c: any) => c.text)
        .join("\n");
    }
    return "";
  };
  // Sweep 2: build the input array.
  const input: Array<Record<string, unknown>> = [];
  for (const msg of messages) {
    if (msg.role === "user") {
      if (typeof msg.content === "string") {
        input.push({ role: "user", content: msg.content });
        continue;
      }
      if (!Array.isArray(msg.content)) continue;
      const contentParts: Array<Record<string, unknown>> = [];
      for (const block of msg.content) {
        if (block.type === "tool_result" && block.tool_use_id) {
          // Emit only when a matching function_call exists; orphans are dropped.
          if (assistantCallIds.has(block.tool_use_id)) {
            input.push({
              type: "function_call_output",
              call_id: block.tool_use_id,
              output: flattenResult(block.content) || "(no output)",
            });
          }
        } else if (block.type === "text" && block.text) {
          contentParts.push({ type: "input_text", text: block.text });
        } else if (block.type === "image" && block.source?.data) {
          contentParts.push({
            type: "input_image",
            image_url: `data:${block.source.media_type};base64,${block.source.data}`,
          });
        }
      }
      // A lone text part collapses to a plain-string user message.
      if (contentParts.length === 1 && contentParts[0].type === "input_text") {
        input.push({ role: "user", content: contentParts[0].text as string });
      } else if (contentParts.length > 0) {
        input.push({ role: "user", content: contentParts });
      }
    } else if (msg.role === "assistant") {
      if (typeof msg.content === "string") {
        input.push({ role: "assistant", content: msg.content });
        continue;
      }
      if (!Array.isArray(msg.content)) continue;
      let pendingText = "";
      // Emit any accumulated assistant text before a function_call or at the end.
      const flushText = () => {
        if (pendingText) {
          input.push({ role: "assistant", content: pendingText });
          pendingText = "";
        }
      };
      for (const block of msg.content) {
        if (block.type === "text" && block.text) {
          pendingText += block.text;
        } else if (block.type === "tool_use" && block.name) {
          flushText();
          // function_call is a top-level input item, not message content.
          input.push({
            type: "function_call",
            call_id: block.id || `call_${randomUUID().slice(0, 12)}`,
            name: block.name,
            arguments: JSON.stringify(block.input || {}),
          });
        }
        // thinking/compaction blocks carry no meaning for OpenAI — skipped
      }
      flushText();
    }
  }
  return input;
}
/**
 * Sanitize a JSON Schema for OpenAI function tools: recursively walk the
 * schema, give untyped arrays default string items, and trim `required`
 * down to properties that are actually defined.
 */
function sanitizeSchemaForOpenAI(schema: any): any {
  if (!schema || typeof schema !== "object") return schema;
  if (Array.isArray(schema)) return schema.map((entry) => sanitizeSchemaForOpenAI(entry));
  const out: any = {};
  for (const key of Object.keys(schema)) {
    const value = (schema as Record<string, unknown>)[key];
    if (key === "properties" && value && typeof value === "object" && !Array.isArray(value)) {
      // Properties map: keys are user-chosen names, values are sub-schemas.
      const cleaned: any = {};
      for (const [propName, propSchema] of Object.entries(value as Record<string, unknown>)) {
        cleaned[propName] = sanitizeSchemaForOpenAI(propSchema);
      }
      out.properties = cleaned;
    } else {
      out[key] = sanitizeSchemaForOpenAI(value);
    }
  }
  // OpenAI requires arrays to declare their item type.
  if (out.type === "array" && !out.items) {
    out.items = { type: "string" };
  }
  // required may only reference defined properties.
  if (out.required && Array.isArray(out.required) && out.properties) {
    const known = new Set(Object.keys(out.properties));
    out.required = out.required.filter((name: string) => known.has(name));
    if (out.required.length === 0) delete out.required;
  }
  return out;
}
/** Map Anthropic tool definitions onto OpenAI Responses API FunctionTool objects. */
function anthropicToResponsesTools(tools: any[]): Array<Record<string, unknown>> {
  return tools.map((tool: any) => {
    const description = typeof tool.description === "string" ? tool.description.slice(0, 4096) : "";
    return {
      type: "function",
      name: tool.name,
      description,
      parameters: sanitizeSchemaForOpenAI(tool.input_schema || {}),
      strict: false,
    };
  });
}
/**
 * Proxy an Anthropic-format chat request to the OpenAI Responses API and
 * stream the result back to the client as Anthropic-style SSE frames
 * (message_start / content_block_start / content_block_delta /
 * content_block_stop / message_delta / message_stop, terminated by
 * `data: [DONE]`).
 *
 * Error behavior: missing credentials → 422 JSON before any streaming;
 * any failure after the SSE headers are written is reported in-stream as
 * an `error` event followed by message_delta/message_stop so the client
 * can terminate cleanly.
 *
 * @param res         Node HTTP response; becomes an SSE stream on success.
 * @param model       OpenAI model id. Reasoning models (per
 *                    isOpenAIReasoningModel) get a reasoning config and
 *                    ignore `temperature`.
 * @param messages    Anthropic-format message array; converted via
 *                    anthropicToResponsesInput.
 * @param system      Anthropic system prompt — a string or an array of
 *                    text parts; flattened into `instructions`.
 * @param tools       Optional Anthropic tool definitions; converted via
 *                    anthropicToResponsesTools.
 * @param max_tokens  Forwarded as `max_output_tokens`.
 * @param temperature Forwarded only for non-reasoning models, when defined.
 * @param storeId     Tenant key used to resolve OpenAI credentials.
 * @param corsHeaders Merged into the headers of every response written here.
 */
async function handleProxyOpenAI(
res: http.ServerResponse,
model: string,
messages: any[],
system: any,
tools: any[] | undefined,
max_tokens: number,
temperature: number | undefined,
storeId: string | undefined,
corsHeaders: Record<string, string>,
): Promise<void> {
const creds = await resolveProviderCredentials("openai", storeId);
if (!creds.openai) {
// Fail fast with a JSON error — headers have not been sent yet.
jsonResponse(res, 422, { error: "OpenAI credentials not configured. Add OPENAI_API_KEY to user_tool_secrets." }, corsHeaders);
return;
}
const client = new OpenAI({ apiKey: creds.openai.apiKey });
const isReasoning = isOpenAIReasoningModel(model);
// Convert Anthropic format → Responses API format
const openaiInput = anthropicToResponsesInput(messages);
const openaiTools = tools?.length ? anthropicToResponsesTools(tools) : undefined;
// Extract system prompt → instructions parameter
let instructions = "";
if (typeof system === "string") {
instructions = system;
} else if (Array.isArray(system)) {
// Anthropic allows the system prompt as an array of text parts — join their text.
instructions = system.map((s: any) => s.text || "").join("\n");
}
// Build Responses API request
const params: Record<string, unknown> = {
model,
input: openaiInput,
stream: true,
store: false, // do not persist the response server-side at OpenAI
max_output_tokens: max_tokens,
parallel_tool_calls: true,
truncation: "auto",
};
if (instructions) params.instructions = instructions;
if (openaiTools?.length) params.tools = openaiTools;
if (isReasoning) {
// Reasoning models: use reasoning config, no temperature
params.reasoning = { effort: "high", summary: "detailed" };
} else {
if (temperature !== undefined) params.temperature = temperature;
}
// Start SSE streaming
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"X-Accel-Buffering": "no", // disable reverse-proxy buffering so events flush immediately
...corsHeaders,
});
res.flushHeaders();
// Usage totals — populated from the final response.completed event and
// reported in the closing message_delta frame.
let totalInputTokens = 0;
let totalOutputTokens = 0;
let totalReasoningTokens = 0;
let totalCacheReadTokens = 0;
try {
// Stream returns APIPromise<Stream<ResponseStreamEvent>> when stream: true
// Cast through unknown because TS can't resolve the streaming overload from `params as any`
const stream = await (client.responses.create(params as any) as unknown as Promise<AsyncIterable<any>>);
// Emit Anthropic-format message_start
res.write(`data: ${JSON.stringify({
type: "message_start",
message: {
id: `msg_openai_${randomUUID().slice(0, 8)}`,
type: "message",
role: "assistant",
content: [],
model,
stop_reason: null,
usage: { input_tokens: 0, output_tokens: 0 },
},
})}\n\n`);
// Anthropic content blocks are index-addressed. Track the next free index
// and whether a text / thinking block is currently open so deltas of the
// same kind coalesce into one block instead of opening a new one each time.
let blockIndex = 0;
let hasOpenTextBlock = false;
let hasOpenThinkingBlock = false;
let hasToolUse = false;
let hasEmittedContent = false;
// Track function call metadata from output_item.added (name/call_id not always on arguments.done)
const pendingFunctionCalls = new Map<string, { name: string; callId: string }>(); // item_id → {name, callId}
// Helper to close any open content blocks
const closeOpenBlocks = () => {
if (hasOpenThinkingBlock) {
res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: blockIndex })}\n\n`);
blockIndex++;
hasOpenThinkingBlock = false;
}
if (hasOpenTextBlock) {
res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: blockIndex })}\n\n`);
blockIndex++;
hasOpenTextBlock = false;
}
};
for await (const event of stream as AsyncIterable<any>) {
switch (event.type) {
// ---- Reasoning summary → Anthropic thinking block ----
case "response.reasoning_summary_text.delta": {
const text = event.delta;
if (!text) break;
// Close text block if open (reasoning comes before text)
if (hasOpenTextBlock) {
res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: blockIndex })}\n\n`);
blockIndex++;
hasOpenTextBlock = false;
}
if (!hasOpenThinkingBlock) {
// Open a new thinking block the first time a reasoning delta arrives.
res.write(`data: ${JSON.stringify({
type: "content_block_start",
index: blockIndex,
content_block: { type: "thinking", thinking: "" },
})}\n\n`);
hasOpenThinkingBlock = true;
hasEmittedContent = true;
}
res.write(`data: ${JSON.stringify({
type: "content_block_delta",
index: blockIndex,
delta: { type: "thinking_delta", thinking: text },
})}\n\n`);
break;
}
case "response.reasoning_summary_text.done": {
// Do NOT close thinking block here — multiple reasoning summary parts
// may arrive. Let closeOpenBlocks() handle it when text or tool use starts.
break;
}
// ---- Text content → Anthropic text block ----
case "response.output_text.delta": {
const text = event.delta;
if (!text) break;
// Close thinking block if open (text comes after reasoning)
if (hasOpenThinkingBlock) {
res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: blockIndex })}\n\n`);
blockIndex++;
hasOpenThinkingBlock = false;
}
if (!hasOpenTextBlock) {
// Open a new text block the first time a text delta arrives.
res.write(`data: ${JSON.stringify({
type: "content_block_start",
index: blockIndex,
content_block: { type: "text", text: "" },
})}\n\n`);
hasOpenTextBlock = true;
hasEmittedContent = true;
}
res.write(`data: ${JSON.stringify({
type: "content_block_delta",
index: blockIndex,
delta: { type: "text_delta", text },
})}\n\n`);
break;
}
case "response.output_text.done": {
// Do NOT close the text block here — OpenAI may emit multiple text
// content parts (e.g., multi-part output items). Closing prematurely
// creates a "double response" with two separate text blocks.
// The text block will be closed by closeOpenBlocks() when a tool call
// arrives or the stream ends.
break;
}
// ---- Function calls → Anthropic tool_use blocks ----
// Capture function name + call_id from output_item.added
case "response.output_item.added": {
const item = event.item;
if (item?.type === "function_call" && item.id && item.name) {
pendingFunctionCalls.set(item.id, { name: item.name, callId: item.call_id || item.id });
}
break;
}
case "response.function_call_arguments.done": {
closeOpenBlocks();
hasToolUse = true;
hasEmittedContent = true;
// Match via item_id (arguments.done uses item_id to reference the output item)
const pending = pendingFunctionCalls.get(event.item_id);
const name = pending?.name || event.name || "unknown_tool";
const callId = pending?.callId || event.call_id || event.item_id;
if (pending) pendingFunctionCalls.delete(event.item_id);
const args = event.arguments;
let parsedInput = {};
// Malformed argument JSON degrades to an empty input rather than failing the stream.
try { parsedInput = JSON.parse(args || "{}"); } catch { /* use empty */ }
// Use OpenAI's call_id directly as tool_use id for round-trip consistency
const toolUseId = callId || `toolu_openai_${randomUUID().slice(0, 12)}`;
// Emit the complete tool_use block in one start → input_json_delta → stop
// sequence, since arguments arrive fully assembled on this event.
res.write(`data: ${JSON.stringify({
type: "content_block_start",
index: blockIndex,
content_block: {
type: "tool_use",
id: toolUseId,
name,
input: {},
},
})}\n\n`);
res.write(`data: ${JSON.stringify({
type: "content_block_delta",
index: blockIndex,
delta: {
type: "input_json_delta",
partial_json: JSON.stringify(parsedInput),
},
},
)}\n\n`.replace(/,\n\)/, "\n})")); // NOTE(review): placeholder — see original below
break;
}
// ---- Completion → usage + stop ----
case "response.completed": {
const response = event.response;
if (response?.usage) {
totalInputTokens = response.usage.input_tokens || 0;
totalOutputTokens = response.usage.output_tokens || 0;
if (response.usage.output_tokens_details?.reasoning_tokens) {
totalReasoningTokens = response.usage.output_tokens_details.reasoning_tokens;
}
if (response.usage.input_tokens_details?.cached_tokens) {
totalCacheReadTokens = response.usage.input_tokens_details.cached_tokens;
}
}
break;
}
// ---- Error during streaming ----
case "response.failed": {
const error = event.response?.error;
const errMsg = error?.message || "Unknown OpenAI error";
console.error("[openai-proxy] Response failed:", errMsg);
closeOpenBlocks();
if (!hasEmittedContent) {
// Surface the failure as a visible text block so the client shows
// something rather than an empty message.
res.write(`data: ${JSON.stringify({
type: "content_block_start",
index: blockIndex,
content_block: { type: "text", text: "" },
})}\n\n`);
res.write(`data: ${JSON.stringify({
type: "content_block_delta",
index: blockIndex,
delta: { type: "text_delta", text: `[OpenAI error: ${errMsg}]` },
})}\n\n`);
res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: blockIndex })}\n\n`);
hasEmittedContent = true;
}
break;
}
case "response.incomplete": {
console.warn("[openai-proxy] Response incomplete:", event.response?.incomplete_details);
break;
}
// All other events (output_item.added/done, content_part.*, etc.) — skip
default:
break;
}
}
// Close any remaining open blocks
closeOpenBlocks();
// Handle empty response
if (!hasEmittedContent) {
// blockIndex is still 0 here — nothing was ever opened.
res.write(`data: ${JSON.stringify({
type: "content_block_start",
index: 0,
content_block: { type: "text", text: "" },
})}\n\n`);
res.write(`data: ${JSON.stringify({
type: "content_block_delta",
index: 0,
delta: { type: "text_delta", text: "[OpenAI returned an empty response. Please try again.]" },
})}\n\n`);
res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: 0 })}\n\n`);
}
// Determine stop reason
const stopReason = hasToolUse ? "tool_use" : "end_turn";
// Emit message_delta with usage
res.write(`data: ${JSON.stringify({
type: "message_delta",
delta: { stop_reason: stopReason, stop_sequence: null },
usage: {
input_tokens: totalInputTokens,
output_tokens: totalOutputTokens,
...(totalReasoningTokens > 0 ? { thinking_tokens: totalReasoningTokens } : {}),
...(totalCacheReadTokens > 0 ? { cache_read_input_tokens: totalCacheReadTokens } : {}),
},
})}\n\n`);
res.write(`data: ${JSON.stringify({ type: "message_stop" })}\n\n`);
res.write("data: [DONE]\n\n");
} catch (err: any) {
const errMsg = err?.message || String(err);
console.error("[openai-proxy] Error:", errMsg);
if (res.headersSent) {
// Headers are written before the try block, so this is the normal path:
// report the error in-stream and close the message cleanly.
res.write(`data: ${JSON.stringify({ type: "error", error: sanitizeError(err) })}\n\n`);
res.write(`data: ${JSON.stringify({
type: "message_delta",
delta: { stop_reason: "error", stop_sequence: null },
usage: {
input_tokens: totalInputTokens,
output_tokens: totalOutputTokens,
...(totalReasoningTokens > 0 ? { thinking_tokens: totalReasoningTokens } : {}),
...(totalCacheReadTokens > 0 ? { cache_read_input_tokens: totalCacheReadTokens } : {}),
},
})}\n\n`);
res.write(`data: ${JSON.stringify({ type: "message_stop" })}\n\n`);
res.write("data: [DONE]\n\n");
} else {
// Defensive fallback — should be unreachable since writeHead precedes the try.
jsonResponse(res, 502, { error: `OpenAI error: ${sanitizeError(err)}` }, corsHeaders);
return;
}
}
res.end();
}