// server/index.ts — Unified Node.js agent server for Fly.io
// All CLI optimizations: prompt caching, retry, loop detection, parallel tools,
// model-aware context management, cost tracking, compaction block handling
//
// Shares agent-core with CLI via src/shared/agent-core.ts
import http from "node:http";
import { randomUUID, timingSafeEqual, createHash } from "node:crypto";
import { createClient, type SupabaseClient } from "@supabase/supabase-js";
import Anthropic from "@anthropic-ai/sdk";
import {
getMaxOutputTokens,
getMaxToolResultChars,
sanitizeError,
} from "../shared/agent-core.js";
import { handleProxy } from "./proxy-handlers.js";
import { processWorkflowSteps, processWaitingSteps, handleWebhookIngestion, executeInlineChain, setToolExecutor, setAgentExecutor, setTokenBroadcaster, verifyGuestApprovalSignature, initWorkerPool, getPoolStats, shutdownPool, processScheduleTriggers, enforceWorkflowTimeouts, processEventTriggers } from "./handlers/workflows.js";
import { runServerAgentLoop } from "./lib/server-agent-loop.js";
import {
loadTools,
loadUserTools,
getToolsForAgent,
executeTool,
loadAgentConfig,
type ToolDef,
type UserToolRow,
type AgentConfig,
} from "./tool-router.js";
import pg from "pg";
// ============================================================================
// PROCESS ERROR HANDLERS
// ============================================================================
// Log-and-continue: a rejected promise with no .catch() must not take the
// whole server down (SSE streams and fire-and-forget tasks reject benignly).
process.on("unhandledRejection", (reason, _promise) => {
  console.error("[server] Unhandled rejection:", reason);
});
// A truly uncaught synchronous exception leaves the process in an undefined
// state — log and exit non-zero so the platform restarts a clean instance.
process.on("uncaughtException", (err) => {
  console.error("[server] Uncaught exception:", err);
  process.exit(1);
});
// ============================================================================
// ENV CONFIG
// ============================================================================
// HTTP port to bind (Fly.io injects PORT at runtime).
const PORT = parseInt(process.env.PORT || "8080", 10);
// Required Supabase connection values. Non-null asserted deliberately: a
// missing value should surface immediately at startup, not deep in a request.
const SUPABASE_URL = process.env.SUPABASE_URL!;
const SUPABASE_SERVICE_ROLE_KEY = process.env.SUPABASE_SERVICE_ROLE_KEY!;
// Optional long-lived service JWT accepted as an internal bearer token.
const SERVICE_ROLE_JWT = process.env.SERVICE_ROLE_JWT || "";
// Server-wide Anthropic key, used when an agent has no per-agent api_key.
const ANTHROPIC_API_KEY = process.env.ANTHROPIC_API_KEY!;
// Comma-separated CORS allowlist ("*" allowed); defaults to local dev origins.
const ALLOWED_ORIGINS = (process.env.ALLOWED_ORIGINS || "http://localhost:3000,http://127.0.0.1:3000").split(",").map(s => s.trim());
// Shared secret identifying internal machine-to-machine callers.
const FLY_INTERNAL_SECRET = process.env.FLY_INTERNAL_SECRET || "";
// ============================================================================
// READINESS STATE
// ============================================================================
// Startup flags, flipped to true by their respective init routines.
let pgListenReady = false;
let workerPoolReady = false;

/**
 * Readiness probe backing the health endpoint. Only the worker pool gates
 * readiness — the PG LISTEN connection merely powers SSE and is optional.
 */
function isReady(): boolean {
  const ready = workerPoolReady;
  return ready;
}
// ============================================================================
// RATE LIMITING (unauthenticated endpoints)
// ============================================================================
// Per-IP fixed-window counters guarding the unauthenticated endpoints.
const rateLimitMap = new Map<string, { count: number; resetAt: number }>();
const RATE_LIMIT_WINDOW_MS = 60_000; // 1 minute
const RATE_LIMIT_MAX = 30; // 30 requests per minute per IP

/**
 * Record one request from `ip` and report whether it is still within budget.
 * A fresh or expired window always admits the request and restarts counting.
 */
function checkRateLimit(ip: string): boolean {
  const now = Date.now();
  const existing = rateLimitMap.get(ip);
  if (existing && now <= existing.resetAt) {
    existing.count += 1;
    return existing.count <= RATE_LIMIT_MAX;
  }
  rateLimitMap.set(ip, { count: 1, resetAt: now + RATE_LIMIT_WINDOW_MS });
  return true;
}

// Sweep expired windows every 5 minutes so the map cannot grow unbounded;
// unref() keeps this timer from holding the process open at shutdown.
setInterval(() => {
  const now = Date.now();
  for (const [ip, entry] of rateLimitMap.entries()) {
    if (entry.resetAt < now) rateLimitMap.delete(ip);
  }
}, 300_000).unref();
// Timing-safe secret comparison to prevent timing attacks
// Hash both values to fixed length before comparing — avoids leaking secret length
/**
 * Constant-time equality check for shared secrets. Each input is hashed to a
 * fixed 32-byte digest before comparison, so neither timing nor the length
 * of either secret leaks. Empty strings never match anything.
 */
function safeCompare(a: string, b: string): boolean {
  if (!a || !b) return false;
  const digest = (s: string): Buffer => createHash("sha256").update(s).digest();
  return timingSafeEqual(digest(a), digest(b));
}
// ============================================================================
// TYPES
// ============================================================================
/**
 * One server-sent event on the agent-chat stream. `type` discriminates
 * which of the optional fields are populated.
 */
interface StreamEvent {
  type: "text" | "tool_start" | "tool_result" | "error" | "done" | "usage" | "subagent";
  text?: string;           // "text": incremental assistant output
  name?: string;           // tool name for tool_* / subagent events
  result?: unknown;        // "tool_result": raw tool output
  success?: boolean;       // "tool_result": whether the tool call succeeded
  error?: string;          // "error": sanitized error message
  usage?: {                // "usage": per-turn token and cost accounting
    input_tokens: number;
    output_tokens: number;
    cache_creation_tokens?: number;
    cache_read_tokens?: number;
    cost_usd?: number;
  };
  conversationId?: string; // "done": id of the (possibly newly created) conversation
  subagentId?: string;     // "subagent": id of the nested agent run
  subagentEvent?: string;  // "subagent": progress event name
}
// Tool registry, user tools, executor, and agent loader are in ./tool-router.ts
// ============================================================================
// CORS
// ============================================================================
/**
 * Build CORS + baseline security headers for a response.
 *
 * - Wildcard allowlist: echo `*`.
 * - Otherwise: echo the request origin only when it is allowlisted, and
 *   always send `Vary: Origin` — responses differ per origin, so shared
 *   caches must key on it. (The previous version set Vary only on a match,
 *   letting a cache reuse a header-less rejection for an allowed origin.)
 * - Non-matching origin → no Access-Control-Allow-Origin header, so the
 *   browser blocks the cross-origin read.
 */
function getCorsHeaders(origin?: string): Record<string, string> {
  const headers: Record<string, string> = {
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "DENY",
    "X-XSS-Protection": "0",
    "Referrer-Policy": "strict-origin-when-cross-origin",
  };
  if (ALLOWED_ORIGINS.includes("*")) {
    headers["Access-Control-Allow-Origin"] = "*";
  } else {
    // Response depends on the Origin request header — caches must vary on it.
    headers["Vary"] = "Origin";
    if (origin && ALLOWED_ORIGINS.includes(origin)) {
      headers["Access-Control-Allow-Origin"] = origin;
    }
  }
  headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS";
  headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Store-Id";
  return headers;
}
// ============================================================================
// PHASE 3: SSE STREAMING — real-time workflow run progress
// ============================================================================
// Map<runId, Set<ServerResponse>> for multiplexing SSE clients
// Live SSE connections per workflow run id. A run may have several watchers;
// the caps below bound per-run and global fan-out.
const sseClients = new Map<string, Set<http.ServerResponse>>();
const MAX_SSE_CLIENTS_PER_RUN = 10;
const MAX_SSE_TOTAL_CLIENTS = 100;

/** Write one SSE data frame; a throw means the client went away (ignored). */
function sendWorkflowSSE(res: http.ServerResponse, data: unknown): void {
  try {
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  } catch {
    /* client disconnected — benign */
  }
}

/**
 * Fan `data` out to every live watcher of `runId`. Connections already torn
 * down are pruned as we go (H6 fix), and the run's entry is removed entirely
 * once its watcher set is empty.
 */
function broadcastToRun(runId: string, data: unknown): void {
  const watchers = sseClients.get(runId);
  if (!watchers || watchers.size === 0) return;
  for (const client of watchers) {
    const dead = client.destroyed || client.writableEnded;
    if (dead) {
      watchers.delete(client);
    } else {
      sendWorkflowSSE(client, data);
    }
  }
  if (watchers.size === 0) sseClients.delete(runId);
}

/** Total SSE connections across all runs, for the global connection cap. */
function getTotalSseClients(): number {
  let sum = 0;
  for (const set of sseClients.values()) {
    sum += set.size;
  }
  return sum;
}
// H6 FIX: Periodic stale connection cleanup (every 60s)
// H6 FIX: sweep stale SSE connections every 60s so sockets that died without
// firing "close" cannot accumulate. unref() was missing before — unlike the
// rate-limit cleanup timer above, this interval pinned the event loop open
// during shutdown; it now matches that timer's behavior.
const sseCleanupInterval = setInterval(() => {
  for (const [rid, clients] of sseClients) {
    for (const res of clients) {
      if (res.destroyed || res.writableEnded) {
        clients.delete(res);
      }
    }
    if (clients.size === 0) sseClients.delete(rid);
  }
}, 60_000);
sseCleanupInterval.unref();
// pg LISTEN for real-time notifications
// pg LISTEN for real-time notifications
const DATABASE_URL = process.env.DATABASE_URL || "";
let pgClient: pg.Client | null = null;
let pgReconnectAttempts = 0;
const MAX_PG_RECONNECT_DELAY = 60_000; // 60s max

/** Schedule a reconnect with exponential backoff (1s, 2s, 4s … capped at 60s). */
function schedulePgReconnect(): void {
  pgReconnectAttempts++;
  const delay = Math.min(1000 * Math.pow(2, pgReconnectAttempts - 1), MAX_PG_RECONNECT_DELAY);
  console.log(`[pg-listen] Reconnecting in ${delay}ms (attempt ${pgReconnectAttempts})...`);
  setTimeout(() => setupPgListen(), delay);
}

/**
 * Connect a dedicated pg client, LISTEN on the workflow/automation channels,
 * and bridge NOTIFY payloads to SSE watchers (broadcastToRun) and to instant
 * automation-event processing. Reconnects itself with exponential backoff on
 * any failure.
 *
 * Fixes over the previous version: `pgListenReady` is cleared when the
 * connection drops (so the health endpoint reports pg_listen truthfully),
 * and the dead client is de-listenered and end()ed so a client emitting
 * repeated errors cannot schedule overlapping reconnect loops.
 */
async function setupPgListen(): Promise<void> {
  if (!DATABASE_URL) {
    console.log("[pg-listen] DATABASE_URL not set — SSE streaming disabled, using worker-only mode");
    return;
  }
  try {
    // Strip sslmode from URL (pg v8 treats sslmode=require as verify-full) and set ssl manually
    const cleanUrl = DATABASE_URL.replace(/[?&]sslmode=[^&]*/g, "").replace(/\?$/, "");
    const client = new pg.Client({ connectionString: cleanUrl, ssl: { rejectUnauthorized: false } });
    pgClient = client;
    await client.connect();
    await client.query("LISTEN workflow_step_event");
    await client.query("LISTEN workflow_run_event");
    await client.query("LISTEN workflow_step_pending");
    await client.query("LISTEN workflow_event");
    await client.query("LISTEN automation_event");
    // Reset reconnect counter on successful connection
    pgReconnectAttempts = 0;
    pgListenReady = true;
    // Debounced event trigger processing — fires at most once per 100ms
    let eventTriggerTimer: ReturnType<typeof setTimeout> | null = null;
    function debouncedEventProcess(): void {
      if (eventTriggerTimer) return;
      eventTriggerTimer = setTimeout(async () => {
        eventTriggerTimer = null;
        try {
          const sb = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
          const count = await processEventTriggers(sb);
          if (count > 0) console.log(`[pg-listen] Instant event processing: ${count} events`);
        } catch (err) { console.error("[pg-listen] Event trigger processing error:", (err as Error).message); }
      }, 100);
    }
    client.on("notification", (msg) => {
      if (!msg.payload) return;
      try {
        const data = JSON.parse(msg.payload);
        // Automation event — trigger immediate processing
        if (msg.channel === "automation_event") {
          debouncedEventProcess();
          return;
        }
        const runId = data.run_id;
        if (!runId) return;
        if (msg.channel === "workflow_step_event") {
          broadcastToRun(runId, { type: "step_update", ...data });
        } else if (msg.channel === "workflow_run_event") {
          broadcastToRun(runId, { type: "run_update", ...data });
        } else if (msg.channel === "workflow_event") {
          broadcastToRun(runId, { type: "event", event_type: data.event_type, ...data });
        }
        // workflow_step_pending — could trigger inline execution here too
      } catch (err) { console.error("[pg-listen] Failed to parse notification:", (err as Error).message); }
    });
    client.on("error", (err) => {
      console.error("[pg-listen] Connection error:", err.message);
      // Detach and dispose the broken client so further error events from it
      // cannot re-enter this handler and stack up reconnect timers.
      client.removeAllListeners();
      void client.end().catch(() => { /* already broken — best effort */ });
      // Only the currently-active client may flip readiness and reconnect.
      if (pgClient === client) {
        pgClient = null;
        pgListenReady = false;
        schedulePgReconnect();
      }
    });
    console.log("[pg-listen] Listening on workflow_step_event, workflow_run_event, workflow_step_pending, workflow_event, automation_event");
  } catch (err) {
    console.error("[pg-listen] Failed to connect:", (err as Error).message);
    pgListenReady = false;
    if (pgClient) {
      pgClient.removeAllListeners();
      void pgClient.end().catch(() => { /* partial connection — best effort */ });
    }
    pgClient = null;
    schedulePgReconnect();
  }
}
// ============================================================================
// HELPERS
// ============================================================================
/**
 * Create an Anthropic SDK client for `agent`, preferring the agent's own
 * api_key and falling back to the server-wide ANTHROPIC_API_KEY.
 */
function getAnthropicClient(agent: AgentConfig): Anthropic {
  return new Anthropic({ apiKey: agent.api_key || ANTHROPIC_API_KEY });
}
/**
 * Write one agent-chat SSE frame. Guarded the same way as sendWorkflowSSE:
 * previously this wrote unconditionally, so a write to a socket the client
 * had already closed could throw (e.g. EPIPE) out of the streaming
 * callbacks instead of being the benign no-op it should be.
 */
function sendSSE(res: http.ServerResponse, event: StreamEvent): void {
  if (res.destroyed || res.writableEnded) return;
  try {
    res.write(`data: ${JSON.stringify(event)}\n\n`);
  } catch {
    /* client disconnected — benign */
  }
}
/** Serialize `data` as a JSON response with the given status and CORS headers. */
function jsonResponse(res: http.ServerResponse, status: number, data: unknown, corsHeaders: Record<string, string>): void {
  const payload = JSON.stringify(data);
  res.writeHead(status, { "Content-Type": "application/json", ...corsHeaders });
  res.end(payload);
}
/**
 * Buffer an incoming request body as UTF-8. Rejects (and destroys the
 * socket) once more than 1 MiB has arrived, so a hostile client cannot
 * exhaust memory.
 */
function readBody(req: http.IncomingMessage): Promise<string> {
  const LIMIT = 1_048_576; // 1 MiB
  return new Promise<string>((resolve, reject) => {
    const parts: Buffer[] = [];
    let received = 0;
    req.on("data", (chunk: Buffer) => {
      received += chunk.length;
      if (received > LIMIT) {
        req.destroy();
        reject(new Error("Request body too large"));
        return;
      }
      parts.push(chunk);
    });
    req.on("end", () => resolve(Buffer.concat(parts).toString("utf8")));
    req.on("error", reject);
  });
}
// ============================================================================
// HISTORY COMPACTION
// ============================================================================
/**
 * Trim conversation history to fit the model context, newest-first.
 *
 * Walks backwards accumulating messages until `maxHistoryChars` is reached,
 * truncating oversized string/text content (`maxMessageChars`) and string
 * tool results (`maxToolResultChars`) along the way. The head of the result
 * is then normalized: leading assistant messages AND leading user messages
 * carrying tool_result blocks are dropped — a tool_result whose paired
 * assistant tool_use was trimmed away is rejected by the Anthropic API
 * (fix: the previous version only dropped non-user messages, letting
 * orphaned tool_results through).
 */
function compactHistory(
  history: Anthropic.MessageParam[],
  maxHistoryChars: number,
  maxMessageChars: number,
  maxToolResultChars: number
): Anthropic.MessageParam[] {
  if (!history?.length) return [];
  let totalChars = 0;
  const compacted: Anthropic.MessageParam[] = [];
  for (let i = history.length - 1; i >= 0; i--) {
    const msg = history[i];
    let content = msg.content;
    if (typeof content === "string") {
      if (content.length > maxMessageChars) content = content.substring(0, maxMessageChars) + "\n...[truncated]";
    } else if (Array.isArray(content)) {
      // Anthropic SDK ContentBlockParam is a complex union — use targeted narrowing
      content = content.map((block) => {
        if (block.type === "text" && "text" in block && typeof block.text === "string" && block.text.length > maxMessageChars) {
          return { ...block, text: block.text.substring(0, maxMessageChars) + "\n...[truncated]" } as typeof block;
        }
        if (block.type === "tool_result" && "content" in block && typeof block.content === "string" && block.content.length > maxToolResultChars) {
          return { ...block, content: block.content.substring(0, maxToolResultChars) + "\n...[truncated]" } as typeof block;
        }
        return block;
      });
    }
    const msgChars = JSON.stringify(content).length;
    if (totalChars + msgChars > maxHistoryChars) break;
    totalChars += msgChars;
    compacted.unshift({ ...msg, content });
  }
  // Normalize the head: must start with a plain user message, not an
  // orphaned tool_result whose tool_use partner was trimmed off above.
  while (compacted.length > 0 && (compacted[0].role !== "user" || _hasToolResult(compacted[0]))) {
    compacted.shift();
  }
  return compacted;
}

/** True when a message's array content contains any tool_result block. */
function _hasToolResult(msg: Anthropic.MessageParam): boolean {
  return Array.isArray(msg.content) && msg.content.some((b) => b.type === "tool_result");
}
// ============================================================================
// AGENT CHAT HANDLER
// ============================================================================
/**
 * POST /agent-chat — run one agent conversation turn and stream the result
 * to the caller as SSE (text deltas, tool activity, usage, done/error).
 *
 * Flow: validate input → resolve the caller's store (three fallbacks) →
 * verify store access via RLS (unless service-role) → load agent + tools →
 * resolve/create the conversation → assemble the system prompt (store
 * context, tone/verbosity, session context, recalled memories) → delegate
 * the model/tool loop to runServerAgentLoop → persist messages, audit rows,
 * and fire-and-forget memory/cost bookkeeping.
 *
 * All persistence is best-effort: insert/update failures are logged but
 * never abort the SSE stream.
 */
async function handleAgentChat(
  req: http.IncomingMessage,
  res: http.ServerResponse,
  supabase: SupabaseClient,
  body: any,
  user: { id: string; email?: string } | null,
  isServiceRole: boolean,
  token: string,
  corsHeaders: Record<string, string>
): Promise<void> {
  const { agentId, message, conversationHistory, source, conversationId, context } = body;
  let storeId: string | undefined = body.storeId;
  if (!agentId || !message) {
    jsonResponse(res, 400, { error: "agentId and message required" }, corsHeaders);
    return;
  }
  if (typeof message === "string" && message.length > 100_000) {
    jsonResponse(res, 400, { error: "Message too long (max 100K characters)" }, corsHeaders);
    return;
  }
  // Fallback: resolve user's store when storeId not provided in request
  if (!storeId && user?.id && !isServiceRole) {
    try {
      const { data: userStores } = await supabase
        .from("user_stores")
        .select("store_id")
        .eq("user_id", user.id)
        .limit(1);
      if (userStores?.length) {
        storeId = userStores[0].store_id;
        console.log(`[store-resolution] Resolved user ${user.id} → store ${storeId}`);
      }
    } catch (err) {
      console.error("[store-resolution] Error:", err);
    }
  }
  console.log(`[agent-chat] storeId=${storeId || "NONE"} source=${body.source || "unknown"} isServiceRole=${isServiceRole} userId=${user?.id || body.userId || "NONE"}`);
  // Fallback: resolve store from body.userId for service-role requests (e.g. WhaleChat app)
  if (!storeId && !user?.id && body.userId && isServiceRole) {
    try {
      const { data: userStores } = await supabase
        .from("user_stores")
        .select("store_id")
        .eq("user_id", body.userId)
        .limit(1);
      if (userStores?.length) {
        storeId = userStores[0].store_id;
        console.log(`[store-resolution] Resolved userId ${body.userId} → store ${storeId}`);
      }
    } catch (err) {
      console.error("[store-resolution] Error:", err);
    }
  }
  // Verify store access (skip for service_role)
  if (storeId && !isServiceRole) {
    // Re-query as the caller (anon key + user JWT) so Supabase RLS decides
    // whether this user can see the store, rather than trusting the request.
    const userClient = createClient(SUPABASE_URL, process.env.SUPABASE_ANON_KEY || "", {
      global: { headers: { Authorization: `Bearer ${token}` } },
    });
    const { data: storeAccess, error: storeErr } = await userClient
      .from("stores").select("id").eq("id", storeId).limit(1);
    if (storeErr || !storeAccess?.length) {
      jsonResponse(res, 403, { error: "Access denied to store" }, corsHeaders);
      return;
    }
  }
  const userId: string = user?.id || body.userId || "";
  const userEmail: string | null = user?.email || body.userEmail || null;
  const agent = await loadAgentConfig(supabase, agentId);
  if (!agent) {
    jsonResponse(res, 404, { error: "Agent not found" }, corsHeaders);
    return;
  }
  // Tool set = global registry ∪ store-scoped user tools, filtered per agent.
  const allTools = await loadTools(supabase);
  const { rows: userToolRows, defs: userToolDefs } = storeId
    ? await loadUserTools(supabase, storeId)
    : { rows: [] as UserToolRow[], defs: [] as ToolDef[] };
  const tools = getToolsForAgent(agent, allTools, userToolDefs);
  const traceId = randomUUID();
  const agentModel = agent.model || "claude-sonnet-4-20250514";
  // Resolve or create conversation
  let activeConversationId: string;
  if (conversationId) {
    activeConversationId = conversationId;
  } else {
    let conv = await supabase
      .from("ai_conversations")
      .insert({
        store_id: storeId || null,
        user_id: userId || null,
        agent_id: agentId,
        title: message.substring(0, 100),
        metadata: { agentName: agent.name, source: source || "whale_chat" },
      })
      .select("id")
      .single();
    if (conv.error) {
      console.error("[conversation] create failed:", conv.error.message, conv.error.details, conv.error.hint, JSON.stringify({ store_id: storeId, user_id: userId, agent_id: agentId }));
      // Retry without user_id (may be FK constraint)
      conv = await supabase
        .from("ai_conversations")
        .insert({
          store_id: storeId || null,
          agent_id: agentId,
          title: message.substring(0, 100),
          metadata: { agentName: agent.name, source: source || "whale_chat", userId, userEmail },
        })
        .select("id")
        .single();
      if (conv.error) {
        console.error("[conversation] retry create failed:", conv.error.message, conv.error.details, conv.error.hint);
      }
    }
    // If both inserts failed, fall back to a random id so the chat can still
    // stream (messages persisted under it will be orphaned from the DB row).
    activeConversationId = conv.data?.id || randomUUID();
    console.log(`[conversation] id=${activeConversationId} fromDb=${!!conv.data?.id}`);
  }
  // Build system prompt
  let systemPrompt = agent.system_prompt || "You are a helpful assistant.";
  if (storeId) systemPrompt += `\n\nYou are operating for store_id: ${storeId}. Always include this in tool calls that require it.`;
  if (!agent.can_modify) systemPrompt += "\n\nIMPORTANT: You have read-only access. Do not attempt to modify any data.";
  if (agent.tone && agent.tone !== "professional") systemPrompt += `\n\nTone: Respond in a ${agent.tone} tone.`;
  if (agent.verbosity === "concise") systemPrompt += "\n\nBe concise — short answers, minimal explanation.";
  else if (agent.verbosity === "verbose") systemPrompt += "\n\nBe thorough — provide detailed answers with full context.";
  if (agent.context_config) {
    const ctx = agent.context_config;
    if (ctx.includeLocations && ctx.locationIds?.length) systemPrompt += `\n\nFocus on these locations: ${ctx.locationIds.join(", ")}`;
    if (ctx.includeCustomers && ctx.customerSegments?.length) systemPrompt += `\n\nFocus on these customer segments: ${ctx.customerSegments.join(", ")}`;
  }
  // Inject runtime context from client (location, user, channel info)
  if (context && typeof context === "object") {
    const ctxParts: string[] = [];
    if (context.storeName) ctxParts.push(`Store: ${context.storeName}`);
    if (context.locationName) {
      let loc = `Location: ${context.locationName}`;
      if (context.locationAddress) loc += ` (${context.locationAddress})`;
      if (context.locationType) loc += ` [${context.locationType}]`;
      ctxParts.push(loc);
    }
    if (context.userName) {
      ctxParts.push(`User: ${context.userName}${userEmail ? ` (${userEmail})` : ""}`);
    } else if (userEmail) {
      ctxParts.push(`User: ${userEmail}`);
    }
    if (context.conversationType) {
      ctxParts.push(`Channel: ${context.conversationTitle || context.conversationType} (${context.conversationType})`);
    }
    if (ctxParts.length) {
      systemPrompt += `\n\n## Current Session Context\n${ctxParts.join("\n")}`;
    }
  }
  const anthropic = getAnthropicClient(agent);
  // Memory recall — inject relevant memories into system prompt
  if (storeId) {
    try {
      const { data: memories } = await supabase.rpc("recall_memory", {
        p_agent_id: agentId,
        p_query: message.substring(0, 200),
        p_type: null,
        p_limit: 10,
      });
      if (memories?.length) {
        const memBlock = memories.map((m: any) =>
          `- [${m.memory_type}] ${m.key}: ${JSON.stringify(m.value)}`
        ).join("\n");
        systemPrompt += `\n\n## Agent Memory\nRelevant memories from previous conversations:\n${memBlock}`;
      }
    } catch (err) {
      console.error("[memory] recall failed:", (err as Error).message);
    }
  }
  // Context budget limits, overridable per agent via context_config.
  const ctxCfg = agent.context_config;
  const MAX_HISTORY_CHARS = ctxCfg?.max_history_chars || 400_000;
  const MAX_MESSAGE_CHARS = ctxCfg?.max_message_chars || 20_000;
  const maxToolResultChars = getMaxToolResultChars(ctxCfg);
  const messages: Anthropic.MessageParam[] = [
    ...compactHistory(conversationHistory || [], MAX_HISTORY_CHARS, MAX_MESSAGE_CHARS, maxToolResultChars),
    { role: "user", content: message },
  ];
  // Persist user message + audit
  try {
    const { error: msgErr } = await supabase.from("ai_messages").insert({
      conversation_id: activeConversationId,
      role: "user",
      content: [{ type: "text", text: message }],
    });
    if (msgErr) console.error("[message] user insert failed:", msgErr.message, msgErr.details, msgErr.hint);
  } catch (err) {
    console.error("[message] exception:", err);
  }
  try {
    const { error: auditErr } = await supabase.from("audit_logs").insert({
      action: "chat.user_message",
      severity: "info",
      store_id: storeId || null,
      resource_type: "chat_message",
      resource_id: agentId,
      request_id: traceId,
      conversation_id: activeConversationId,
      user_id: userId || null,
      user_email: userEmail || null,
      source: source || "whale_chat",
      details: {
        message_preview: message.substring(0, 200),
        agent_id: agentId,
        model: agentModel,
        conversation_id: activeConversationId,
        history_length: conversationHistory?.length || 0,
      },
    });
    if (auditErr) console.error("[audit] user_message insert failed:", auditErr.message, auditErr.details, auditErr.hint);
  } catch (err) {
    console.error("[audit] exception:", err);
  }
  // Start SSE stream
  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
    ...corsHeaders,
  });
  // Client disconnect detection — polled by the agent loop via the getter
  // object below so it can abort work the client will never see.
  let clientDisconnected = false;
  req.on("close", () => { clientDisconnected = true; });
  const maxDurationMs = 5 * 60 * 1000;
  const startedAt = Date.now();
  const chatStartTime = Date.now();
  try {
    const result = await runServerAgentLoop({
      anthropic,
      supabase,
      model: agentModel,
      systemPrompt,
      messages,
      tools,
      maxTurns: agent.max_tool_calls || 10,
      temperature: agent.temperature ?? 0.7,
      maxTokens: getMaxOutputTokens(agentModel, agent.max_tokens),
      maxToolResultChars,
      storeId,
      traceId,
      userId,
      userEmail,
      source,
      conversationId: activeConversationId,
      agentId,
      // Tool executor wrapper: injects store_id when the model omitted it.
      executeTool: async (toolName, args, sourceOverride) => {
        const toolArgs = { ...args };
        if (!toolArgs.store_id && storeId) toolArgs.store_id = storeId;
        return executeTool(supabase, toolName, toolArgs, storeId, traceId,
          userId, userEmail, sourceOverride || source,
          activeConversationId, userToolRows, agentId);
      },
      onText: (text) => sendSSE(res, { type: "text", text }),
      onToolStart: (name) => sendSSE(res, { type: "tool_start", name }),
      onToolResult: (name, success, r) => sendSSE(res, { type: "tool_result", name, success, result: r }),
      onSubagentProgress: (evt) => {
        sendSSE(res, { type: "subagent", subagentId: evt.subagentId, subagentEvent: evt.event, name: evt.toolName });
      },
      clientDisconnected: { get value() { return clientDisconnected; } },
      startedAt,
      maxDurationMs,
    });
    // Send usage SSE
    sendSSE(res, {
      type: "usage",
      usage: {
        input_tokens: result.tokens.input,
        output_tokens: result.tokens.output,
        cache_creation_tokens: result.tokens.cacheCreation,
        cache_read_tokens: result.tokens.cacheRead,
        cost_usd: result.costUsd,
      },
    });
    // Persist assistant message
    try {
      const { error: msgErr } = await supabase.from("ai_messages").insert({
        conversation_id: activeConversationId,
        role: "assistant",
        content: [{ type: "text", text: result.finalText }],
        is_tool_use: result.toolCallCount > 0,
        tool_names: result.toolsUsed.length > 0 ? result.toolsUsed : null,
        token_count: result.tokens.input + result.tokens.output,
      });
      if (msgErr) console.error("[message] assistant insert failed:", msgErr.message, msgErr.details, msgErr.hint);
    } catch (err) {
      console.error("[message] exception:", err);
    }
    // Update conversation metadata
    try {
      const { error: convErr } = await supabase.from("ai_conversations").update({
        metadata: {
          agentName: agent.name,
          source: source || "whale_chat",
          model: agentModel,
          lastTurnTokens: result.tokens.input + result.tokens.output,
          lastToolCalls: result.toolCallCount,
          lastDurationMs: Date.now() - chatStartTime,
        },
      }).eq("id", activeConversationId);
      if (convErr) console.error("[conversation] update failed:", convErr.message, convErr.details, convErr.hint);
    } catch (err) {
      console.error("[conversation] exception:", err);
    }
    // Audit log
    try {
      const { error: auditErr } = await supabase.from("audit_logs").insert({
        action: "chat.assistant_response",
        severity: "info",
        store_id: storeId || null,
        resource_type: "chat_message",
        resource_id: agentId,
        request_id: traceId,
        conversation_id: activeConversationId,
        duration_ms: Date.now() - chatStartTime,
        user_id: userId || null,
        user_email: userEmail || null,
        source: source || "whale_chat",
        input_tokens: result.tokens.input,
        output_tokens: result.tokens.output,
        total_cost: result.costUsd,
        model: agentModel,
        details: {
          response_preview: result.finalText.substring(0, 500),
          agent_id: agentId,
          model: agentModel,
          turn_count: result.turnCount,
          tool_calls: result.toolCallCount,
          tool_names: result.toolsUsed,
          conversation_id: activeConversationId,
          session_cost_usd: result.costUsd,
          cache_creation_tokens: result.tokens.cacheCreation,
          cache_read_tokens: result.tokens.cacheRead,
          loop_detector_stats: result.loopDetectorStats,
        },
      });
      if (auditErr) console.error("[audit] assistant_response insert failed:", auditErr.message, auditErr.details, auditErr.hint);
    } catch (err) {
      console.error("[audit] exception:", err);
    }
    // Memory extraction — fire-and-forget after response
    if (storeId && result.finalText.length > 50) {
      extractAndStoreMemories(supabase, anthropic, agentId, storeId, message, result.finalText)
        .catch((err: Error) => console.error("[memory] extract failed:", err.message));
    }
    // Cost budget tracking — increment active budgets
    if (storeId && result.costUsd > 0) {
      updateCostBudgets(supabase, storeId, agentId, result.costUsd)
        .catch((err: Error) => console.error("[cost] budget update failed:", err.message));
    }
    sendSSE(res, { type: "done", conversationId: activeConversationId });
  } catch (err) {
    // Headers are already sent — errors can only be reported in-stream.
    sendSSE(res, { type: "error", error: sanitizeError(err) });
  }
  res.end();
}
// ============================================================================
// MEMORY EXTRACTION — extract key facts after agent conversation
// ============================================================================
/**
 * Post-conversation memory extraction (invoked fire-and-forget). Asks a
 * small Haiku model to pull up to three durable facts out of the latest
 * turn, then stores each via the store_memory RPC. Rejections propagate to
 * the caller's .catch; malformed model output is silently ignored.
 */
async function extractAndStoreMemories(
  supabase: SupabaseClient, anthropic: Anthropic,
  agentId: string, storeId: string,
  userMessage: string, assistantResponse: string
): Promise<void> {
  const extraction = await anthropic.messages.create({
    model: "claude-haiku-4-5-20251001",
    max_tokens: 500,
    system: `Extract key facts worth remembering from this conversation turn.
Return JSON array: [{"key": "short_key", "value": {"detail": "..."}, "type": "short_term|long_term|entity"}]
Rules:
- Only extract genuinely useful facts (preferences, decisions, corrections, entities)
- "entity" for people/businesses/products mentioned
- "long_term" for preferences, patterns, decisions
- "short_term" for context that may expire
- Return [] if nothing worth remembering
- Max 3 items per turn`,
    messages: [{
      role: "user",
      content: `User: ${userMessage.substring(0, 500)}\n\nAssistant: ${assistantResponse.substring(0, 1000)}`
    }],
  });
  // Pull the first JSON-array-looking span out of the model's text reply.
  const rawText = extraction.content.find(b => b.type === "text")?.text || "[]";
  const jsonMatch = rawText.match(/\[[\s\S]*\]/);
  if (!jsonMatch) return;
  let facts: any[];
  try {
    facts = JSON.parse(jsonMatch[0]);
  } catch {
    return; // Malformed JSON from extraction
  }
  // Hard cap of three stored facts per turn, matching the prompt's rule.
  for (const fact of facts.slice(0, 3)) {
    if (!fact.key) continue;
    await supabase.rpc("store_memory", {
      p_agent_id: agentId,
      p_store_id: storeId,
      p_type: fact.type || "short_term",
      p_key: fact.key,
      p_value: fact.value || {},
    });
  }
}
// ============================================================================
// COST BUDGET TRACKING — increment active budgets after each conversation
// ============================================================================
/**
 * Add `costUsd` to every active budget that applies to this store — both
 * rows pinned to this agent and store-wide rows (agent_id IS NULL).
 *
 * Budget rows are independent, so the updates now run in parallel via
 * Promise.all instead of one awaited round-trip per row.
 *
 * NOTE(review): this is a read-modify-write on `current_spend_usd`, so two
 * concurrent conversations can lose an increment; an atomic server-side
 * increment (SQL/RPC) would be safer — confirm before relying on exactness.
 */
async function updateCostBudgets(
  supabase: SupabaseClient, storeId: string, agentId: string, costUsd: number
): Promise<void> {
  const { data: budgets } = await supabase
    .from("ai_cost_budgets")
    .select("id, current_spend_usd")
    .eq("store_id", storeId)
    .eq("is_active", true)
    .or(`agent_id.eq.${agentId},agent_id.is.null`);
  if (!budgets?.length) return;
  await Promise.all(budgets.map((budget) =>
    supabase
      .from("ai_cost_budgets")
      .update({ current_spend_usd: (budget.current_spend_usd || 0) + costUsd })
      .eq("id", budget.id)
  ));
}
// ============================================================================
// HTTP SERVER
// ============================================================================
const server = http.createServer(async (req, res) => {
const origin = req.headers.origin || "";
const corsHeaders = getCorsHeaders(origin);
// Health check — readiness-aware for Fly.io
if (req.method === "GET" && (req.url === "/" || req.url === "/health")) {
const ready = isReady();
const status = ready ? 200 : 503;
jsonResponse(res, status, {
status: ready ? "ok" : "starting",
version: process.env.npm_package_version || "6.0.0",
uptime: Math.floor(process.uptime()),
pg_listen: pgListenReady,
worker_pool: workerPoolReady,
}, corsHeaders);
return;
}
// CORS preflight
if (req.method === "OPTIONS") {
res.writeHead(204, corsHeaders);
res.end();
return;
}
const url = new URL(req.url || "/", `http://${req.headers.host || "localhost"}`);
const pathname = url.pathname;
// ================================================================
// Phase 3: SSE stream for workflow run progress
// GET /workflows/runs/:id/stream
// ================================================================
if (req.method === "GET" && pathname.match(/^\/workflows\/runs\/[a-f0-9-]+\/stream$/)) {
const runId = pathname.split("/")[3];
// Auth check
const authHeader = req.headers.authorization;
const token = authHeader?.startsWith("Bearer ") ? authHeader.substring(7) : "";
const isInternal = safeCompare(token, FLY_INTERNAL_SECRET) || safeCompare(token, SUPABASE_SERVICE_ROLE_KEY) || safeCompare(token, SERVICE_ROLE_JWT);
if (!isInternal && !token) {
jsonResponse(res, 401, { error: "Missing authorization" }, corsHeaders);
return;
}
if (!isInternal) {
const sb = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
const { data: { user: authUser }, error: authError } = await sb.auth.getUser(token);
if (authError || !authUser) {
jsonResponse(res, 401, { error: "Invalid or expired token" }, corsHeaders);
return;
}
}
// H6 FIX: Enforce per-run and total client limits
const existingClients = sseClients.get(runId)?.size || 0;
if (existingClients >= MAX_SSE_CLIENTS_PER_RUN) {
jsonResponse(res, 429, { error: "Too many SSE clients for this run" }, corsHeaders);
return;
}
if (getTotalSseClients() >= MAX_SSE_TOTAL_CLIENTS) {
jsonResponse(res, 429, { error: "Too many total SSE connections" }, corsHeaders);
return;
}
// Start SSE stream
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
...corsHeaders,
});
// Send snapshot
const sb = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
const { data: run } = await sb.from("workflow_runs")
.select("id, workflow_id, status, trigger_type, current_step_key, error_message, error_step_key, started_at, completed_at, duration_ms")
.eq("id", runId).single();
const { data: stepRuns } = await sb.from("workflow_step_runs")
.select("id, step_key, step_type, status, error_message, duration_ms, started_at, completed_at")
.eq("run_id", runId).order("created_at", { ascending: true });
sendWorkflowSSE(res, { type: "snapshot", run, steps: stepRuns || [] });
// Register client
if (!sseClients.has(runId)) sseClients.set(runId, new Set());
sseClients.get(runId)!.add(res);
// Heartbeat
const heartbeat = setInterval(() => {
try { res.write(": heartbeat\n\n"); } catch { clearInterval(heartbeat); }
}, 15_000);
// Cleanup on disconnect
const cleanup = () => {
clearInterval(heartbeat);
const clients = sseClients.get(runId);
if (clients) {
clients.delete(res);
if (clients.size === 0) sseClients.delete(runId);
}
};
req.on("close", cleanup);
req.on("error", cleanup);
return;
}
// ================================================================
// Guest approval — signed URL, no auth required (GET)
// GET /approvals/guest/:id?action=approve&expires=...&sig=...
// ================================================================
// The HMAC signature covers step-run id + action + expiry, so possession of
// the complete link is the credential.
const guestApprovalMatch = pathname.match(/^\/approvals\/guest\/([a-f0-9-]+)$/);
if (guestApprovalMatch && req.method === "GET") {
// Per-IP rate limit (first X-Forwarded-For hop, falling back to the socket).
const clientIp = req.headers["x-forwarded-for"]?.toString().split(",")[0]?.trim() || req.socket.remoteAddress || "unknown";
if (!checkRateLimit(clientIp)) { jsonResponse(res, 429, { error: "Too many requests" }, corsHeaders); return; }
const stepRunId = guestApprovalMatch[1];
const urlParams = new URL(req.url || "", `http://${req.headers.host}`).searchParams;
const action = urlParams.get("action") || "";
const expires = urlParams.get("expires") || "";
const sig = urlParams.get("sig") || "";
if (!action || !expires || !sig) {
jsonResponse(res, 400, { error: "Missing action, expires, or sig parameter" }, corsHeaders);
return;
}
// NOTE(review): expiry is checked before the signature is verified; harmless
// since both must pass, but unsigned requests can probe link expiry.
if (new Date(expires) < new Date()) {
jsonResponse(res, 410, { error: "This approval link has expired" }, corsHeaders);
return;
}
if (!verifyGuestApprovalSignature(stepRunId, action, expires, sig)) {
jsonResponse(res, 403, { error: "Invalid signature" }, corsHeaders);
return;
}
const guestSupabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
const { data: approval } = await guestSupabase.from("workflow_approval_requests")
.select("id, store_id, run_id, status").eq("step_run_id", stepRunId).limit(1);
if (!approval?.length) {
jsonResponse(res, 404, { error: "Approval not found" }, corsHeaders);
return;
}
// One-shot: a link is only usable while the approval is still pending.
if (approval[0].status !== "pending") {
jsonResponse(res, 409, { error: `Approval already ${approval[0].status}` }, corsHeaders);
return;
}
// Any signed action other than approve/approved is recorded as a rejection.
const isApprove = action === "approve" || action === "approved";
const { data: guestResult, error: guestErr } = await guestSupabase.rpc("respond_to_approval", {
p_approval_id: approval[0].id,
p_store_id: approval[0].store_id,
p_status: isApprove ? "approved" : "rejected",
p_response_data: { guest: true, action },
p_responded_by: null,
});
if (guestErr) {
jsonResponse(res, 500, { success: false, error: guestErr.message }, corsHeaders);
return;
}
// Best-effort inline resume of the paused run; failure is logged, not fatal.
if (guestResult?.success && approval[0].run_id) {
try { await executeInlineChain(guestSupabase, approval[0].run_id); } catch (err) { console.error("[workflow] Inline chain failed after approval:", (err as Error).message); }
}
// Human-readable confirmation page for the guest's browser.
res.writeHead(200, { "Content-Type": "text/html", ...corsHeaders });
res.end(`<!DOCTYPE html><html><body style="font-family:system-ui;text-align:center;padding:40px">
<h2>${isApprove ? "Approved" : "Rejected"}</h2>
<p>Your response has been recorded. You can close this window.</p>
</body></html>`);
return;
}
// Everything below this point is POST-only.
if (req.method !== "POST") {
jsonResponse(res, 405, { error: "Method not allowed" }, corsHeaders);
return;
}
try {
// ================================================================
// Phase 2: Approval response endpoint
// POST /approvals/:id/respond
// ================================================================
const approvalMatch = pathname.match(/^\/approvals\/([a-f0-9-]+)\/respond$/);
if (approvalMatch) {
const approvalId = approvalMatch[1];
const authHeader = req.headers.authorization;
const token = authHeader?.startsWith("Bearer ") ? authHeader.substring(7) : "";
// Internal callers authenticate with one of the shared secrets.
// NOTE(review): SERVICE_ROLE_JWT defaults to "" when unset — confirm that
// safeCompare rejects empty inputs, otherwise an empty bearer token could
// compare equal to the empty secret.
const isInternal = safeCompare(token, FLY_INTERNAL_SECRET) || safeCompare(token, SUPABASE_SERVICE_ROLE_KEY) || safeCompare(token, SERVICE_ROLE_JWT);
if (!isInternal && !token) {
jsonResponse(res, 401, { error: "Missing authorization" }, corsHeaders);
return;
}
const supabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
// Non-internal tokens must resolve to a real user; that user's id is
// recorded as the responder.
let userId: string | null = null;
if (!isInternal) {
const { data: { user: authUser }, error: authError } = await supabase.auth.getUser(token);
if (authError || !authUser) {
jsonResponse(res, 401, { error: "Invalid or expired token" }, corsHeaders);
return;
}
userId = authUser.id;
}
let rawBody: string;
try { rawBody = await readBody(req); } catch {
jsonResponse(res, 413, { error: "Request body too large" }, corsHeaders);
return;
}
// NOTE(review): malformed JSON throws here and surfaces as a 500 from the
// outer catch; a guarded parse returning 400 would be friendlier.
const body = JSON.parse(rawBody);
if (!body.status) {
jsonResponse(res, 400, { error: "status required (approved/rejected)" }, corsHeaders);
return;
}
// Get store_id from approval — respond_to_approval is store-scoped.
const { data: approval } = await supabase.from("workflow_approval_requests")
.select("store_id, run_id").eq("id", approvalId).single();
if (!approval) {
jsonResponse(res, 404, { error: "Approval not found" }, corsHeaders);
return;
}
const { data: result, error } = await supabase.rpc("respond_to_approval", {
p_approval_id: approvalId,
p_store_id: approval.store_id,
p_status: body.status,
p_response_data: body.response_data || {},
p_responded_by: userId || body.responded_by || null,
});
if (error) {
jsonResponse(res, 500, { success: false, error: error.message }, corsHeaders);
return;
}
// Inline resume — execute next step immediately; failure is logged, not fatal.
if (result?.success && approval.run_id) {
try {
await executeInlineChain(supabase, approval.run_id);
} catch (err) {
console.error("[approval-inline] Error resuming after approval:", (err as Error).message);
}
}
// 422 signals a domain-level rejection from the RPC (e.g. already responded).
jsonResponse(res, result?.success ? 200 : 422, result, corsHeaders);
return;
}
// ================================================================
// Waitpoint completion — API endpoint
// POST /waitpoints/:token/complete
// ================================================================
const waitpointMatch = pathname.match(/^\/waitpoints\/([a-f0-9-]+)\/complete$/);
if (waitpointMatch) {
const token = waitpointMatch[1];
const authHeader = req.headers.authorization;
const authToken = authHeader?.startsWith("Bearer ") ? authHeader.substring(7) : "";
const isInternal = safeCompare(authToken, FLY_INTERNAL_SECRET) || safeCompare(authToken, SUPABASE_SERVICE_ROLE_KEY) || safeCompare(authToken, SERVICE_ROLE_JWT);
if (!isInternal && !authToken) {
jsonResponse(res, 401, { error: "Missing authorization" }, corsHeaders);
return;
}
// NOTE(review): unlike the /approvals route, a non-internal bearer token is
// not verified via supabase.auth.getUser() here — the URL token appears to
// act as the capability; confirm this is intentional.
const supabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
let rawBody: string;
try { rawBody = await readBody(req); } catch {
jsonResponse(res, 413, { error: "Request body too large" }, corsHeaders);
return;
}
const body = JSON.parse(rawBody || "{}");
// Find waitpoint by its URL token.
const { data: wp } = await supabase.from("waitpoint_tokens")
.select("id, run_id, step_run_id, store_id, expires_at, status")
.eq("token", token).single();
if (!wp) {
jsonResponse(res, 404, { error: "Waitpoint token not found" }, corsHeaders);
return;
}
if (wp.status === "completed") {
jsonResponse(res, 409, { error: "Waitpoint already completed" }, corsHeaders);
return;
}
if (new Date(wp.expires_at) < new Date()) {
jsonResponse(res, 410, { error: "Waitpoint expired" }, corsHeaders);
return;
}
// Complete it: mark the token done, then flip the step back to "pending" so
// the worker picks it up. The .eq("status", "waiting") guard avoids
// clobbering a step that has already moved on.
await supabase.from("waitpoint_tokens").update({
status: "completed", completion_data: body.data || {}, completed_at: new Date().toISOString(),
}).eq("id", wp.id);
await supabase.from("workflow_step_runs").update({
status: "pending", input: { waitpoint_completed: true, waitpoint_data: body.data || {} },
}).eq("id", wp.step_run_id).eq("status", "waiting");
// Inline resume — best-effort; the periodic worker loop is the fallback.
try { await executeInlineChain(supabase, wp.run_id); } catch (err) { console.error("[workflow] Inline chain failed after waitpoint:", (err as Error).message); }
jsonResponse(res, 200, { success: true, run_id: wp.run_id }, corsHeaders);
return;
}
// ================================================================
// Webhook ingestion — no auth required (uses HMAC verification)
// POST /webhooks/:slug
// ================================================================
// Bearer auth is deliberately skipped; handleWebhookIngestion performs the
// per-webhook verification against the raw body and headers.
const webhookMatch = pathname.match(/^\/webhooks\/([a-zA-Z0-9_-]+)$/);
if (webhookMatch) {
// Per-IP rate limit for this public endpoint.
const whClientIp = req.headers["x-forwarded-for"]?.toString().split(",")[0]?.trim() || req.socket.remoteAddress || "unknown";
if (!checkRateLimit(whClientIp)) { jsonResponse(res, 429, { error: "Too many requests" }, corsHeaders); return; }
const slug = webhookMatch[1];
let rawBody: string;
try {
rawBody = await readBody(req);
} catch {
jsonResponse(res, 413, { error: "Request body too large" }, corsHeaders);
return;
}
const supabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
// Flatten headers to single string values (array-valued headers are dropped).
const headers: Record<string, string> = {};
for (const [k, v] of Object.entries(req.headers)) {
if (typeof v === "string") headers[k] = v;
}
const result = await handleWebhookIngestion(supabase, slug, rawBody, headers);
// Phase 1: Inline execution for webhook-triggered workflows — best-effort;
// the worker loop will pick the run up if this fails.
if (result.body.run_id && result.status === 200) {
try {
await executeInlineChain(supabase, result.body.run_id as string);
} catch (err) {
console.error("[webhook-inline] Error in inline chain:", (err as Error).message);
}
}
jsonResponse(res, result.status, result.body, corsHeaders);
return;
}
// ================================================================
// Fire event — service-role or internal auth
// POST /events
// ================================================================
if (pathname === "/events") {
const authHeader = req.headers.authorization;
const token = authHeader?.startsWith("Bearer ") ? authHeader.substring(7) : "";
const isInternal = safeCompare(token, FLY_INTERNAL_SECRET) || safeCompare(token, SUPABASE_SERVICE_ROLE_KEY) || safeCompare(token, SERVICE_ROLE_JWT);
if (!isInternal && !token) {
jsonResponse(res, 401, { error: "Missing authorization" }, corsHeaders);
return;
}
const supabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
// Verify user auth if not internal
if (!isInternal) {
const { data: { user: authUser }, error: authError } = await supabase.auth.getUser(token);
if (authError || !authUser) {
jsonResponse(res, 401, { error: "Invalid or expired token" }, corsHeaders);
return;
}
}
let rawBody: string;
try {
rawBody = await readBody(req);
} catch {
jsonResponse(res, 413, { error: "Request body too large" }, corsHeaders);
return;
}
const body = JSON.parse(rawBody || "{}");
if (!body.store_id || !body.event_type) {
jsonResponse(res, 400, { error: "store_id and event_type required" }, corsHeaders);
return;
}
// NOTE(review): store_id comes from the request body and is not checked
// against the authenticated user's stores here — presumably fire_event
// enforces ownership server-side; confirm.
const { data: eventId, error: fireErr } = await supabase.rpc("fire_event", {
p_store_id: body.store_id,
p_event_type: body.event_type,
p_event_payload: body.payload || {},
p_source: body.source || "api",
});
if (fireErr) {
jsonResponse(res, 500, { success: false, error: fireErr.message }, corsHeaders);
} else {
jsonResponse(res, 200, { success: true, event_id: eventId }, corsHeaders);
}
return;
}
// ================================================================
// Internal workflow processing — verified by internal secret
// POST /workflows/process
// ================================================================
if (pathname === "/workflows/process") {
const authHeader = req.headers.authorization;
const token = authHeader?.startsWith("Bearer ") ? authHeader.substring(7) : "";
// Requires FLY_INTERNAL_SECRET to be configured; if it is unset the route is
// effectively disabled (always 401).
if (!FLY_INTERNAL_SECRET || (!safeCompare(token, FLY_INTERNAL_SECRET) && !safeCompare(token, SUPABASE_SERVICE_ROLE_KEY) && !safeCompare(token, SERVICE_ROLE_JWT))) {
jsonResponse(res, 401, { error: "Unauthorized" }, corsHeaders);
return;
}
let rawBody: string;
try {
rawBody = await readBody(req);
} catch {
// Oversized body is tolerated here: fall back to defaults.
rawBody = "{}";
}
const body = JSON.parse(rawBody || "{}");
const supabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
// Process up to batch_size due steps in one shot (default 10).
const result = await processWorkflowSteps(supabase, body.batch_size || 10);
jsonResponse(res, 200, { success: true, ...result }, corsHeaders);
return;
}
// ================================================================
// Start workflow run — service-role or user auth
// POST /workflows/start
// ================================================================
if (pathname === "/workflows/start") {
const authHeader = req.headers.authorization;
const token = authHeader?.startsWith("Bearer ") ? authHeader.substring(7) : "";
const isInternal = safeCompare(token, FLY_INTERNAL_SECRET) || safeCompare(token, SUPABASE_SERVICE_ROLE_KEY) || safeCompare(token, SERVICE_ROLE_JWT);
if (!isInternal && !token) {
jsonResponse(res, 401, { error: "Missing authorization" }, corsHeaders);
return;
}
const supabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
// Verify user auth if not internal
if (!isInternal) {
const { data: { user: authUser }, error: authError } = await supabase.auth.getUser(token);
if (authError || !authUser) {
jsonResponse(res, 401, { error: "Invalid or expired token" }, corsHeaders);
return;
}
}
let rawBody: string;
try {
rawBody = await readBody(req);
} catch {
jsonResponse(res, 413, { error: "Request body too large" }, corsHeaders);
return;
}
// NOTE(review): malformed JSON throws here and becomes a 500 via the outer
// catch rather than a 400.
const body = JSON.parse(rawBody);
// start_workflow_run handles dedup via p_idempotency_key.
const { data, error } = await supabase.rpc("start_workflow_run", {
p_workflow_id: body.workflow_id,
p_store_id: body.store_id,
p_trigger_type: body.trigger_type || "api",
p_trigger_payload: body.trigger_payload || {},
p_idempotency_key: body.idempotency_key || null,
});
if (error) {
jsonResponse(res, 500, { success: false, error: error.message }, corsHeaders);
} else {
// Phase 4: Set version_id if workflow has a published version
// Phase 1: Inline execution for API-triggered workflows
// Skipped for deduplicated runs (idempotency hit) — nothing new to execute.
if (data?.success && data.run_id && !data.deduplicated) {
try {
const { data: wf } = await supabase.from("workflows")
.select("published_version_id").eq("id", body.workflow_id).single();
if (wf?.published_version_id) {
await supabase.from("workflow_runs").update({ version_id: wf.published_version_id }).eq("id", data.run_id);
}
await executeInlineChain(supabase, data.run_id);
} catch (err) {
console.error("[start-inline] Error in inline chain:", (err as Error).message);
}
}
// 422 signals a domain-level rejection from the RPC.
jsonResponse(res, data?.success ? 200 : 422, data, corsHeaders);
}
return;
}
// ================================================================
// Standard auth gate for all other POST routes
// ================================================================
const authHeader = req.headers.authorization;
if (!authHeader?.startsWith("Bearer ")) {
jsonResponse(res, 401, { error: "Missing authorization" }, corsHeaders);
return;
}
const token = authHeader.substring(7);
const supabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
// Check service-role key; `user` stays null for service-role callers.
let user: { id: string; email?: string } | null = null;
const isServiceRole = safeCompare(token, SUPABASE_SERVICE_ROLE_KEY) || safeCompare(token, SERVICE_ROLE_JWT);
if (!isServiceRole) {
const {
data: { user: authUser },
error: authError,
} = await supabase.auth.getUser(token);
if (authError || !authUser) {
jsonResponse(res, 401, { error: "Invalid or expired token" }, corsHeaders);
return;
}
user = authUser;
}
// Rate limiting (skip for service-role) — 100 req/60s per user, enforced by
// the check_rate_limit RPC.
if (user) {
const { data: rl } = await supabase.rpc("check_rate_limit", {
p_user_id: user.id,
p_window_seconds: 60,
p_max_requests: 100,
});
if (rl?.[0] && !rl[0].allowed) {
res.writeHead(429, {
"Retry-After": String(rl[0].retry_after_seconds),
"Content-Type": "application/json",
...corsHeaders,
});
res.end(JSON.stringify({ error: "Rate limit exceeded" }));
return;
}
}
// Read body (readBody enforces the size cap and throws when exceeded).
let rawBody: string;
try {
rawBody = await readBody(req);
} catch {
jsonResponse(res, 413, { error: "Request body too large (max 1MB)" }, corsHeaders);
return;
}
const body = JSON.parse(rawBody);
// Dispatch on body.mode: "proxy" (Anthropic API proxy), "tool" (direct tool
// execution), anything else falls through to agent chat over SSE.
if (body.mode === "proxy") {
await handleProxy(res, body, corsHeaders);
return;
}
// Direct tool execution mode
if (body.mode === "tool") {
const { tool_name, args, store_id } = body;
if (!tool_name) {
jsonResponse(res, 400, { error: "tool_name required" }, corsHeaders);
return;
}
// Load user tools if this is a user_tool__ prefixed call
let utRows: UserToolRow[] | undefined;
if (tool_name.startsWith("user_tool__") && store_id) {
const { rows } = await loadUserTools(supabase, store_id);
utRows = rows;
}
// Caller identity falls back to body-supplied values for service-role calls.
const result = await executeTool(
supabase,
tool_name,
(args || {}) as Record<string, unknown>,
store_id || undefined,
undefined,
user?.id || body.userId || null,
user?.email || body.userEmail || null,
"mcp",
undefined,
utRows,
);
jsonResponse(res, result.success ? 200 : 500, result, corsHeaders);
return;
}
// Agent chat mode (SSE)
await handleAgentChat(req, res, supabase, body, user, isServiceRole, token, corsHeaders);
} catch (err) {
// Last-resort handler for anything thrown above (including JSON.parse);
// only respond if headers haven't already been sent (e.g. mid-SSE stream).
if (!res.headersSent) {
jsonResponse(res, 500, { error: sanitizeError(err) }, corsHeaders);
}
}
});
// Inject tool executor into workflow engine (avoids circular dependency).
// Enforces the store boundary: a workflow may only act on its own store.
setToolExecutor((supabase, toolName, args, storeId, traceId) => {
  // Reject any attempt to target a store other than the workflow's own.
  if (args.store_id && args.store_id !== storeId) {
    return Promise.resolve({ success: false, error: "Store boundary violation: workflow cannot access other stores" });
  }
  // Copy instead of mutating the caller's args object, then pin the store id.
  const scopedArgs = { ...args, store_id: storeId };
  return executeTool(supabase, toolName, scopedArgs, storeId, traceId, null, null, "workflow_engine");
});
// Inject agent executor used by the "agent" step type in workflows.
// Loads the agent's config and tool set, builds a store-scoped system prompt,
// and runs the shared server agent loop with a hard time budget.
setAgentExecutor(async (supabase, agentId, prompt, storeId, maxTurns = 5, onToken, traceId) => {
  const agent = await loadAgentConfig(supabase, agentId);
  if (!agent) return { success: false, error: `Agent ${agentId} not found` };

  // Resolve the tool set: global catalog filtered by the agent's config,
  // plus this store's user-defined tools.
  const toolCatalog = await loadTools(supabase);
  const { rows: userToolRows, defs: userToolDefs } = await loadUserTools(supabase, storeId);
  const agentTools = getToolsForAgent(agent, toolCatalog, userToolDefs);
  const agentModel = agent.model || "claude-sonnet-4-20250514";

  // Assemble the system prompt from the agent's configuration.
  const promptParts = [agent.system_prompt || "You are a helpful assistant."];
  promptParts.push(`\n\nYou are operating for store_id: ${storeId}. Always include this in tool calls that require it.`);
  if (!agent.can_modify) promptParts.push("\n\nIMPORTANT: You have read-only access.");
  if (agent.tone && agent.tone !== "professional") promptParts.push(`\n\nTone: ${agent.tone}`);
  if (agent.verbosity === "concise") promptParts.push("\n\nBe concise.");
  const systemPrompt = promptParts.join("");

  const maxToolResultChars = getMaxToolResultChars(agent.context_config);
  try {
    const loopResult = await runServerAgentLoop({
      anthropic: getAnthropicClient(agent),
      supabase,
      model: agentModel,
      systemPrompt,
      messages: [{ role: "user", content: prompt }],
      tools: agentTools,
      maxTurns,
      temperature: agent.temperature ?? 0.7,
      maxToolResultChars,
      storeId,
      source: "workflow_agent",
      agentId,
      traceId,
      // Tool calls inherit the workflow's store when the agent omits store_id.
      executeTool: async (toolName, toolInput) => {
        const toolArgs = { ...toolInput };
        if (!toolArgs.store_id) toolArgs.store_id = storeId;
        return executeTool(supabase, toolName, toolArgs, storeId,
          traceId, null, null, "workflow_agent", undefined, userToolRows, agentId);
      },
      enableStreaming: !!onToken,
      onText: onToken || undefined,
      maxDurationMs: 2 * 60 * 1000, // hard 2-minute cap per agent step
    });
    return { success: true, response: loopResult.finalText || "(no response)" };
  } catch (err) {
    return { success: false, error: sanitizeError(err) };
  }
});
// Inject token broadcaster so agent steps can stream tokens to SSE clients
// attached to the run in real time.
setTokenBroadcaster((runId, stepKey, chunk) => {
  broadcastToRun(runId, { type: "agent_token", run_id: runId, step_key: stepKey, token: chunk });
});
// ============================================================================
// PERSISTENT WORKFLOW WORKER LOOP (5-second interval)
// ============================================================================
const WORKER_INTERVAL_MS = 5_000;
let workerRunning = false;
async function workflowWorkerLoop(): Promise<void> {
if (workerRunning) return; // Prevent concurrent runs
workerRunning = true;
try {
const supabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
const [stepResult, waitingResolved] = await Promise.all([
processWorkflowSteps(supabase, 10),
processWaitingSteps(supabase),
Promise.resolve(supabase.rpc("expire_pending_waitpoints")).then(() => {}).catch(e => console.warn('[worker] expire_pending_waitpoints failed:', e.message)), // Non-fatal
]);
// Schedule triggers + timeout enforcement + event triggers (every tick, lightweight)
const [scheduled, timedOut, eventsProcessed] = await Promise.all([
processScheduleTriggers(supabase).catch(e => { console.warn('[worker] processScheduleTriggers failed:', e.message); return 0; }),
enforceWorkflowTimeouts(supabase).catch(e => { console.warn('[worker] enforceWorkflowTimeouts failed:', e.message); return 0; }),
processEventTriggers(supabase).catch(e => { console.warn('[worker] processEventTriggers failed:', e.message); return 0; }),
]);
if (stepResult.processed > 0 || waitingResolved > 0 || scheduled > 0 || timedOut > 0 || eventsProcessed > 0) {
console.log(`[worker] processed=${stepResult.processed} errors=${stepResult.errors} waiting=${waitingResolved} scheduled=${scheduled} timed_out=${timedOut} events=${eventsProcessed}`);
}
} catch (err) {
console.error("[worker] error:", sanitizeError(err));
} finally {
workerRunning = false;
}
}
const workerInterval = setInterval(workflowWorkerLoop, WORKER_INTERVAL_MS);
// Boot: start listening, then bring up auxiliary subsystems.
server.listen(PORT, () => {
console.log(`[whale-server] Listening on port ${PORT}`);
console.log(`[whale-server] Supabase: ${SUPABASE_URL}`);
console.log(`[whale-server] Runtime: Node.js ${process.version}`);
console.log(`[whale-server] Workflow worker: ${WORKER_INTERVAL_MS}ms interval`);
// Initialize code worker pool for fast code step execution. Failure is
// logged and the server keeps serving (workerPoolReady stays false).
try {
initWorkerPool();
const stats = getPoolStats();
console.log(`[whale-server] Code worker pool: ${stats.total} workers`);
workerPoolReady = true;
} catch (err: any) {
console.error("[worker-pool] Init failed:", err.message);
}
// Phase 3: Start pg LISTEN for real-time SSE streaming — best-effort; on
// failure the error is logged and the server runs without pg LISTEN.
setupPgListen().catch((err) => {
console.error("[pg-listen] Setup failed:", err.message);
});
});
// ============================================================================
// GRACEFUL SHUTDOWN
// ============================================================================
// Guard so a second signal (e.g. SIGINT after SIGTERM) doesn't re-run the
// teardown or call server.close() on an already-closing server.
let shutdownInProgress = false;

/**
 * Tear down the server in response to a termination signal: stop accepting
 * connections, cancel timers, drop SSE clients, shut down the worker pool,
 * and close the pg LISTEN connection. Exits 0 once the HTTP server has
 * drained, or 1 via the 10-second watchdog if shutdown hangs.
 */
function gracefulShutdown(signal: string): void {
  if (shutdownInProgress) return;
  shutdownInProgress = true;
  console.log(`[server] Received ${signal}, shutting down gracefully...`);
  // 1. Stop accepting new connections; exit cleanly once existing ones drain.
  server.close(() => {
    console.log("[server] HTTP server closed");
    process.exit(0);
  });
  // 2. Clear workflow worker intervals so no new ticks start.
  clearInterval(workerInterval);
  clearInterval(sseCleanupInterval);
  // 3. Close all SSE client connections — these long-lived streams would
  //    otherwise keep the server from closing.
  for (const [, clients] of sseClients) {
    for (const res of clients) {
      try { res.end(); } catch { /* client already disconnected — benign */ }
    }
    clients.clear();
  }
  sseClients.clear();
  // 4. Shut down code worker pool.
  try {
    shutdownPool();
    console.log("[server] Worker pool shut down");
  } catch (err) {
    console.error("[server] Worker pool shutdown error:", (err as Error).message);
  }
  // 5. Close pg LISTEN connection (best-effort; errors ignored).
  if (pgClient) {
    pgClient.end().catch(() => {});
    pgClient = null;
    console.log("[server] pg LISTEN connection closed");
  }
  // 6. Force exit after 10 seconds if graceful shutdown hangs; unref so this
  //    watchdog timer does not itself keep the process alive.
  setTimeout(() => {
    console.error("[server] Graceful shutdown timed out, forcing exit");
    process.exit(1);
  }, 10_000).unref();
}
process.on("SIGTERM", () => gracefulShutdown("SIGTERM"));
process.on("SIGINT", () => gracefulShutdown("SIGINT"));