// server/handlers/workflow-steps.ts — Step executor engine
// Extracted from workflows.ts to separate step execution from workflow CRUD/management.
//
// Contains: step type executors, executeAndAdvance, inline chain execution,
// circuit breakers, code execution (JS/Python), cron parser, schedule/timeout processing,
// event trigger processing, flow control, webhook ingestion, and all step advancement helpers.
import { createHmac, timingSafeEqual, randomUUID } from "node:crypto";
import { runInNewContext } from "node:vm";
import { execFile } from "node:child_process";
import { writeFile, unlink } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import type { SupabaseClient } from "@supabase/supabase-js";
import { resolveTemplate, evaluateCondition, type TemplateContext } from "../lib/template-resolver.js";
import { sanitizeError } from "../../shared/agent-core.js";
import { executeWithPool, initWorkerPool, getPoolStats, shutdownPool } from "../lib/code-worker-pool.js";
// ============================================================================
// CONSTANTS
// ============================================================================
// Hard cap on synchronously-chained steps — presumably consumed by the inline
// chain executor (not shown in this chunk); TODO confirm.
const MAX_INLINE_DEPTH = 50;
// Default wall-clock budget for a single code step; overridable per step via
// config.timeout_ms (see executeCodeStepIsolated / executeCodeStepInProcess).
const CODE_TIMEOUT_MS = 5000;
// Cap on code-step output size. NOTE(review): 102_400 bytes is 100 KiB, not
// 100 KB; usage site not visible in this chunk — confirm.
const CODE_OUTPUT_MAX = 102_400; // 100KB
// HMAC key for signing guest-approval URLs. Empty string disables guest
// approvals entirely (generate/verify both short-circuit on falsy secret).
const GUEST_APPROVAL_SECRET = process.env.GUEST_APPROVAL_SECRET || process.env.FLY_INTERNAL_SECRET || "";
// Public base URL embedded in guest approval links.
const GUEST_APPROVAL_BASE_URL = "https://whale-agent.fly.dev/approvals/guest";
// ============================================================================
// GUEST APPROVAL — HMAC-signed URLs for unauthenticated approvers
// ============================================================================
/**
 * Build an HMAC-signed URL that lets an unauthenticated recipient answer an
 * approval request with a specific action.
 *
 * @param approvalId - the approval request (step run) identifier
 * @param action - the response option (e.g. "approve", "reject")
 * @param expiresAt - ISO timestamp after which the link is invalid
 * @returns the signed URL, or null when guest approvals are disabled
 */
export function generateGuestApprovalUrl(approvalId: string, action: string, expiresAt: string): string | null {
  if (!GUEST_APPROVAL_SECRET) return null; // Guest approvals disabled — no signing secret configured
  // Sign the RAW (unencoded) triple; verifyGuestApprovalSignature recomputes
  // the HMAC over the same raw values after the server decodes the query.
  const payload = `${approvalId}:${action}:${expiresAt}`;
  const sig = createHmac("sha256", GUEST_APPROVAL_SECRET).update(payload).digest("hex");
  // FIX: URL-encode every interpolated part. `action` comes from workflow
  // config and may contain characters (&, space, /) that would otherwise
  // corrupt the path or query string.
  return `${GUEST_APPROVAL_BASE_URL}/${encodeURIComponent(approvalId)}?action=${encodeURIComponent(action)}&expires=${encodeURIComponent(expiresAt)}&sig=${sig}`;
}
/**
 * Validate a guest-approval signature produced by generateGuestApprovalUrl.
 * Constant-time comparison; any malformed input yields false.
 */
export function verifyGuestApprovalSignature(approvalId: string, action: string, expiresAt: string, sig: string): boolean {
  // Guest approvals are disabled entirely when no signing secret is configured.
  if (!GUEST_APPROVAL_SECRET) return false;
  const expected = createHmac("sha256", GUEST_APPROVAL_SECRET)
    .update(`${approvalId}:${action}:${expiresAt}`)
    .digest("hex");
  try {
    const provided = Buffer.from(sig, "hex");
    const wanted = Buffer.from(expected, "hex");
    // timingSafeEqual throws on length mismatch; the catch maps that to false.
    return timingSafeEqual(provided, wanted);
  } catch {
    return false;
  }
}
// ============================================================================
// EVENT JOURNAL — append-only state transition log
// ============================================================================
/**
 * Append one entry to the workflow event journal (workflow_events).
 *
 * Best-effort: a failed insert must never abort the caller's step processing,
 * so failures are logged rather than thrown.
 *
 * @param runId - workflow run the event belongs to
 * @param eventType - state-transition label (free-form string)
 * @param payload - event details stored as JSON
 * @param stepRunId - optional step run the event is scoped to
 */
export async function logWorkflowEvent(
  supabase: SupabaseClient, runId: string, eventType: string,
  payload: Record<string, unknown>, stepRunId?: string,
): Promise<void> {
  // FIX: supabase-js reports failures via the returned `error`, not by
  // rejecting — the original `.then(() => {})` silently discarded it.
  const { error } = await supabase.from("workflow_events").insert({
    run_id: runId,
    step_run_id: stepRunId || null,
    event_type: eventType,
    payload,
  });
  if (error) {
    console.error(`[events] Failed to log '${eventType}' for run ${runId}:`, error.message);
  }
}
// ============================================================================
// TYPES
// ============================================================================
/**
 * A claimed, ready-to-execute workflow step, denormalized with everything the
 * executor needs: step definition, run context, and retry bookkeeping.
 * Presumably produced by a claim query/RPC elsewhere in this module — confirm.
 */
export interface StepClaim {
  step_run_id: string;                          // this execution attempt (workflow_step_runs row id)
  run_id: string;                               // parent workflow run
  workflow_id: string;                          // workflow definition
  store_id: string;                             // tenant scope
  step_id: string;                              // step definition row
  step_key: string;                             // step identifier within the workflow (also used for flow control)
  step_type: string;                            // executor selector (tool, condition, transform, agent, ...)
  step_config: Record<string, unknown>;         // executor-specific configuration blob
  on_success: string | null;                    // next step_key on success — null presumably terminal; confirm
  on_failure: string | null;                    // next step_key on failure
  timeout_seconds: number;                      // per-step execution budget
  input_schema: Record<string, unknown> | null; // optional schema to validate `input` against — usage not shown here
  step_outputs: Record<string, unknown>;        // outputs of previously completed steps, keyed by step_key
  trigger_payload: Record<string, unknown>;     // payload that started the run
  attempt_count: number;                        // attempts so far — TODO confirm whether inclusive of current
  max_attempts: number;                         // retry ceiling
  max_steps_per_run: number;                    // run-wide guard against unbounded step chains
  input: unknown | null;                        // resumed/chained input (e.g. approval response — see executeApprovalStep)
  parent_step_run_id: string | null;            // set for child/fan-out steps — confirm semantics
  retry_delay_seconds: number;                  // backoff between attempts
}
/** Normalized outcome of one step executor. */
export interface StepResult {
  success: boolean;   // true = step completed; false = failed with `error`
  output?: unknown;   // presumably recorded into step_outputs[step_key] — confirm against advancement code
  error?: string;     // human-readable failure reason
  branch?: string;    // explicit next step_key (condition/approval branching)
}
// ============================================================================
// FLOW CONTROL — concurrency + rate limiting at step level
// ============================================================================
/**
 * Gate a claimed step on its optional flow-control settings before execution.
 *
 * Reads two independent limits from step_config:
 *  - concurrency_limit (+ optional concurrency_key): max simultaneously-running
 *    step runs sharing the key (key defaults to the step's own step_key);
 *  - rate_limit (+ rate_window_seconds, default 60): max success/running
 *    executions of this step_key inside a sliding window.
 *
 * @returns { allowed: false, reason } when either limit is hit, else { allowed: true }.
 *
 * NOTE(review): both count queries filter by step_key only — not store_id or
 * workflow_id — so identically-named steps in other workflows/stores would
 * share the budget; confirm step_key is globally unique or scope the queries.
 * NOTE(review): a query error leaves `count` null, which is treated as 0
 * (fail-open) — confirm that is intended.
 */
async function checkFlowControl(
  supabase: SupabaseClient, step: StepClaim,
): Promise<{ allowed: boolean; reason?: string }> {
  const config = step.step_config;
  // Per-step concurrency limit
  const concurrencyLimit = config.concurrency_limit as number;
  if (concurrencyLimit && concurrencyLimit > 0) {
    const concurrencyKey = (config.concurrency_key as string) || step.step_key;
    const { count } = await supabase.from("workflow_step_runs")
      .select("id", { count: "exact", head: true })
      .eq("step_key", concurrencyKey)
      .eq("status", "running")
      .neq("id", step.step_run_id); // exclude self
    if ((count || 0) >= concurrencyLimit) {
      return { allowed: false, reason: `Concurrency limit ${concurrencyLimit} reached for '${concurrencyKey}'` };
    }
  }
  // Per-step rate limit (max N executions per window)
  const rateLimit = config.rate_limit as number;
  const rateWindowSec = (config.rate_window_seconds as number) || 60;
  if (rateLimit && rateLimit > 0) {
    const windowStart = new Date(Date.now() - rateWindowSec * 1000).toISOString();
    const { count } = await supabase.from("workflow_step_runs")
      .select("id", { count: "exact", head: true })
      .eq("step_key", step.step_key)
      .in("status", ["success", "running"])
      .gte("started_at", windowStart);
    if ((count || 0) >= rateLimit) {
      return { allowed: false, reason: `Rate limit ${rateLimit}/${rateWindowSec}s reached for '${step.step_key}'` };
    }
  }
  return { allowed: true };
}
// ============================================================================
// INJECTED EXECUTORS (set from index.ts to avoid circular deps)
// ============================================================================
// Executes a named tool with already-resolved args; injected at startup so
// this module does not import the tool registry directly.
type ExecuteToolFn = (
  supabase: SupabaseClient,
  toolName: string,
  args: Record<string, unknown>,
  storeId?: string,
  traceId?: string,
) => Promise<{ success: boolean; data?: unknown; error?: string }>;
// Pushes one streamed token to clients subscribed to (runId, stepKey) — used
// by executeAgentStep for SSE streaming.
type TokenBroadcastFn = (runId: string, stepKey: string, token: string) => void;
// Runs an agent conversation to completion; onToken receives partial output.
type RunAgentQueryFn = (
  supabase: SupabaseClient,
  agentId: string,
  prompt: string,
  storeId: string,
  maxTurns?: number,
  onToken?: (token: string) => void,
  traceId?: string,
) => Promise<{ success: boolean; response?: string; error?: string }>;
// Injected singletons — null until the corresponding setter is called; the
// step executors guard on null and fail with "not initialized" errors.
let _executeTool: ExecuteToolFn | null = null;
let _runAgentQuery: RunAgentQueryFn | null = null;
let _broadcastToken: TokenBroadcastFn | null = null;
/** Install the tool executor (called from index.ts at startup). */
export function setToolExecutor(fn: ExecuteToolFn): void { _executeTool = fn; }
/** Install the agent executor (called from index.ts at startup). */
export function setAgentExecutor(fn: RunAgentQueryFn): void { _runAgentQuery = fn; }
/** Install the SSE token broadcaster (called from index.ts at startup). */
export function setTokenBroadcaster(fn: TokenBroadcastFn): void { _broadcastToken = fn; }
// ============================================================================
// STEP EXECUTORS
// ============================================================================
/**
 * Execute a "tool" step: resolve templated args, consult the tool's circuit
 * breaker (when a tool_id is configured), invoke the injected tool executor,
 * and record the outcome back into the breaker.
 */
async function executeToolStep(
  supabase: SupabaseClient, config: Record<string, unknown>,
  ctx: TemplateContext, storeId: string, traceId?: string,
): Promise<StepResult> {
  // The tool executor is injected from index.ts; bail out cleanly if missing.
  if (!_executeTool) return { success: false, error: "Tool executor not initialized" };
  const toolName = config.tool_name as string;
  if (!toolName) return { success: false, error: "No tool_name in step config" };
  // Resolve {{...}} templates in the configured args against the run context.
  const template = (config.args_template || config.args || {}) as Record<string, unknown>;
  const args = resolveTemplate(template, ctx) as Record<string, unknown>;
  const toolId = config.tool_id as string | undefined;
  if (toolId) {
    const breaker = await checkToolCircuitBreaker(supabase, toolId);
    if (!breaker.allowed) return { success: false, error: breaker.reason };
  }
  const result = await _executeTool(supabase, toolName, args, storeId, traceId);
  if (toolId) await updateToolCircuitBreaker(supabase, toolId, result.success);
  if (result.success) return { success: true, output: result.data };
  return { success: false, error: result.error };
}
/**
 * Execute a "condition" step: evaluate a boolean expression against the run
 * context and pick the on_true/on_false branch.
 *
 * Returns `branch` set to the chosen next step_key (or undefined when the
 * matching branch is not configured).
 */
function executeConditionStep(config: Record<string, unknown>, ctx: TemplateContext): StepResult {
  const expression = config.expression as string;
  if (!expression) return { success: false, error: "No expression in condition step" };
  if (!config.on_true && !config.on_false) {
    // FIX: report validation failures via `error` like every other executor —
    // the original buried the message inside `output` with no `error` field,
    // leaving callers with a failed step and no failure reason.
    return { success: false, error: "Condition step must have at least on_true or on_false defined" };
  }
  const result = evaluateCondition(expression, ctx);
  const branch = result ? (config.on_true as string || undefined) : (config.on_false as string || undefined);
  return { success: true, output: { condition_result: result, branch }, branch };
}
/**
 * Execute a "transform" step: map context values through the configured
 * template object and return the resolved result as the step output.
 */
function executeTransformStep(config: Record<string, unknown>, ctx: TemplateContext): StepResult {
  const mapping = config.mapping as Record<string, unknown>;
  if (!mapping) {
    return { success: false, error: "No mapping in transform step" };
  }
  const output = resolveTemplate(mapping, ctx);
  return { success: true, output };
}
/**
 * Execute an "agent" step: resolve the prompt template, append [SYSTEM] tool
 * gating directives (allowed / blocked / approval-required tools), and run the
 * injected agent executor, streaming tokens to SSE subscribers when available.
 */
async function executeAgentStep(
  config: Record<string, unknown>, ctx: TemplateContext, storeId: string,
  supabase: SupabaseClient, step?: StepClaim, traceId?: string,
): Promise<StepResult> {
  if (!_runAgentQuery) return { success: false, error: "Agent executor not initialized" };
  const agentId = config.agent_id as string;
  if (!agentId) return { success: false, error: "No agent_id in agent step config" };
  const prompt = resolveTemplate((config.prompt_template || config.prompt || "") as string, ctx) as string;
  if (!prompt) return { success: false, error: "No prompt resolved for agent step" };
  // AI tool gating — collect [SYSTEM] directives to append after the prompt.
  const segments: string[] = [prompt];
  const allowedTools = config.allowed_tools as string[] | undefined;
  if (allowedTools?.length) {
    segments.push(`\n\n[SYSTEM: You may ONLY use these tools: ${allowedTools.join(", ")}. Refuse any other tool calls.]`);
  }
  const blockedTools = config.blocked_tools as string[] | undefined;
  if (blockedTools?.length) {
    segments.push(`\n\n[SYSTEM: You must NEVER use these tools: ${blockedTools.join(", ")}. Use alternatives instead.]`);
  }
  const requireApprovalTools = config.require_approval_tools as string[] | undefined;
  if (requireApprovalTools?.length && step) {
    // Tools already approved by a prior approval step arrive via step.input.
    const approvedTools = (step.input as any)?.approved_tools as string[] | undefined;
    const pendingTools = requireApprovalTools.filter(t => !approvedTools?.includes(t));
    if (pendingTools.length > 0) {
      segments.push(`\n\n[SYSTEM: The following tools require human approval before use: ${pendingTools.join(", ")}. Do NOT call them — describe what you would do and why, then stop.]`);
    }
  }
  const gatedPrompt = segments.join("");
  const maxTurns = (config.max_turns as number) || 5;
  // Stream tokens to connected SSE clients when a broadcaster is wired up.
  const onToken = step && _broadcastToken
    ? (token: string) => _broadcastToken!(step.run_id, step.step_key, token)
    : undefined;
  const result = await _runAgentQuery(supabase, agentId, gatedPrompt, storeId, maxTurns, onToken, traceId);
  if (result.success) return { success: true, output: { response: result.response } };
  return { success: false, error: result.error };
}
// H4 FIX: Block internal/private IPs to prevent SSRF
/**
 * Returns true when a URL must NOT be fetched: non-HTTP(S) schemes, loopback,
 * RFC 1918 private IPv4, link-local/metadata ranges, private IPv6, internal
 * TLDs, and malformed URLs.
 *
 * The WHATWG URL parser normalizes decimal/octal/hex IPv4 forms (e.g.
 * "http://2130706433" -> "127.0.0.1"), so those bypasses are covered too.
 *
 * FIX: the original `/^fd/` test ran against ALL hostnames, blocking
 * legitimate domains that merely start with "fd" (e.g. fdic.gov); IPv6 checks
 * now apply only to literals (hostnames containing ":"). Also extends
 * coverage to all of fc00::/7 (fc00–fdff) and all of 127.0.0.0/8.
 */
function isBlockedUrl(urlStr: string): boolean {
  try {
    const u = new URL(urlStr);
    // Only plain HTTP(S) is allowed — file:, ftp:, gopher:, etc. rejected.
    if (u.protocol !== "http:" && u.protocol !== "https:") return true;
    const host = u.hostname.toLowerCase();
    // Localhost variants
    if (host === "localhost" || host === "127.0.0.1" || host === "::1" || host === "[::1]" || host === "0.0.0.0") return true;
    // Loopback 127.0.0.0/8
    if (/^127\./.test(host)) return true;
    // Private IPv4 ranges (RFC 1918)
    if (/^10\./.test(host)) return true;
    if (/^172\.(1[6-9]|2\d|3[01])\./.test(host)) return true;
    if (/^192\.168\./.test(host)) return true;
    // Link-local / cloud metadata (169.254.0.0/16 incl. 169.254.169.254)
    if (/^169\.254\./.test(host)) return true;
    // IPv6 literals only — hostname contains ":" (Node keeps the brackets).
    const bare = host.replace(/^\[/, "").replace(/\]$/, "");
    if (bare.includes(":")) {
      if (/^fe80:/i.test(bare)) return true;                 // link-local
      if (/^f[cd][0-9a-f]{0,2}:/i.test(bare)) return true;   // unique-local fc00::/7
    }
    // Internal TLDs
    if (host.endsWith(".internal") || host.endsWith(".local") || host.endsWith(".fly.dev")) return true;
    return false;
  } catch {
    return true; // Block malformed URLs
  }
}
/**
 * Execute a "webhook_out" step: POST/GET a templated payload to an external
 * URL with optional templated headers and an optional HMAC body signature.
 *
 * Safety: the resolved URL is screened by isBlockedUrl, and redirects are
 * refused so an allowed external URL cannot bounce the request to an internal
 * address (SSRF bypass). Requests time out after 30s.
 */
async function executeWebhookOutStep(
  config: Record<string, unknown>, ctx: TemplateContext,
): Promise<StepResult> {
  const url = resolveTemplate(config.url as string, ctx) as string;
  if (!url) return { success: false, error: "No URL in webhook_out step" };
  if (isBlockedUrl(url)) return { success: false, error: "URL targets a blocked internal/private address" };
  const method = ((config.method as string) || "POST").toUpperCase();
  // Resolve templated header values against the run context.
  const headers: Record<string, string> = {};
  if (config.headers && typeof config.headers === "object") {
    for (const [k, v] of Object.entries(config.headers as Record<string, string>)) {
      headers[k] = resolveTemplate(v, ctx) as string;
    }
  }
  let body: string | undefined;
  if (method !== "GET" && method !== "HEAD") {
    const bodyTemplate = config.body_template || {};
    body = JSON.stringify(resolveTemplate(bodyTemplate, ctx));
    if (!headers["Content-Type"]) headers["Content-Type"] = "application/json";
  }
  // Optional HMAC signature over the exact serialized body.
  if (config.hmac_secret && body) {
    const hmac = createHmac("sha256", config.hmac_secret as string).update(body).digest("hex");
    headers["X-Webhook-Signature"] = `sha256=${hmac}`;
  }
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), 30_000);
  try {
    // FIX: redirect "error" — following redirects would let an approved
    // external URL redirect to an internal host, bypassing isBlockedUrl.
    const resp = await fetch(url, { method, headers, body, signal: controller.signal, redirect: "error" });
    const ct = resp.headers.get("content-type") || "";
    // FIX: resp.json() can reject on a malformed body even with a JSON
    // content-type; treat that as a null payload rather than throwing.
    const data = ct.includes("json") ? await resp.json().catch(() => null) : await resp.text();
    if (!resp.ok) {
      // FIX: stringify objects for the error message (String(obj) yields
      // the useless "[object Object]").
      const detail = typeof data === "string" ? data : JSON.stringify(data);
      return { success: false, error: `HTTP ${resp.status}: ${(detail ?? "").substring(0, 500)}` };
    }
    return { success: true, output: { status: resp.status, data } };
  } catch (err: any) {
    if (err.name === "AbortError") return { success: false, error: "Webhook request timed out" };
    return { success: false, error: sanitizeError(err) };
  } finally {
    clearTimeout(timer); // FIX: original leaked the timer when fetch threw
  }
}
/** No-op step: a placeholder/join point that always succeeds immediately. */
function executeNoopStep(): StepResult {
  const output = { noop: true };
  return { success: true, output };
}
// ============================================================================
// PHASE 2: APPROVAL STEP EXECUTOR
// ============================================================================
/**
 * Approval step — two-pass human-in-the-loop gate.
 *
 * Pass 1 (no approval response yet): inserts a workflow_approval_requests row,
 * generates optional HMAC-signed guest URLs for each response option, marks
 * the step run "waiting", and returns the sentinel "waiting" so the engine
 * parks the run until a response (or timeout) arrives.
 *
 * Pass 2 (step resumed with `input.approval_status` populated): returns
 * success with `branch` set from config.on_approve / config.on_reject.
 *
 * NOTE(review): both supabase writes discard their error results — a failed
 * insert would leave the step waiting with no approval request to answer.
 */
async function executeApprovalStep(
  supabase: SupabaseClient, step: StepClaim, ctx: TemplateContext,
): Promise<StepResult | "waiting"> {
  const config = step.step_config;
  // Second pass — step was resumed after approval response
  if (step.input && typeof step.input === "object" && (step.input as any).approval_status) {
    const approvalData = step.input as { approval_status: string; approval_data?: unknown; responded_by?: string };
    // Accept both "approved" and "approve" spellings from responders
    const isApproved = approvalData.approval_status === "approved" || approvalData.approval_status === "approve";
    return {
      success: true,
      output: {
        approved: isApproved,
        status: approvalData.approval_status,
        response_data: approvalData.approval_data,
        responded_by: approvalData.responded_by,
      },
      branch: isApproved ? (config.on_approve as string) : (config.on_reject as string),
    };
  }
  // First pass — create approval request and wait
  const title = resolveTemplate((config.title || "Approval Required") as string, ctx) as string;
  const description = config.description ? resolveTemplate(config.description as string, ctx) as string : null;
  const prompt = config.prompt ? resolveTemplate(config.prompt as string, ctx) as string : null;
  const options = config.options || ["approve", "reject"];
  const timeoutSeconds = (config.timeout_seconds as number) || 86400; // default 24h
  const timeoutAction = (config.timeout_action as string) || "fail";
  const channels = config.notification_channels || ["push"];
  const expiresAt = new Date(Date.now() + timeoutSeconds * 1000).toISOString();
  await supabase.from("workflow_approval_requests").insert({
    store_id: step.store_id,
    run_id: step.run_id,
    step_run_id: step.step_run_id,
    workflow_id: step.workflow_id,
    title,
    description,
    prompt,
    options,
    form_schema: config.form_schema || null,
    assigned_to: config.assigned_to || null,
    assigned_role: config.assigned_role || null,
    expires_at: expiresAt,
    timeout_action: timeoutAction,
    notification_channels: channels,
  });
  // Generate guest approval URLs (signed, no auth required) — only if signing secret is configured
  const guestUrls: Record<string, string> = {};
  const optionsList = Array.isArray(options) ? options as string[] : ["approve", "reject"];
  for (const opt of optionsList) {
    const url = generateGuestApprovalUrl(step.step_run_id, opt, expiresAt);
    if (url) guestUrls[opt] = url;
  }
  // Park the step; this output payload is what UIs render while pending.
  await supabase.from("workflow_step_runs").update({
    status: "waiting",
    output: { waiting_for: "approval", title, expires_at: expiresAt, guest_urls: guestUrls },
  }).eq("id", step.step_run_id);
  return "waiting";
}
// ============================================================================
// PHASE 7: ENHANCED CODE EXECUTION
// ============================================================================
// Re-export pool management for index.ts to initialize on startup
export { initWorkerPool, getPoolStats, shutdownPool };
// ============================================================================
// CRON EXPRESSION PARSER — 5-field (min hour dom mon dow)
// No external dependencies. Supports: *, */N, N-M, N,M, N
// ============================================================================
/**
 * Expand one cron field into the sorted list of matching values.
 *
 * Supported (comma-separated combinations allowed): "*", single numbers,
 * ranges "N-M", and step forms "N/S" / "N-M/S" (also with "*" as the range).
 * Invalid parts are skipped rather than failing the whole field.
 *
 * FIX: range bounds are now clamped to [min, max]. Previously "55-70" in the
 * minutes field produced impossible values (60-70) that could never match,
 * and NaN endpoints drove silent empty loops; both now behave predictably.
 */
function parseCronField(field: string, min: number, max: number): number[] {
  const values: Set<number> = new Set();
  const clamp = (n: number) => Math.min(Math.max(n, min), max);
  for (const part of field.split(",")) {
    const trimmed = part.trim();
    if (trimmed === "*") {
      for (let i = min; i <= max; i++) values.add(i);
    } else if (trimmed.includes("/")) {
      // Step form: range "/" step, where range is "*", "N", or "N-M"
      const [range, stepStr] = trimmed.split("/");
      const step = parseInt(stepStr, 10);
      if (isNaN(step) || step <= 0) continue;
      let start = min, end = max;
      if (range !== "*") {
        if (range.includes("-")) {
          [start, end] = range.split("-").map(Number);
        } else {
          start = parseInt(range, 10);
        }
      }
      if (isNaN(start) || isNaN(end)) continue; // unparseable range — skip part
      for (let i = clamp(start); i <= clamp(end); i += step) values.add(i);
    } else if (trimmed.includes("-")) {
      const [s, e] = trimmed.split("-").map(Number);
      if (isNaN(s) || isNaN(e)) continue; // unparseable range — skip part
      for (let i = clamp(s); i <= clamp(e); i++) values.add(i);
    } else {
      const n = parseInt(trimmed, 10);
      if (!isNaN(n) && n >= min && n <= max) values.add(n);
    }
  }
  return [...values].sort((a, b) => a - b);
}
/**
 * Compute the next occurrence of a 5-field cron expression after `after`.
 * Returns null if the expression is invalid or no match exists within 366 days.
 *
 * Semantics notes:
 *  - All comparisons use the server's LOCAL timezone (Date getters); the
 *    workflow's `timezone` column is not consulted here.
 *  - When both day-of-month and day-of-week are restricted they are ANDed;
 *    standard cron ORs them. Existing schedules may rely on this, so the
 *    deviation is documented rather than changed.
 */
export function getNextCronTime(expression: string, after: Date = new Date()): Date | null {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== 5) return null;
  const minutes = parseCronField(parts[0], 0, 59);
  const hours = parseCronField(parts[1], 0, 23);
  const doms = parseCronField(parts[2], 1, 31);
  const months = parseCronField(parts[3], 1, 12);
  const dows = parseCronField(parts[4], 0, 6); // 0=Sunday
  if (!minutes.length || !hours.length || !doms.length || !months.length || !dows.length) return null;
  // Start the scan one minute after `after`, truncated to whole minutes
  const candidate = new Date(after);
  candidate.setSeconds(0, 0);
  candidate.setMinutes(candidate.getMinutes() + 1);
  const maxDate = new Date(after);
  maxDate.setDate(maxDate.getDate() + 366);
  // Walk forward, skipping whole months/days/hours at a time on mismatch
  while (candidate < maxDate) {
    if (!months.includes(candidate.getMonth() + 1)) {
      candidate.setMonth(candidate.getMonth() + 1, 1);
      candidate.setHours(0, 0, 0, 0);
      continue;
    }
    if (!doms.includes(candidate.getDate()) || !dows.includes(candidate.getDay())) {
      candidate.setDate(candidate.getDate() + 1);
      candidate.setHours(0, 0, 0, 0);
      continue;
    }
    if (!hours.includes(candidate.getHours())) {
      candidate.setHours(candidate.getHours() + 1, 0, 0, 0);
      continue;
    }
    if (!minutes.includes(candidate.getMinutes())) {
      candidate.setMinutes(candidate.getMinutes() + 1, 0, 0);
      continue;
    }
    return candidate;
  }
  return null;
}
// ============================================================================
// SCHEDULE TRIGGER PROCESSING — fires due cron workflows
// ============================================================================
/**
 * Poll-and-fire for schedule-triggered workflows.
 *
 * Picks up to 10 workflows whose next_run_at is due, starts a run for each
 * via the start_workflow_run RPC, executes the run's step chain inline, then
 * reschedules: recurring workflows get next_run_at recomputed from their cron
 * expression; one-time workflows (no cron_expression) are deactivated.
 *
 * @returns the number of runs actually started this tick.
 *
 * NOTE(review): when the RPC fails, next_run_at is still advanced (deliberate
 * — prevents a broken workflow from retrying every tick).
 * NOTE(review): `timezone` is selected but never used; getNextCronTime works
 * in server-local time — confirm whether per-workflow timezones are intended.
 */
export async function processScheduleTriggers(supabase: SupabaseClient): Promise<number> {
  // Find workflows that are past due — supports both cron (recurring) and one-time (run_at)
  const { data: dueWorkflows } = await supabase.from("workflows")
    .select("id, store_id, cron_expression, timezone")
    .not("next_run_at", "is", null)
    .lte("next_run_at", new Date().toISOString())
    .eq("is_active", true)
    .eq("status", "active")
    .limit(10);
  if (!dueWorkflows?.length) return 0;
  let fired = 0;
  for (const wf of dueWorkflows) {
    try {
      const isOneTime = !wf.cron_expression; // one-time schedules have no cron expression
      // Start the run
      const { data: result, error } = await supabase.rpc("start_workflow_run", {
        p_workflow_id: wf.id,
        p_store_id: wf.store_id,
        p_trigger_type: "schedule",
        p_trigger_payload: {
          cron: wf.cron_expression || null,
          one_time: isOneTime,
          scheduled_at: new Date().toISOString(),
        },
        p_idempotency_key: null,
      });
      if (error || !result?.success) {
        console.error(`[schedule] Failed to start workflow ${wf.id}:`, error?.message || result?.error);
        // Still update next_run_at to prevent infinite retries
      } else {
        fired++;
        // Generate trace_id for the new run
        const traceId = randomUUID();
        await supabase.from("workflow_runs").update({ trace_id: traceId }).eq("id", result.run_id);
        // Inline execution — drive the step chain now instead of waiting for a worker poll
        try { await executeInlineChain(supabase, result.run_id); } catch (err) { console.error("[workflow] Inline chain failed for run", result.run_id, ":", (err as Error).message); }
      }
      if (isOneTime) {
        // One-time schedule: clear next_run_at and deactivate
        await supabase.from("workflows").update({
          last_scheduled_at: new Date().toISOString(),
          next_run_at: null,
          is_active: false,
          status: "paused",
        }).eq("id", wf.id);
      } else {
        // Recurring cron: compute next run time
        const nextRun = getNextCronTime(wf.cron_expression, new Date());
        await supabase.from("workflows").update({
          last_scheduled_at: new Date().toISOString(),
          next_run_at: nextRun?.toISOString() || null,
        }).eq("id", wf.id);
      }
    } catch (err) {
      console.error(`[schedule] Error processing workflow ${wf.id}:`, sanitizeError(err));
    }
  }
  return fired;
}
// ============================================================================
// WORKFLOW TIMEOUT ENFORCEMENT — cancel overtime runs
// ============================================================================
/**
 * Cancel running workflow runs that exceeded their workflow's
 * max_run_duration_seconds: mark them timed_out and archive them to the DLQ.
 *
 * @returns the number of runs cancelled this sweep.
 *
 * NOTE(review): fetches at most 50 running runs per call with no ordering, so
 * under heavy load some overtime runs may only be caught by a later sweep.
 */
export async function enforceWorkflowTimeouts(supabase: SupabaseClient): Promise<number> {
  // Find running workflows that exceeded their duration limit
  const { data: timedOut } = await supabase.from("workflow_runs")
    .select("id, workflow_id, store_id, started_at, workflows!inner(max_run_duration_seconds, name)")
    .eq("status", "running")
    .not("started_at", "is", null)
    .limit(50);
  if (!timedOut?.length) return 0;
  let count = 0;
  const now = Date.now();
  for (const run of timedOut) {
    const wf = (run as any).workflows; // joined parent workflow row
    const maxDuration = wf?.max_run_duration_seconds;
    if (!maxDuration || maxDuration <= 0) continue; // no limit configured for this workflow
    const elapsed = now - new Date(run.started_at).getTime();
    if (elapsed < maxDuration * 1000) continue; // still within budget
    // This run has timed out
    await completeWorkflowRun(
      supabase, run.id, run.workflow_id, run.store_id,
      "timed_out",
      `Workflow exceeded max duration of ${maxDuration}s (ran for ${Math.round(elapsed / 1000)}s)`,
    );
    // Archive to DLQ for later investigation
    await archiveToDlq(supabase, run.id, run.workflow_id, run.store_id, wf?.name);
    count++;
    console.log(`[timeout] Workflow run ${run.id} timed out after ${Math.round(elapsed / 1000)}s`);
  }
  return count;
}
// ============================================================================
// EVENT TRIGGER PROCESSING — match inbound events to workflow subscriptions
// ============================================================================
/**
 * Drain pending automation_events: for each event, find active subscriptions
 * matching (store_id, event_type), optionally evaluate a per-subscription
 * filter expression against the payload, and start a workflow run for every
 * match (deduplicated via an idempotency key). Events end as "processed" or
 * "failed"; events with no subscribers are marked processed immediately.
 *
 * @returns the number of events fully handled this batch (max 20).
 *
 * NOTE(review): the claim below is select-then-update, NOT atomic — two
 * concurrent pollers could claim the same batch; the per-subscription
 * idempotency key (event:<id>:<workflow_id>) deduplicates the resulting runs.
 */
export async function processEventTriggers(supabase: SupabaseClient): Promise<number> {
  // Claim a batch of pending events (select, then mark as 'processing' below)
  const { data: events } = await supabase.from("automation_events")
    .select("id, store_id, event_type, event_payload, source")
    .eq("status", "pending")
    .order("created_at", { ascending: true })
    .limit(20);
  if (!events?.length) return 0;
  // Mark claimed events as processing
  const eventIds = events.map(e => e.id);
  await supabase.from("automation_events")
    .update({ status: "processing" })
    .in("id", eventIds);
  let processed = 0;
  for (const event of events) {
    try {
      // Find matching active subscriptions for this event type + store
      const { data: subs } = await supabase.from("workflow_event_subscriptions")
        .select("id, workflow_id, filter_expression")
        .eq("store_id", event.store_id)
        .eq("event_type", event.event_type)
        .eq("is_active", true);
      if (!subs?.length) {
        // No subscribers — mark processed and move on
        await supabase.from("automation_events")
          .update({ status: "processed", processed_at: new Date().toISOString() })
          .eq("id", event.id);
        processed++;
        continue;
      }
      for (const sub of subs) {
        // Optional filter expression evaluated against the event payload
        if (sub.filter_expression) {
          try {
            const ctx: TemplateContext = {
              trigger: event.event_payload || {},
              steps: {},
            };
            const pass = evaluateCondition(sub.filter_expression, ctx);
            if (!pass) continue; // Filter didn't match — skip this subscription
          } catch {
            // Filter eval error — skip rather than block the whole event
            continue;
          }
        }
        // Start a workflow run for each matching subscription
        const idempotencyKey = `event:${event.id}:${sub.workflow_id}`;
        const { data: result, error: startErr } = await supabase.rpc("start_workflow_run", {
          p_workflow_id: sub.workflow_id,
          p_store_id: event.store_id,
          p_trigger_type: "event",
          p_trigger_payload: {
            ...(event.event_payload || {}),
            _event_id: event.id,
            _event_type: event.event_type,
            _event_source: event.source,
          },
          p_idempotency_key: idempotencyKey,
        });
        if (startErr || !result?.success) {
          console.error(`[event-trigger] Failed to start workflow ${sub.workflow_id} for event ${event.id}:`, startErr?.message || result?.error);
          continue;
        }
        // Assign trace ID and run inline chain for immediate execution
        // (skipped when the idempotency key deduplicated to an existing run)
        if (result.run_id && !result.deduplicated) {
          const traceId = randomUUID();
          await supabase.from("workflow_runs").update({ trace_id: traceId }).eq("id", result.run_id);
          try { await executeInlineChain(supabase, result.run_id); } catch (err) { console.error("[workflow] Inline chain failed for event run", result.run_id, ":", (err as Error).message); }
        }
      }
      // Mark event as processed
      await supabase.from("automation_events")
        .update({ status: "processed", processed_at: new Date().toISOString() })
        .eq("id", event.id);
      processed++;
    } catch (err) {
      // Mark event as failed with the sanitized error for investigation
      await supabase.from("automation_events")
        .update({
          status: "failed",
          processed_at: new Date().toISOString(),
          error_message: sanitizeError(err),
        })
        .eq("id", event.id);
      console.error(`[event-trigger] Error processing event ${event.id}:`, sanitizeError(err));
    }
  }
  return processed;
}
// ============================================================================
// DEAD LETTER QUEUE — archive failed runs for investigation
// ============================================================================
/**
 * Snapshot a failed/timed-out run into the dead-letter queue (workflow_dlq)
 * for later investigation. Best-effort: missing runs and insert failures are
 * tolerated, but failures are now logged instead of silently discarded.
 */
async function archiveToDlq(
  supabase: SupabaseClient, runId: string, workflowId: string, storeId: string, workflowName?: string,
): Promise<void> {
  const { data: run } = await supabase.from("workflow_runs")
    .select("error_message, error_step_key, trigger_type, trigger_payload, step_outputs, duration_ms")
    .eq("id", runId).single();
  if (!run) return; // run vanished — nothing to archive
  // FIX: supabase-js reports failures via `error`, not by rejecting — the
  // original `.then(() => {})` silently discarded a failed DLQ insert.
  const { error } = await supabase.from("workflow_dlq").insert({
    store_id: storeId,
    run_id: runId,
    workflow_id: workflowId,
    workflow_name: workflowName || null,
    error_message: run.error_message,
    error_step_key: run.error_step_key,
    trigger_type: run.trigger_type,
    trigger_payload: run.trigger_payload || {},
    step_outputs: run.step_outputs || {},
    run_duration_ms: run.duration_ms,
  });
  if (error) {
    console.error(`[dlq] Failed to archive run ${runId}:`, error.message);
  }
}
/**
* Process-isolated JS code execution via persistent worker pool.
* Falls back to one-shot fork if pool is unavailable.
* A crash, OOM, or infinite loop in user code cannot take down the server.
*/
/**
 * Process-isolated JS code execution via persistent worker pool.
 * Python is delegated to its own subprocess path; if the pool is unavailable
 * or crashes, execution degrades to the in-process vm sandbox so a user-code
 * crash, OOM, or infinite loop cannot take down the server.
 */
async function executeCodeStepIsolated(config: Record<string, unknown>, ctx: TemplateContext): Promise<StepResult> {
  const code = config.code as string;
  if (!code) return { success: false, error: "No code in code step config" };
  if (((config.language as string) || "javascript") === "python") {
    return executePythonCode(code, ctx);
  }
  const timeoutMs = (config.timeout_ms as number) || CODE_TIMEOUT_MS;
  try {
    return await executeWithPool({
      code,
      context: { steps: ctx.steps, trigger: ctx.trigger, input: ctx.input },
      timeoutMs,
    });
  } catch {
    // Pool unavailable — ultimate fallback to in-process execution.
    return executeCodeStepInProcess(config, ctx);
  }
}
/**
 * In-process fallback (used if fork fails): runs user JS inside a node:vm
 * context with a static source denylist, a frozen whitelist of globals, and a
 * hard execution timeout. The user script communicates its result by assigning
 * to `output` (falling back to the script's completion value).
 */
function executeCodeStepInProcess(config: Record<string, unknown>, ctx: TemplateContext): StepResult {
  const code = config.code as string;
  if (!code) return { success: false, error: "No code in code step config" };
  // Static denylist — reject source that references escape hatches out of the
  // vm sandbox (prototype walking, host globals, obfuscation via escapes).
  const dangerousPatterns = [
    /constructor\s*\[/i, /constructor\s*\(/i, /__proto__/i, /prototype\s*\[/i,
    /\bprocess\b/, /\brequire\b/, /\bimport\b/, /\bglobalThis\b/,
    /\bglobal\b/, /\bFunction\b/, /\beval\b/,
    /this\s*\[/i, // Dynamic property access: this["constructor"]
    /\]\s*\(/, // Bracket-based call: obj["eval"]()
    /\\u[0-9a-f]{4}/i, // Unicode escapes to hide keywords
    /\\x[0-9a-f]{2}/i, // Hex escapes
    /String\s*\.\s*fromCharCode/i, // String.fromCharCode to build "eval" etc.
    /Reflect/i, // Reflect API
    /Proxy/i, // Proxy API
    /Symbol/i, // Symbol API
    /with\s*\(/i, // with statement
    /import\s*\(/i, // Dynamic import
  ];
  const blocked = dangerousPatterns.find(p => p.test(code));
  if (blocked) return { success: false, error: `Code contains blocked pattern: ${blocked.source}` };
  const timeoutMs = (config.timeout_ms as number) || CODE_TIMEOUT_MS;
  const logs: string[] = [];
  // Deep-copy context values so user code cannot mutate engine state.
  const clone = <T>(v: T): T => JSON.parse(JSON.stringify(v));
  // Null-prototype sandbox: only the explicitly-whitelisted names exist.
  const sandbox = Object.create(null) as Record<string, unknown>;
  Object.assign(sandbox, {
    steps: clone(ctx.steps),
    trigger: clone(ctx.trigger),
    input: ctx.input != null ? clone(ctx.input) : undefined,
    output: undefined, // user code assigns its result here
    console: Object.freeze({ log: (...args: unknown[]) => logs.push(args.map(String).join(" ")) }),
    JSON: Object.freeze({ parse: JSON.parse, stringify: JSON.stringify }),
    Math, Date, Array, Object, String, Number, Boolean,
    parseInt, parseFloat, isNaN, isFinite,
    encodeURIComponent, decodeURIComponent,
    URL, Buffer: Object.freeze({ from: Buffer.from, alloc: Buffer.alloc }),
    crypto: Object.freeze({ randomUUID }), fetch: globalThis.fetch,
    // Explicitly shadow every dangerous host global with undefined.
    process: undefined, require: undefined, global: undefined,
    globalThis: undefined, Function: undefined, eval: undefined,
    setTimeout: undefined, setInterval: undefined,
    Reflect: undefined, Proxy: undefined, Symbol: undefined,
    WeakRef: undefined, FinalizationRegistry: undefined,
    SharedArrayBuffer: undefined, Atomics: undefined,
  });
  try {
    const evaluated = runInNewContext(code, sandbox, { timeout: timeoutMs, filename: "workflow-code-step", breakOnSigint: true, microtaskMode: "afterEvaluate" });
    return { success: true, output: { result: sandbox.output ?? evaluated, logs } };
  } catch (err: any) {
    if (err.code === "ERR_SCRIPT_EXECUTION_TIMEOUT") return { success: false, error: `Code execution timed out after ${timeoutMs}ms` };
    return { success: false, error: `Code error: ${err.message}` };
  }
}
// P1 FIX: Python modules denied inside workflow code steps — each grants
// filesystem, network, process, or introspection capability that would
// break out of the sandbox. Checked via .includes() at screening time.
const BLOCKED_PYTHON_MODULES = [
  // filesystem / process access
  "os",
  "subprocess",
  "shutil",
  "sys",
  "pathlib",
  "glob",
  "tempfile",
  // network access
  "socket",
  "http",
  "urllib",
  "requests",
  "httpx",
  "aiohttp",
  // low-level / concurrency / signals
  "ctypes",
  "multiprocessing",
  "threading",
  "signal",
  // dynamic import & code compilation
  "importlib",
  "pkgutil",
  "code",
  "codeop",
  "compile",
  "compileall",
  // serialization that can execute code on load
  "pickle",
  "shelve",
  "marshal",
  // builtins / raw IO escape hatches
  "builtins",
  "__builtin__",
  "io",
  // misc network clients
  "webbrowser",
  "ftplib",
  "smtplib",
  "telnetlib",
];
/**
 * Execute an untrusted Python snippet in a restricted subprocess sandbox.
 *
 * Defense layers:
 *  1. Source screening — blocked imports and dangerous call patterns are
 *     rejected before anything touches disk.
 *  2. Runtime sandbox — the wrapper script strips dangerous builtins,
 *     exposes only "cleaned" stdlib modules (dunders removed), and runs the
 *     code via exec() inside a curated globals dict.
 *  3. Process limits — execFile enforces CODE_TIMEOUT_MS / CODE_OUTPUT_MAX
 *     with a minimal environment (fixed PATH, no HOME).
 *
 * The snippet returns a result by assigning to `output`; the wrapper prints
 * it on a line prefixed "__OUTPUT__", which is parsed back here. All other
 * stdout lines are returned as logs.
 *
 * @param code - Raw Python source from the workflow step config (untrusted).
 * @param ctx  - Template context; steps/trigger/input are JSON-serialized into the sandbox.
 * @returns StepResult with { result, logs, stderr } on success.
 */
async function executePythonCode(code: string, ctx: TemplateContext): Promise<StepResult> {
  // P1 FIX: Block dangerous imports at the source level before execution.
  // FIX: also anchor after ';' so `x = 1; import os` cannot slip past the screen
  // (previously only line starts were matched).
  const importPattern = /(?:^|[\n;])\s*(?:import|from)\s+(\w+)/g;
  let match: RegExpExecArray | null;
  while ((match = importPattern.exec(code)) !== null) {
    const mod = match[1];
    if (mod && BLOCKED_PYTHON_MODULES.includes(mod)) {
      return { success: false, error: `Blocked import: '${mod}' is not allowed in workflow code steps` };
    }
  }
  // Also block __import__, exec(), eval(), open(), compile() and introspection
  const dangerousCalls = [
    /__import__/, /\bexec\s*\(/, /\beval\s*\(/, /\bopen\s*\(/, /\bcompile\s*\(/,
    /__loader__/, /__spec__/, /__subclasses__/, /__bases__/, /__mro__/, /__class__/,
    /__getattribute__/, /__dict__/,
    /globals\s*\(\)/, /locals\s*\(\)/, /vars\s*\(\)/, /dir\s*\(\)/,
    /getattr\s*\(/, /setattr\s*\(/, /delattr\s*\(/, /hasattr\s*\(/,
    /type\s*\(/,
  ];
  for (const pattern of dangerousCalls) {
    if (pattern.test(code)) {
      return { success: false, error: `Blocked function call: '${pattern.source}' is not allowed in workflow code steps` };
    }
  }
  const tmpFile = join(tmpdir(), `wf-py-${randomUUID()}.py`);
  const ctxJson = JSON.stringify({ steps: ctx.steps, trigger: ctx.trigger, input: ctx.input });
  // P1 FIX: Sandboxed wrapper that restricts builtins, cleans module dunders, blocks chr/ord bypass
  const wrappedCode = `import json, sys, math, re, datetime, collections, itertools, functools, hashlib, hmac, base64, uuid, copy, decimal, fractions, statistics, string, textwrap, unicodedata, operator, enum, dataclasses, typing, numbers
def _clean_module(mod):
    d = {}
    for k in dir(mod):
        if not k.startswith('_'):
            d[k] = getattr(mod, k)
    return type('CleanModule', (), d)()
_blocked = {'open', 'exec', 'eval', 'compile', '__import__', 'exit', 'quit', 'breakpoint', 'input', 'help', 'memoryview', 'type', 'hasattr', 'getattr', 'setattr', 'delattr', 'chr', 'ord'}
_safe_builtins = {k: v for k, v in __builtins__.__dict__.items() if k not in _blocked and not k.startswith('_')} if hasattr(__builtins__, '__dict__') else {}
_safe_builtins.update({'True': True, 'False': False, 'None': None, 'print': print, 'len': len, 'range': range, 'int': int, 'float': float, 'str': str, 'bool': bool, 'list': list, 'dict': dict, 'tuple': tuple, 'set': set, 'frozenset': frozenset, 'sorted': sorted, 'reversed': reversed, 'enumerate': enumerate, 'zip': zip, 'map': map, 'filter': filter, 'any': any, 'all': all, 'sum': sum, 'min': min, 'max': max, 'abs': abs, 'round': round, 'isinstance': isinstance, 'issubclass': issubclass, 'repr': repr, 'hash': hash, 'id': id, 'hex': hex, 'oct': oct, 'bin': bin, 'format': format, 'bytes': bytes, 'bytearray': bytearray})
_ctx = json.loads(${JSON.stringify(ctxJson)})
_safe_sys = type('SafeSys', (), {'maxsize': sys.maxsize, 'version': sys.version, 'platform': sys.platform, 'float_info': sys.float_info, 'int_info': sys.int_info})()
_sandbox_globals = {"__builtins__": _safe_builtins, "json": _clean_module(json), "sys": _safe_sys, "math": _clean_module(math), "re": _clean_module(re), "datetime": _clean_module(datetime), "collections": _clean_module(collections), "itertools": _clean_module(itertools), "functools": _clean_module(functools), "hashlib": _clean_module(hashlib), "hmac": _clean_module(hmac), "base64": _clean_module(base64), "uuid": _clean_module(uuid), "copy": _clean_module(copy), "decimal": _clean_module(decimal), "fractions": _clean_module(fractions), "statistics": _clean_module(statistics), "string": _clean_module(string), "textwrap": _clean_module(textwrap), "unicodedata": _clean_module(unicodedata), "operator": _clean_module(operator), "enum": _clean_module(enum), "dataclasses": _clean_module(dataclasses), "typing": _clean_module(typing), "numbers": _clean_module(numbers), "steps": _ctx.get('steps', {}), "trigger": _ctx.get('trigger', {}), "input_data": _ctx.get('input', None), "output": None}
_user_code = ${JSON.stringify(code)}
exec(_user_code, _sandbox_globals)
if _sandbox_globals.get("output") is not None:
    print("__OUTPUT__" + json.dumps(_sandbox_globals["output"]))
`;
  try {
    await writeFile(tmpFile, wrappedCode, "utf8");
    const result = await new Promise<StepResult>((resolve) => {
      execFile("python3", [tmpFile], {
        timeout: CODE_TIMEOUT_MS,
        maxBuffer: CODE_OUTPUT_MAX,
        // P1 FIX: Restrict environment — no PATH manipulation, no HOME access
        env: { PATH: "/usr/bin:/usr/local/bin", PYTHONDONTWRITEBYTECODE: "1", PYTHONIOENCODING: "utf-8" },
      }, (err, stdout, stderr) => {
        if (err) {
          const e = err as NodeJS.ErrnoException & { killed?: boolean };
          // FIX: execFile also kills the child when maxBuffer is exceeded, which
          // sets err.killed — check the specific error code first so an output
          // overflow isn't misreported as a timeout.
          if (e.code === "ERR_CHILD_PROCESS_STDIO_MAXBUFFER") {
            resolve({ success: false, error: `Python output exceeded ${CODE_OUTPUT_MAX} bytes` });
          } else if (e.killed) {
            resolve({ success: false, error: `Python execution timed out after ${CODE_TIMEOUT_MS}ms` });
          } else {
            resolve({ success: false, error: `Python error: ${stderr || err.message}` });
          }
          return;
        }
        const lines = stdout.split("\n");
        const outputLine = lines.find(l => l.startsWith("__OUTPUT__"));
        let output: unknown = null;
        if (outputLine) {
          try { output = JSON.parse(outputLine.slice("__OUTPUT__".length)); } catch { /* code step output wasn't valid JSON — treat as raw text */ }
        }
        const logs = lines.filter(l => !l.startsWith("__OUTPUT__") && l.trim());
        resolve({ success: true, output: { result: output, logs, stderr: stderr || undefined } });
      });
    });
    return result;
  } finally {
    // Best-effort temp-file cleanup; failure to unlink is non-fatal.
    unlink(tmpFile).catch(() => {});
  }
}
// ============================================================================
// CIRCUIT BREAKER (per user_tool)
// ============================================================================
/**
 * Gate a tool invocation on its per-tool circuit breaker.
 *
 * Fails open: a missing row or a non-"open" breaker state allows the call.
 * When the breaker is open and its cooldown has elapsed, the breaker is
 * moved to "half_open" and one trial call is allowed through.
 *
 * @returns { allowed, reason? } — reason is set only when the call is blocked.
 */
async function checkToolCircuitBreaker(
  supabase: SupabaseClient, toolId: string,
): Promise<{ allowed: boolean; reason?: string }> {
  const { data: breaker } = await supabase.from("user_tools")
    .select("circuit_breaker_state, circuit_breaker_tripped_at, circuit_breaker_cooldown_seconds")
    .eq("id", toolId).single();
  if (!breaker) return { allowed: true };
  if (breaker.circuit_breaker_state !== "open") return { allowed: true };
  const cooldownEndsAt = new Date(breaker.circuit_breaker_tripped_at).getTime()
    + (breaker.circuit_breaker_cooldown_seconds || 300) * 1000;
  if (Date.now() < cooldownEndsAt) {
    return { allowed: false, reason: `Circuit breaker open (cooldown until ${new Date(cooldownEndsAt).toISOString()})` };
  }
  // Cooldown elapsed — transition to half-open and permit a probe call.
  await supabase.from("user_tools").update({ circuit_breaker_state: "half_open" }).eq("id", toolId);
  return { allowed: true };
}
/**
 * Record the outcome of a tool invocation against its circuit breaker.
 *
 * Success closes the breaker and zeroes the failure counter. A failure
 * increments the counter; once the configured threshold (default 5) is
 * reached the breaker opens, the trip time is stamped, and an audit event
 * is written.
 */
async function updateToolCircuitBreaker(
  supabase: SupabaseClient, toolId: string, success: boolean,
): Promise<void> {
  if (success) {
    await supabase.from("user_tools")
      .update({ circuit_breaker_state: "closed", circuit_breaker_failures: 0 })
      .eq("id", toolId);
    return;
  }
  const { data: breaker } = await supabase.from("user_tools")
    .select("circuit_breaker_failures, circuit_breaker_threshold")
    .eq("id", toolId).single();
  if (!breaker) return;
  const failures = (breaker.circuit_breaker_failures || 0) + 1;
  const threshold = breaker.circuit_breaker_threshold || 5;
  if (failures >= threshold) {
    // Threshold hit — trip the breaker and audit-log it.
    await supabase.from("user_tools").update({
      circuit_breaker_state: "open",
      circuit_breaker_failures: failures,
      circuit_breaker_tripped_at: new Date().toISOString(),
    }).eq("id", toolId);
    await supabase.from("audit_logs").insert({
      action: "workflow.circuit_breaker.tripped", severity: "warning",
      resource_type: "user_tool", resource_id: toolId, source: "workflow_engine",
      details: { failures: failures, threshold: breaker.circuit_breaker_threshold },
    }).then(() => {});
  } else {
    await supabase.from("user_tools").update({ circuit_breaker_failures: failures }).eq("id", toolId);
  }
}
/**
 * Track consecutive run failures for a workflow and trip its circuit
 * breaker once the failure threshold (default 5) is reached. Any success
 * resets the breaker to closed and clears the counter.
 */
async function handleWorkflowCircuitBreaker(
  supabase: SupabaseClient, workflowId: string, success: boolean,
): Promise<void> {
  if (success) {
    await supabase.from("workflows")
      .update({ circuit_breaker_state: "closed", circuit_breaker_failures: 0 })
      .eq("id", workflowId);
    return;
  }
  const { data: row } = await supabase.from("workflows")
    .select("circuit_breaker_failures, circuit_breaker_threshold")
    .eq("id", workflowId).single();
  if (!row) return;
  const failureCount = (row.circuit_breaker_failures || 0) + 1;
  const tripThreshold = row.circuit_breaker_threshold || 5;
  if (failureCount < tripThreshold) {
    // Below threshold — just bump the counter.
    await supabase.from("workflows")
      .update({ circuit_breaker_failures: failureCount })
      .eq("id", workflowId);
    return;
  }
  // Threshold reached — open the breaker and record an audit event.
  await supabase.from("workflows").update({
    circuit_breaker_state: "open", circuit_breaker_failures: failureCount,
    circuit_breaker_tripped_at: new Date().toISOString(),
  }).eq("id", workflowId);
  await supabase.from("audit_logs").insert({
    action: "workflow.circuit_breaker.tripped", severity: "warning",
    resource_type: "workflow", resource_id: workflowId, source: "workflow_engine",
    details: { failures: failureCount, threshold: row.circuit_breaker_threshold },
  }).then(() => {});
}
// ============================================================================
// CORE ENGINE
// ============================================================================
/**
 * Claim and execute a batch of pending workflow steps.
 *
 * Steps are claimed atomically via the claim_pending_steps RPC. Email tool
 * steps spawned by for_each are executed sequentially with a 550ms gap to
 * stay under Resend's 2 req/s rate limit; all other steps run in parallel.
 *
 * @param batchSize - Maximum number of steps to claim this pass (default 10).
 * @returns counts of steps processed and errors encountered in this batch.
 */
export async function processWorkflowSteps(
  supabase: SupabaseClient, batchSize: number = 10,
): Promise<{ processed: number; errors: number }> {
  const { data: claimData, error: claimError } = await supabase.rpc("claim_pending_steps", {
    batch_size: batchSize,
  });
  if (claimError) {
    console.error("[workflow] claim error:", claimError.message);
    return { processed: 0, errors: 1 };
  }
  const batch: StepClaim[] = Array.isArray(claimData) ? claimData : [];
  if (batch.length === 0) return { processed: 0, errors: 0 };
  console.log(`[workflow] Processing ${batch.length} steps`);
  let errorCount = 0;
  // Batch-fetch trace ids so every step's audit entries share its run's trace.
  const uniqueRunIds = [...new Set(batch.map(s => s.run_id))];
  const { data: traceRows } = await supabase.from("workflow_runs")
    .select("id, trace_id").in("id", uniqueRunIds);
  const traceByRun = new Map((traceRows || []).map(r => [r.id, r.trace_id as string | undefined]));
  // for_each email sends must be throttled to respect Resend's 2 req/s limit.
  const needsThrottle = (s: StepClaim) =>
    s.parent_step_run_id && s.step_type === "tool" &&
    String(s.step_config?.tool_name || "").includes("email");
  const throttledSteps: StepClaim[] = [];
  const parallelSteps: StepClaim[] = [];
  for (const s of batch) (needsThrottle(s) ? throttledSteps : parallelSteps).push(s);
  const runStep = async (s: StepClaim) => {
    try {
      await applyVersionOverrides(supabase, s);
      await executeAndAdvance(supabase, s, traceByRun.get(s.run_id));
    } catch (err) {
      errorCount++;
      console.error(`[workflow] Step ${s.step_key} error:`, sanitizeError(err));
      await supabase.from("workflow_step_runs").update({
        status: "failed", error_message: sanitizeError(err),
        completed_at: new Date().toISOString(), duration_ms: 0,
      }).eq("id", s.step_run_id);
    }
  };
  // Non-throttled steps run concurrently.
  await Promise.all(parallelSteps.map(runStep));
  // Throttled email steps run one at a time with a 550ms gap between sends.
  let firstThrottled = true;
  for (const s of throttledSteps) {
    if (!firstThrottled) await new Promise(r => setTimeout(r, 550));
    firstThrottled = false;
    await runStep(s);
  }
  return { processed: batch.length, errors: errorCount };
}
/**
 * Check waiting steps: sub_workflow children completed, parallel/for_each children done.
 * Called by the persistent worker loop alongside processWorkflowSteps.
 *
 * Runs three phases per tick:
 *  0. Expire stale pending approvals (best-effort RPC; tolerated if missing).
 *  1. Resolve "waiting" sub_workflow steps whose child run reached a terminal state.
 *  2. Resolve "waiting" parallel/for_each parents once every child step has finished.
 *
 * @returns the number of waiting steps resolved in this pass.
 */
export async function processWaitingSteps(supabase: SupabaseClient): Promise<number> {
  let resolved = 0;
  // 0. Expire pending approvals (Phase 2)
  try {
    await supabase.rpc("expire_pending_approvals");
  } catch (err) {
    // Non-fatal — RPC may not exist yet if migration not applied
    console.error("[workflow] expire_pending_approvals error:", sanitizeError(err));
  }
  // 1. Sub-workflow steps waiting for child runs to complete
  const { data: subWfSteps } = await supabase
    .from("workflow_step_runs")
    .select("id, run_id, step_key, child_run_id, step_type")
    .eq("status", "waiting")
    .eq("step_type", "sub_workflow")
    .not("child_run_id", "is", null)
    .limit(50); // bounded batch per tick; remainder is picked up next pass
  if (subWfSteps?.length) {
    const childRunIds = subWfSteps.map(s => s.child_run_id).filter(Boolean) as string[];
    // Only terminal child runs (success/failed) resolve the waiting parent step.
    const { data: childRuns } = await supabase
      .from("workflow_runs")
      .select("id, status, step_outputs, error_message")
      .in("id", childRunIds)
      .in("status", ["success", "failed"]);
    if (childRuns?.length) {
      const runMap = new Map(childRuns.map(r => [r.id, r]));
      for (const step of subWfSteps) {
        const childRun = runMap.get(step.child_run_id!);
        if (!childRun) continue; // child still running — leave the step waiting
        const success = childRun.status === "success";
        // Mirror the child's outcome onto the parent step before advancing.
        await supabase.from("workflow_step_runs").update({
          status: success ? "success" : "failed",
          output: childRun.step_outputs,
          error_message: success ? null : childRun.error_message,
          completed_at: new Date().toISOString(),
        }).eq("id", step.id);
        // Accumulate output + advance
        await accumulateAndAdvance(supabase, step.id, step.run_id, step.step_key,
          success, childRun.step_outputs, childRun.error_message);
        resolved++;
      }
    }
  }
  // 2. Parallel / for_each steps waiting for all children to complete
  const { data: waitingParents } = await supabase
    .from("workflow_step_runs")
    .select("id, run_id, step_key, step_type, output")
    .eq("status", "waiting")
    .in("step_type", ["parallel", "for_each"])
    .limit(50);
  if (waitingParents?.length) {
    for (const parent of waitingParents) {
      // Count children and completed children
      const { count: totalChildren } = await supabase
        .from("workflow_step_runs")
        .select("id", { count: "exact", head: true })
        .eq("parent_step_run_id", parent.id);
      // "done" includes skipped/cancelled so a cancelled child can't wedge the parent.
      const { count: doneChildren } = await supabase
        .from("workflow_step_runs")
        .select("id", { count: "exact", head: true })
        .eq("parent_step_run_id", parent.id)
        .in("status", ["success", "failed", "skipped", "cancelled"]);
      if (totalChildren && doneChildren && doneChildren >= totalChildren) {
        // All children done — check for failures
        const { data: failedKids } = await supabase
          .from("workflow_step_runs")
          .select("step_key, error_message")
          .eq("parent_step_run_id", parent.id)
          .eq("status", "failed")
          .limit(1);
        // Collect child outputs
        const { data: childOutputs } = await supabase
          .from("workflow_step_runs")
          .select("step_key, output, status")
          .eq("parent_step_run_id", parent.id)
          .order("created_at", { ascending: true });
        const outputs = (childOutputs || []).map(c => c.output);
        // Parent succeeds only if no child failed; first failure's message is surfaced.
        const allSuccess = !failedKids?.length;
        await supabase.from("workflow_step_runs").update({
          status: allSuccess ? "success" : "failed",
          output: { children: outputs, total: totalChildren, failed: failedKids?.length || 0 },
          error_message: allSuccess ? null : failedKids?.[0]?.error_message,
          completed_at: new Date().toISOString(),
        }).eq("id", parent.id);
        await accumulateAndAdvance(supabase, parent.id, parent.run_id, parent.step_key,
          allSuccess, { children: outputs }, allSuccess ? null : failedKids?.[0]?.error_message);
        resolved++;
      }
    }
  }
  return resolved;
}
/** Helper: after a waiting step resolves, accumulate output and advance the workflow. */
async function accumulateAndAdvance(
supabase: SupabaseClient, stepRunId: string, runId: string, stepKey: string,
success: boolean, output: unknown, errorMessage?: string | null,
): Promise<void> {
// Load run to get current step_outputs + workflow_id + on_success/on_failure
const { data: run } = await supabase.from("workflow_runs")
.select("workflow_id, step_outputs, store_id").eq("id", runId).single();
if (!run) return;
const { data: stepDef } = await supabase.from("workflow_step_runs")
.select("step_id, step_key").eq("id", stepRunId).single();
if (!stepDef) return;
// Phase 4: Try versioned step def first, fall back to live table
let wsDef: { on_success?: string | null; on_failure?: string | null; max_retries?: number } | null = null;
const versionedSteps = await loadVersionedSteps(supabase, runId);
if (versionedSteps) {
const vStep = versionedSteps.find((s: any) => s.step_key === stepKey);
if (vStep) wsDef = { on_success: vStep.on_success, on_failure: vStep.on_failure, max_retries: vStep.max_retries };
}
if (!wsDef) {
const { data } = await supabase.from("workflow_steps")
.select("on_success, on_failure, max_retries").eq("id", stepDef.step_id).single();
wsDef = data;
}
const stepOutput = { output, status: success ? "success" : "failed" };
const newOutputs = { ...(run.step_outputs || {}), [stepKey]: stepOutput };
await supabase.from("workflow_runs").update({ step_outputs: newOutputs }).eq("id", runId);
if (success) {
if (wsDef?.on_success) {
await createNextStepRunByKey(supabase, runId, run.workflow_id, wsDef.on_success);
} else {
await checkWorkflowCompletion(supabase, runId, run.workflow_id);
}
} else {
if (wsDef?.on_failure) {
await createNextStepRunByKey(supabase, runId, run.workflow_id, wsDef.on_failure);
} else {
await completeWorkflowRun(supabase, runId, run.workflow_id, run.store_id, "failed", errorMessage, stepKey);
}
}
}
/**
 * Execute one claimed step and drive its workflow forward.
 *
 * Lifecycle: journal start → cached-result short-circuit (on retries) →
 * flow-control gate → build template context → dispatch on step_type
 * (raced against a per-step timeout) → persist result → atomically merge
 * output into the run → checkpoint + audit → retry, branch, or advance.
 *
 * Several step types park the step and return early instead of producing a
 * result: delay (retrying), sub_workflow / parallel / for_each / waitpoint
 * (waiting), and approval while pending. A later worker pass resumes them.
 */
export async function executeAndAdvance(supabase: SupabaseClient, step: StepClaim, traceId?: string): Promise<void> {
  const startTime = Date.now();
  // Event journal — step started
  logWorkflowEvent(supabase, step.run_id, "step_started", {
    step_key: step.step_key, step_type: step.step_type, attempt: step.attempt_count,
  }, step.step_run_id);
  // Step result caching — skip successful steps on retry (idempotent re-execution)
  if (step.attempt_count > 1) {
    const { data: prevRun } = await supabase.from("workflow_step_runs")
      .select("status, output").eq("run_id", step.run_id).eq("step_key", step.step_key)
      .eq("status", "success").neq("id", step.step_run_id).limit(1);
    if (prevRun?.length) {
      console.log(`[workflow] Step ${step.step_key} already succeeded — using cached result`);
      await supabase.from("workflow_step_runs").update({
        status: "success", output: prevRun[0].output,
        completed_at: new Date().toISOString(), duration_ms: Date.now() - startTime,
      }).eq("id", step.step_run_id);
      logWorkflowEvent(supabase, step.run_id, "step_cached", { step_key: step.step_key }, step.step_run_id);
      // Advance directly — cached success never re-enters the execution path below.
      const nextStepKey = step.on_success;
      if (!nextStepKey) {
        await checkWorkflowCompletion(supabase, step.run_id, step.workflow_id);
      } else {
        await createNextStepRunByKey(supabase, step.run_id, step.workflow_id, nextStepKey);
      }
      return;
    }
  }
  // Flow control — check concurrency/rate limits before execution
  const flowCheck = await checkFlowControl(supabase, step);
  if (!flowCheck.allowed) {
    // Requeue step with short delay for flow control backoff
    await supabase.from("workflow_step_runs").update({
      status: "retrying",
      next_retry_at: new Date(Date.now() + 2000).toISOString(), // retry in 2s
      output: { flow_control: flowCheck.reason },
    }).eq("id", step.step_run_id);
    logWorkflowEvent(supabase, step.run_id, "step_throttled", { reason: flowCheck.reason }, step.step_run_id);
    return;
  }
  // Build template context (deep-copied step outputs + trigger payload + optional input)
  const ctx: TemplateContext = {
    steps: {},
    trigger: step.trigger_payload || {},
    input: step.input || undefined,
  };
  if (step.step_outputs && typeof step.step_outputs === "object") {
    for (const [key, val] of Object.entries(step.step_outputs)) {
      if (val && typeof val === "object") {
        ctx.steps[key] = val as { output?: unknown; status?: string; duration_ms?: number };
      }
    }
  }
  let result: StepResult;
  // Enforce step-level timeout from step config (default 30s)
  const stepTimeoutSec = (step.step_config.timeout_seconds as number) || 30;
  const stepTimeoutMs = stepTimeoutSec * 1000;
  let stepTimer: ReturnType<typeof setTimeout> | undefined;
  const stepTimeoutPromise = new Promise<never>((_, reject) => {
    stepTimer = setTimeout(() => reject(new Error(`Step timed out after ${stepTimeoutSec}s`)), stepTimeoutMs);
  });
  try {
    switch (step.step_type) {
      case "tool":
        // NOTE(review): Promise.race does not cancel the losing promise — a
        // timed-out tool call keeps running in the background; confirm acceptable.
        result = await Promise.race([executeToolStep(supabase, step.step_config, ctx, step.store_id, traceId), stepTimeoutPromise]);
        break;
      case "condition":
        result = executeConditionStep(step.step_config, ctx);
        break;
      case "transform":
        result = executeTransformStep(step.step_config, ctx);
        break;
      case "delay": {
        // First attempt: set the delay. Second attempt (after delay): success.
        if (step.attempt_count <= 1) {
          const delaySec = (step.step_config.seconds as number) || 60;
          await supabase.from("workflow_step_runs").update({
            status: "retrying",
            output: { delay_seconds: delaySec, resume_at: new Date(Date.now() + delaySec * 1000).toISOString() },
            next_retry_at: new Date(Date.now() + delaySec * 1000).toISOString(),
          }).eq("id", step.step_run_id);
          clearTimeout(stepTimer);
          return; // Worker picks it up after delay
        }
        result = { success: true, output: { delayed: true, seconds: step.step_config.seconds } };
        break;
      }
      case "agent":
        result = await Promise.race([executeAgentStep(step.step_config, ctx, step.store_id, supabase, step, traceId), stepTimeoutPromise]);
        break;
      case "sub_workflow": {
        const childWfId = resolveTemplate((step.step_config.workflow_id || "") as string, ctx) as string;
        if (!childWfId) { result = { success: false, error: "No workflow_id in sub_workflow config" }; break; }
        const payloadTemplate = (step.step_config.trigger_payload_template || step.step_config.trigger_payload || {}) as Record<string, unknown>;
        const payload = resolveTemplate(payloadTemplate, ctx);
        const { data: startResult } = await supabase.rpc("start_workflow_run", {
          p_workflow_id: childWfId,
          p_store_id: step.store_id,
          p_trigger_type: "sub_workflow",
          p_trigger_payload: payload,
        });
        if (!startResult?.success) {
          result = { success: false, error: startResult?.error || "Failed to start sub-workflow" };
          break;
        }
        // Set to waiting — processWaitingSteps will resolve when child completes
        await supabase.from("workflow_step_runs").update({
          status: "waiting",
          child_run_id: startResult.run_id,
          output: { child_run_id: startResult.run_id, child_workflow_id: childWfId },
        }).eq("id", step.step_run_id);
        clearTimeout(stepTimer);
        return;
      }
      case "parallel": {
        const stepKeys = (step.step_config.step_keys || step.step_config.child_steps || []) as string[];
        if (stepKeys.length === 0) { result = { success: true, output: { parallel: true, steps: [] } }; break; }
        const { data: steps } = await supabase.from("workflow_steps")
          .select("id, step_key, step_type, max_retries")
          .eq("workflow_id", step.workflow_id).in("step_key", stepKeys);
        if (steps?.length) {
          // Fan out: one child step_run per branch, linked via parent_step_run_id.
          await supabase.from("workflow_step_runs").insert(
            steps.map(s => ({
              run_id: step.run_id, step_id: s.id, step_key: s.step_key,
              step_type: s.step_type, status: "pending" as const,
              max_attempts: s.max_retries ?? 3, parent_step_run_id: step.step_run_id,
            }))
          );
        }
        await supabase.from("workflow_step_runs").update({
          status: "waiting", output: { waiting_for: stepKeys },
        }).eq("id", step.step_run_id);
        clearTimeout(stepTimer);
        return; // processWaitingSteps resolves when all children complete
      }
      case "for_each": {
        const itemsExpr = step.step_config.items as string;
        const targetStepKey = step.step_config.step_key as string;
        if (!itemsExpr || !targetStepKey) {
          result = { success: false, error: "for_each requires items and step_key in config" }; break;
        }
        const items = resolveTemplate(itemsExpr, ctx);
        if (!Array.isArray(items)) {
          result = { success: false, error: `for_each items resolved to ${typeof items}, expected array` }; break;
        }
        if (items.length === 0) {
          // Empty collection is a success with no children — nothing to fan out.
          result = { success: true, output: { children: [], total: 0 } }; break;
        }
        // Look up target step definition
        const { data: targetStep } = await supabase.from("workflow_steps")
          .select("id, step_key, step_type, max_retries")
          .eq("workflow_id", step.workflow_id).eq("step_key", targetStepKey).single();
        if (!targetStep) {
          result = { success: false, error: `for_each target step '${targetStepKey}' not found` }; break;
        }
        // Create a step_run per item with the item as input
        await supabase.from("workflow_step_runs").insert(
          items.map((item, idx) => ({
            run_id: step.run_id, step_id: targetStep.id,
            step_key: `${targetStepKey}[${idx}]`, step_type: targetStep.step_type,
            status: "pending" as const, max_attempts: targetStep.max_retries ?? 3,
            parent_step_run_id: step.step_run_id, input: item,
          }))
        );
        await supabase.from("workflow_step_runs").update({
          status: "waiting", output: { waiting_for_count: items.length, target_step: targetStepKey },
        }).eq("id", step.step_run_id);
        clearTimeout(stepTimer);
        return;
      }
      case "code": {
        result = await Promise.race([executeCodeStepIsolated(step.step_config, ctx), stepTimeoutPromise]);
        break;
      }
      case "webhook_out":
        result = await Promise.race([executeWebhookOutStep(step.step_config, ctx), stepTimeoutPromise]);
        break;
      case "noop":
        result = executeNoopStep();
        break;
      case "approval": {
        // "waiting" means the approval is pending — park and let the approver resume it.
        const approvalResult = await executeApprovalStep(supabase, step, ctx);
        if (approvalResult === "waiting") { clearTimeout(stepTimer); return; }
        result = approvalResult;
        break;
      }
      // Custom step — POSTs workflow context to a user-defined URL and uses the response
      case "custom": {
        const customUrl = resolveTemplate((step.step_config.url || step.step_config.endpoint) as string, ctx) as string;
        if (!customUrl) { result = { success: false, error: "Custom step requires url in config" }; break; }
        if (isBlockedUrl(customUrl)) { result = { success: false, error: "Custom step URL targets a blocked address" }; break; }
        try {
          const customHeaders: Record<string, string> = { "Content-Type": "application/json" };
          if (step.step_config.headers && typeof step.step_config.headers === "object") {
            // Header values may contain templates (e.g. secrets from trigger payload).
            for (const [k, v] of Object.entries(step.step_config.headers as Record<string, string>)) {
              customHeaders[k] = resolveTemplate(v, ctx) as string;
            }
          }
          const customBody = JSON.stringify({
            step_key: step.step_key,
            run_id: step.run_id,
            workflow_id: step.workflow_id,
            input: step.input,
            step_outputs: step.step_outputs,
            trigger_payload: step.trigger_payload,
            config: step.step_config.payload_config || {},
          });
          // Hard 30s fetch timeout, independent of the step-level timeout race.
          const ctrl = new AbortController();
          const timer = setTimeout(() => ctrl.abort(), 30_000);
          const resp = await fetch(customUrl, { method: "POST", headers: customHeaders, body: customBody, signal: ctrl.signal });
          clearTimeout(timer);
          const respData = resp.headers.get("content-type")?.includes("json")
            ? await resp.json() : await resp.text();
          if (!resp.ok) {
            result = { success: false, error: `Custom step HTTP ${resp.status}: ${String(respData).substring(0, 500)}` };
          } else {
            // Support branch routing from custom step response
            const branch = typeof respData === "object" && respData?.branch ? respData.branch as string : undefined;
            result = { success: true, output: respData, branch };
          }
        } catch (err: any) {
          result = { success: false, error: err.name === "AbortError" ? "Custom step timed out" : sanitizeError(err) };
        }
        break;
      }
      // Waitpoint — generalized wait-for-external-signal (subsumes approval, webhook callback, cross-workflow)
      case "waitpoint": {
        // Second pass — resumed with completion data
        if (step.input && typeof step.input === "object" && (step.input as any).waitpoint_completed) {
          result = { success: true, output: (step.input as any).waitpoint_data || {} };
          break;
        }
        // First pass — create waitpoint token and pause
        const waitpointToken = randomUUID();
        const waitpointTimeout = (step.step_config.timeout_seconds as number) || 86400; // default 24h
        const waitpointExpires = new Date(Date.now() + waitpointTimeout * 1000).toISOString();
        await supabase.from("waitpoint_tokens").insert({
          token: waitpointToken,
          run_id: step.run_id,
          step_run_id: step.step_run_id,
          store_id: step.store_id,
          expires_at: waitpointExpires,
          label: (step.step_config.label as string) || step.step_key,
        });
        await supabase.from("workflow_step_runs").update({
          status: "waiting",
          output: { waiting_for: "waitpoint", token: waitpointToken, expires_at: waitpointExpires },
        }).eq("id", step.step_run_id);
        logWorkflowEvent(supabase, step.run_id, "waitpoint_created", { token: waitpointToken }, step.step_run_id);
        clearTimeout(stepTimer);
        return;
      }
      default:
        result = { success: false, error: `Unknown step type: ${step.step_type}` };
    }
  } catch (timeoutErr: any) {
    // Timeout race rejected (or an executor threw) — record as a failed result.
    result = { success: false, error: timeoutErr.message || `Step timed out after ${stepTimeoutSec}s` };
  } finally {
    clearTimeout(stepTimer);
  }
  const durationMs = Date.now() - startTime;
  // Event journal — step completed
  logWorkflowEvent(supabase, step.run_id, result.success ? "step_completed" : "step_failed", {
    step_key: step.step_key, duration_ms: durationMs,
    ...(result.error ? { error: result.error } : {}),
    ...(result.branch ? { branch: result.branch } : {}),
  }, step.step_run_id);
  // Persist step result
  await supabase.from("workflow_step_runs").update({
    status: result.success ? "success" : "failed",
    output: result.output || null,
    error_message: result.error || null,
    completed_at: new Date().toISOString(),
    duration_ms: durationMs,
  }).eq("id", step.step_run_id);
  // P4 FIX: Atomically merge step output using jsonb_set to prevent race conditions
  // Two concurrent steps can no longer overwrite each other's outputs
  const stepOutput = { output: result.output, status: result.success ? "success" : "failed", duration_ms: durationMs };
  await supabase.rpc("accumulate_step_output", {
    p_run_id: step.run_id,
    p_step_key: step.step_key,
    p_step_output: stepOutput,
  }).then(async ({ error }) => {
    if (error) {
      // Fallback to non-atomic update if RPC not available.
      // IMPORTANT: Read step_outputs fresh — step.step_outputs is stale (loaded at claim time)
      // and parallel steps may have written since then. The RPC path is preferred (truly atomic).
      console.error("[workflow] accumulate_step_output RPC error, using fallback:", error.message);
      const { data: currentRun } = await supabase
        .from("workflow_runs")
        .select("step_outputs")
        .eq("id", step.run_id)
        .single();
      const merged = { ...(currentRun?.step_outputs || {}), [step.step_key]: stepOutput };
      return supabase.from("workflow_runs").update({
        step_outputs: merged,
      }).eq("id", step.run_id);
    }
  });
  // Checkpoint — snapshot state after each step for replay/debugging
  if (result.success) {
    supabase.from("workflow_checkpoints").insert({
      run_id: step.run_id, step_run_id: step.step_run_id, step_key: step.step_key,
      step_outputs: { ...(step.step_outputs || {}), [step.step_key]: { output: result.output, status: "success", duration_ms: durationMs } },
      trigger_payload: step.trigger_payload,
      sequence_number: Object.keys(step.step_outputs || {}).length + 1,
    }).then(() => {}); // fire-and-forget
  }
  // Audit
  await supabase.from("audit_logs").insert({
    action: `workflow.step.${result.success ? "completed" : "failed"}`,
    severity: result.success ? "info" : "error",
    store_id: step.store_id, resource_type: "workflow_step_run",
    resource_id: step.step_run_id, source: "workflow_engine", duration_ms: durationMs,
    request_id: traceId || null,
    details: { workflow_id: step.workflow_id, run_id: step.run_id, step_key: step.step_key, step_type: step.step_type, attempt: step.attempt_count },
    error_message: result.error || null,
  }).then(() => {});
  // Child steps (parallel/for_each) — just save result, parent handles advancement
  if (step.parent_step_run_id) {
    // If child failed and has retries left, retry it (respects retry_policy)
    if (!result.success && step.attempt_count < step.max_attempts) {
      const retryPolicy = step.step_config.retry_policy as { backoff_type?: string; backoff_base_seconds?: number; max_backoff_seconds?: number } | undefined;
      const backoffType = retryPolicy?.backoff_type || "exponential";
      const baseDelay = retryPolicy?.backoff_base_seconds || step.retry_delay_seconds || 10;
      const maxBackoff = retryPolicy?.max_backoff_seconds || 300;
      let backoffDelay: number;
      switch (backoffType) {
        case "fixed": backoffDelay = baseDelay; break;
        case "linear": backoffDelay = baseDelay * step.attempt_count; break;
        default: backoffDelay = baseDelay * Math.pow(2, step.attempt_count - 1); break;
      }
      backoffDelay = Math.min(backoffDelay, maxBackoff);
      await supabase.from("workflow_step_runs").update({
        status: "retrying",
        next_retry_at: new Date(Date.now() + backoffDelay * 1000).toISOString(),
      }).eq("id", step.step_run_id);
    }
    return; // Parent's processWaitingSteps handles advancement
  }
  // Handle failure — configurable retry policy
  if (!result.success) {
    if (step.attempt_count < step.max_attempts) {
      // Check retry_on filter — only retry if error matches pattern (if configured)
      const retryPolicy = step.step_config.retry_policy as { backoff_type?: string; backoff_base_seconds?: number; max_backoff_seconds?: number; retry_on?: string[] } | undefined;
      const retryOn = retryPolicy?.retry_on;
      const shouldRetry = !retryOn?.length || retryOn.some(pattern => result.error?.includes(pattern));
      if (shouldRetry) {
        const backoffType = retryPolicy?.backoff_type || "exponential";
        const baseDelay = retryPolicy?.backoff_base_seconds || step.retry_delay_seconds || 10;
        const maxBackoff = retryPolicy?.max_backoff_seconds || 300; // 5 min cap
        let backoffDelay: number;
        switch (backoffType) {
          case "fixed": backoffDelay = baseDelay; break;
          case "linear": backoffDelay = baseDelay * step.attempt_count; break;
          default: backoffDelay = baseDelay * Math.pow(2, step.attempt_count - 1); break; // exponential
        }
        backoffDelay = Math.min(backoffDelay, maxBackoff);
        await supabase.from("workflow_step_runs").update({
          status: "retrying",
          next_retry_at: new Date(Date.now() + backoffDelay * 1000).toISOString(),
        }).eq("id", step.step_run_id);
        logWorkflowEvent(supabase, step.run_id, "step_retrying", {
          step_key: step.step_key, attempt: step.attempt_count, backoff_type: backoffType, delay_seconds: backoffDelay,
        }, step.step_run_id);
        return;
      }
      // retry_on filter didn't match — fall through to failure handling
    }
    if (step.on_failure) {
      await createNextStepRunByKey(supabase, step.run_id, step.workflow_id, step.on_failure);
    } else {
      await completeWorkflowRun(supabase, step.run_id, step.workflow_id, step.store_id, "failed", result.error, step.step_key);
    }
    return;
  }
  // Advance — condition steps use branch, otherwise on_success
  const nextStepKey = result.branch || step.on_success;
  if (!nextStepKey) {
    await checkWorkflowCompletion(supabase, step.run_id, step.workflow_id);
    return;
  }
  await createNextStepRunByKey(supabase, step.run_id, step.workflow_id, nextStepKey);
}
// ============================================================================
// WORKFLOW ADVANCEMENT HELPERS
// ============================================================================
/**
 * Fetch the step snapshot pinned to a run's workflow version.
 * Returns the steps array from the workflow_versions row referenced by the
 * run's version_id, or null when the run has no version or the snapshot does
 * not contain a usable steps array.
 */
async function loadVersionedSteps(
  supabase: SupabaseClient, runId: string,
): Promise<any[] | null> {
  const runRow = await supabase.from("workflow_runs")
    .select("version_id").eq("id", runId).single();
  const versionId = runRow.data?.version_id;
  if (!versionId) return null;
  const versionRow = await supabase.from("workflow_versions")
    .select("steps").eq("id", versionId).single();
  const steps = versionRow.data?.steps;
  return Array.isArray(steps) ? (steps as any[]) : null;
}
/**
* Apply versioned overrides to a claimed step. If the run has a version_id,
* replaces step_config, on_success, on_failure with values from the snapshot.
*/
async function applyVersionOverrides(supabase: SupabaseClient, step: StepClaim): Promise<void> {
const versionedSteps = await loadVersionedSteps(supabase, step.run_id);
if (!versionedSteps) return;
const vStep = versionedSteps.find((s: any) => s.step_key === step.step_key);
if (vStep) {
step.step_config = vStep.step_config || step.step_config;
step.on_success = vStep.on_success ?? step.on_success;
step.on_failure = vStep.on_failure ?? step.on_failure;
}
}
/**
* Resolve a step definition by key. If the run has a version_id, load from
* the versioned snapshot. Otherwise, load from the live workflow_steps table.
*/
async function resolveStepDef(
supabase: SupabaseClient, runId: string, workflowId: string, stepKey: string,
): Promise<{ id: string; step_key: string; step_type: string; max_retries: number } | null> {
const versionedSteps = await loadVersionedSteps(supabase, runId);
if (versionedSteps) {
const step = versionedSteps.find((s: any) => s.step_key === stepKey);
if (step) {
return { id: step.id, step_key: step.step_key, step_type: step.step_type, max_retries: step.max_retries ?? 3 };
}
return null;
}
// Live table
const { data } = await supabase.from("workflow_steps")
.select("id, step_key, step_type, max_retries")
.eq("workflow_id", workflowId).eq("step_key", stepKey).single();
return data;
}
/**
 * Insert the next pending step run for a workflow run, resolving the step
 * definition by key (version snapshot first, live table otherwise).
 * Fails the whole run when the key is unknown or the per-run step budget is
 * exhausted.
 *
 * @returns the new workflow_step_runs id, or null when the run was failed
 *          (unknown key / step limit) or the insert returned no row.
 */
async function createNextStepRunByKey(
  supabase: SupabaseClient, runId: string, workflowId: string, stepKey: string,
): Promise<string | null> {
  const nextStep = await resolveStepDef(supabase, runId, workflowId, stepKey);
  if (!nextStep) {
    console.error(`[workflow] Step '${stepKey}' not found in workflow ${workflowId}`);
    const { data: run } = await supabase.from("workflow_runs").select("store_id").eq("id", runId).single();
    await completeWorkflowRun(supabase, runId, workflowId, run?.store_id, "failed", `Step '${stepKey}' not found`);
    return null;
  }
  // Check step count limit — the three lookups are independent, so run them in parallel
  const [{ data: run }, { count }, { data: wf }] = await Promise.all([
    supabase.from("workflow_runs").select("store_id").eq("id", runId).single(),
    supabase.from("workflow_step_runs").select("id", { count: "exact", head: true }).eq("run_id", runId),
    supabase.from("workflows").select("max_steps_per_run").eq("id", workflowId).single(),
  ]);
  if ((count || 0) >= (wf?.max_steps_per_run || 50)) {
    await completeWorkflowRun(supabase, runId, workflowId, run?.store_id, "failed", `Step limit exceeded (${wf?.max_steps_per_run || 50})`);
    return null;
  }
  const { data: inserted } = await supabase.from("workflow_step_runs").insert({
    run_id: runId, step_id: nextStep.id, step_key: nextStep.step_key,
    step_type: nextStep.step_type, status: "pending", max_attempts: nextStep.max_retries ?? 3,
  }).select("id").single();
  return inserted?.id || null;
}
// ============================================================================
// PHASE 1: INLINE EXECUTION — execute steps immediately, no 5s wait
// ============================================================================
/**
 * Claim a single pending step for a specific run using direct query + atomic UPDATE.
 * H1 FIX: Avoids using global claim_pending_steps RPC which can steal from other runs.
 *
 * Two-phase claim: (1) read the oldest pending step for this run, then
 * (2) flip it to "running" with a status guard in the WHERE clause so a
 * concurrent worker cannot double-claim the same row. On a successful claim,
 * hydrates a full StepClaim from the run/step joins.
 *
 * @param supabase Supabase client used for all queries.
 * @param runId    The run whose next pending step should be claimed.
 * @returns The hydrated claim, or null when nothing is pending, the claim
 *          race was lost, or the joined step data could not be loaded.
 */
async function claimStepForRun(
  supabase: SupabaseClient, runId: string,
): Promise<StepClaim | null> {
  // Find the first pending step for this specific run (oldest created_at first — FIFO)
  const { data: pending } = await supabase.from("workflow_step_runs")
    .select("id")
    .eq("run_id", runId)
    .eq("status", "pending")
    .order("created_at", { ascending: true })
    .limit(1);
  if (!pending?.length) return null;
  // Atomically claim it — if another process claimed it, this returns 0 rows
  const { data: claimed } = await supabase.from("workflow_step_runs")
    .update({ status: "running", started_at: new Date().toISOString() })
    .eq("id", pending[0].id)
    .eq("status", "pending") // atomic guard — update is a no-op if status already changed
    .select("id")
    .single();
  if (!claimed) {
    // P1 FIX: Log when inline claim loses race to the worker loop.
    // The step will still execute via the worker — this just makes the race visible for debugging.
    console.log(`[workflow] Inline claim lost race for step_run ${pending[0].id} in run ${runId} — worker will execute`);
    return null;
  }
  // Now load the full step data via the RPC pattern (join with workflow_runs + workflow_steps).
  // The workflow_runs embed is disambiguated via its FK constraint name
  // (workflow_step_runs_run_id_fkey); !inner makes both joins required.
  const { data: stepData } = await supabase.from("workflow_step_runs")
    .select(`
      id, run_id, step_id, step_key, step_type, input, attempt_count, max_attempts, parent_step_run_id,
      workflow_runs!workflow_step_runs_run_id_fkey!inner(workflow_id, store_id, step_outputs, trigger_payload),
      workflow_steps!inner(step_config, on_success, on_failure, timeout_seconds, input_schema, retry_delay_seconds)
    `)
    .eq("id", claimed.id)
    .single();
  if (!stepData) return null;
  const run = (stepData as any).workflow_runs;
  const ws = (stepData as any).workflow_steps;
  // Get max_steps_per_run from workflow
  const { data: wf } = await supabase.from("workflows")
    .select("max_steps_per_run")
    .eq("id", run.workflow_id)
    .single();
  // Assemble the claim, applying defaults for nullable columns
  return {
    step_run_id: stepData.id,
    run_id: stepData.run_id,
    workflow_id: run.workflow_id,
    store_id: run.store_id,
    step_id: stepData.step_id,
    step_key: stepData.step_key,
    step_type: stepData.step_type,
    step_config: ws.step_config || {},
    on_success: ws.on_success,
    on_failure: ws.on_failure,
    timeout_seconds: ws.timeout_seconds || 60,
    input_schema: ws.input_schema,
    step_outputs: run.step_outputs || {},
    trigger_payload: run.trigger_payload || {},
    attempt_count: stepData.attempt_count || 1,
    max_attempts: stepData.max_attempts || 3,
    max_steps_per_run: wf?.max_steps_per_run || 50,
    input: stepData.input,
    parent_step_run_id: stepData.parent_step_run_id,
    retry_delay_seconds: ws.retry_delay_seconds || 10,
  };
}
/**
 * Drain pending steps for a run inline, one after another, without waiting for
 * the background worker. Stops when no step is pending, a step throws, a step
 * type parks the run asynchronously, or the depth budget is exhausted — the
 * worker loop picks up anything left behind.
 */
export async function executeInlineChain(
  supabase: SupabaseClient, runId: string, depth: number = 0, traceId?: string,
): Promise<void> {
  // Only the initial call resolves trace_id from the run row, avoiding one query per step
  if (!traceId && depth === 0) {
    const { data: runData } = await supabase.from("workflow_runs")
      .select("trace_id").eq("id", runId).single();
    traceId = runData?.trace_id || undefined;
  }
  // Step types that go async (timers, child runs, human input) end inline chaining
  const asyncTypes = new Set(["delay", "sub_workflow", "parallel", "for_each", "approval", "waitpoint"]);
  for (let currentDepth = depth; ; currentDepth++) {
    if (currentDepth >= MAX_INLINE_DEPTH) {
      console.log(`[workflow-inline] Depth limit ${MAX_INLINE_DEPTH} reached for run ${runId}, deferring to worker`);
      return;
    }
    // H1 FIX: Claim step specifically for this run — cannot steal from other runs
    const step = await claimStepForRun(supabase, runId);
    if (!step) return; // No pending steps for this run
    // Phase 4: Override step_config/on_success/on_failure from version snapshot
    await applyVersionOverrides(supabase, step);
    try {
      await executeAndAdvance(supabase, step, traceId);
    } catch (err) {
      console.error(`[workflow-inline] Step ${step.step_key} error:`, sanitizeError(err));
      await supabase.from("workflow_step_runs").update({
        status: "failed", error_message: sanitizeError(err),
        completed_at: new Date().toISOString(), duration_ms: 0,
      }).eq("id", step.step_run_id);
      return;
    }
    if (asyncTypes.has(step.step_type)) return;
    // Otherwise loop to the next pending step of this run
  }
}
async function checkWorkflowCompletion(
supabase: SupabaseClient, runId: string, workflowId: string,
): Promise<void> {
const { data: pending } = await supabase.from("workflow_step_runs")
.select("id").eq("run_id", runId)
.in("status", ["pending", "running", "retrying", "waiting"]).limit(1);
if (pending?.length) return; // Still in progress
// Get failed steps, but only count them as terminal failures if they weren't handled
// (i.e., no on_failure branch was taken — if on_failure exists, the failure was routed)
const { data: failedSteps } = await supabase.from("workflow_step_runs")
.select("step_key, step_id, error_message").eq("run_id", runId).eq("status", "failed");
// Filter to unhandled failures — steps where on_failure is null (no error handler)
// M14 FIX: Use versioned step defs when available
let failed: typeof failedSteps = [];
if (failedSteps?.length) {
const versionedSteps = await loadVersionedSteps(supabase, runId);
if (versionedSteps) {
// Use versioned definitions
const vStepMap = new Map(versionedSteps.map((s: any) => [s.step_key, s]));
failed = failedSteps.filter(s => {
const vStep = vStepMap.get(s.step_key);
return !vStep?.on_failure;
});
} else {
// Fall back to live table
const stepIds = failedSteps.map(s => s.step_id).filter(Boolean);
const { data: stepDefs } = await supabase.from("workflow_steps")
.select("id, on_failure").in("id", stepIds);
const defMap = new Map((stepDefs || []).map(d => [d.id, d]));
failed = failedSteps.filter(s => {
const def = defMap.get(s.step_id);
return !def?.on_failure;
});
}
}
const { data: run } = await supabase.from("workflow_runs").select("store_id").eq("id", runId).single();
if (failed?.length) {
await completeWorkflowRun(supabase, runId, workflowId, run?.store_id, "failed", failed[0].error_message, failed[0].step_key);
} else {
await completeWorkflowRun(supabase, runId, workflowId, run?.store_id, "success");
}
}
export async function completeWorkflowRun(
supabase: SupabaseClient, runId: string, workflowId: string, storeId: string | undefined,
status: "success" | "failed" | "cancelled" | "timed_out",
errorMessage?: string | null, errorStepKey?: string | null, traceId?: string,
): Promise<void> {
const { data: run } = await supabase.from("workflow_runs")
.select("started_at, trace_id").eq("id", runId).single();
const durationMs = run?.started_at ? Date.now() - new Date(run.started_at).getTime() : null;
const resolvedTraceId = traceId || run?.trace_id || null;
await supabase.from("workflow_runs").update({
status, error_message: errorMessage || null, error_step_key: errorStepKey || null,
completed_at: new Date().toISOString(), duration_ms: durationMs, current_step_key: null,
}).eq("id", runId);
// Cancel remaining pending steps
await supabase.from("workflow_step_runs").update({ status: "cancelled" })
.eq("run_id", runId).in("status", ["pending", "retrying", "waiting"]);
// Event journal — run completed
logWorkflowEvent(supabase, runId, `run_${status}`, {
workflow_id: workflowId, duration_ms: durationMs,
...(errorMessage ? { error: errorMessage } : {}),
});
// Circuit breaker
if (workflowId) await handleWorkflowCircuitBreaker(supabase, workflowId, status === "success");
// Audit
await supabase.from("audit_logs").insert({
action: `workflow.run.${status}`, severity: status === "success" ? "info" : "error",
store_id: storeId || null, resource_type: "workflow_run", resource_id: runId,
source: "workflow_engine", duration_ms: durationMs,
request_id: resolvedTraceId,
details: { workflow_id: workflowId, run_id: runId },
error_message: errorMessage || null,
}).then(() => {});
// Error notifications
if (status === "failed" && workflowId) {
await sendErrorNotification(supabase, workflowId, runId, storeId, errorMessage, errorStepKey);
// Auto-archive to Dead Letter Queue
if (storeId) {
archiveToDlq(supabase, runId, workflowId, storeId).catch(e => console.warn('[workflow] archiveToDlq failed:', e.message));
}
}
}
/**
 * Notify workflow owners about a failed run via the workflow's configured
 * on_error_webhook_url (POST, 10s timeout, SSRF-blocked URLs skipped) and/or
 * on_error_email (sent through the injected tool executor). Both channels are
 * best-effort: delivery failures are logged and never propagate.
 */
async function sendErrorNotification(
  supabase: SupabaseClient, workflowId: string, runId: string,
  storeId: string | undefined, errorMessage?: string | null, errorStepKey?: string | null,
): Promise<void> {
  const { data: wf } = await supabase.from("workflows")
    .select("name, on_error_webhook_url, on_error_email").eq("id", workflowId).single();
  if (!wf) return;
  const errorPayload = {
    event: "workflow.run.failed",
    workflow_id: workflowId, workflow_name: wf.name,
    run_id: runId, error_message: errorMessage, error_step: errorStepKey,
    timestamp: new Date().toISOString(),
  };
  // Webhook notification (with SSRF protection)
  if (wf.on_error_webhook_url && !isBlockedUrl(wf.on_error_webhook_url)) {
    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(), 10_000);
    try {
      await fetch(wf.on_error_webhook_url, {
        method: "POST", headers: { "Content-Type": "application/json" },
        body: JSON.stringify(errorPayload), signal: controller.signal,
      });
    } catch (err) {
      console.error("[workflow] Error notification webhook failed:", sanitizeError(err));
    } finally {
      // FIX: clear the abort timer on ALL paths — previously a rejected fetch
      // skipped clearTimeout and leaked a live 10s timer per failed delivery.
      clearTimeout(timer);
    }
  }
  // Email notification — requires the injected tool executor and a store context
  if (wf.on_error_email && _executeTool && storeId) {
    try {
      await _executeTool(supabase, "email", {
        action: "send", to: wf.on_error_email,
        subject: `Workflow "${wf.name}" failed`,
        text: `Workflow "${wf.name}" failed at step "${errorStepKey || "unknown"}".\n\nError: ${errorMessage || "Unknown error"}\n\nRun ID: ${runId}\nTime: ${new Date().toISOString()}`,
      }, storeId);
    } catch (err) {
      console.error("[workflow] Error notification email failed:", sanitizeError(err));
    }
  }
}
// ============================================================================
// WEBHOOK INGESTION
// ============================================================================
/**
 * Ingest an inbound webhook request and start the endpoint's bound workflow.
 *
 * Pipeline: look up the active endpoint by slug → per-minute rate limit
 * (counted via audit_logs) → optional timing-safe HMAC verification →
 * JSON parse (non-JSON bodies wrapped as { raw }) → optional payload
 * transform → stats/audit → start_workflow_run RPC. When sync_response is
 * enabled, polls the run until a terminal status or the sync timeout.
 *
 * @param rawBody Raw request body, needed verbatim for HMAC verification.
 * @param headers Lowercased request headers.
 * @returns HTTP status + JSON body for the caller to send.
 */
export async function handleWebhookIngestion(
  supabase: SupabaseClient, slug: string, rawBody: string, headers: Record<string, string>,
): Promise<{ status: number; body: Record<string, unknown> }> {
  // H6-audit: Use .limit(1) instead of .single() to handle duplicate slugs across stores
  const { data: endpoints } = await supabase.from("webhook_endpoints")
    .select("*").eq("slug", slug).eq("is_active", true).limit(1);
  const endpoint = endpoints?.[0];
  if (!endpoint) return { status: 404, body: { error: "Webhook endpoint not found" } };
  // Rate limit — recent ingestions are counted from this endpoint's audit rows
  const oneMinAgo = new Date(Date.now() - 60_000).toISOString();
  const { count: recentCount } = await supabase.from("audit_logs")
    .select("id", { count: "exact", head: true })
    .eq("resource_type", "webhook_endpoint").eq("resource_id", endpoint.id)
    .gte("created_at", oneMinAgo);
  if ((recentCount || 0) >= endpoint.max_requests_per_minute) {
    return { status: 429, body: { error: "Rate limit exceeded" } };
  }
  // HMAC verification — timingSafeEqual needs equal-length buffers, so length
  // is checked first; any Buffer error also yields 401
  if (endpoint.verify_signature) {
    const signature = headers["x-webhook-signature"] || headers["x-hub-signature-256"] || "";
    if (!signature) return { status: 401, body: { error: "Missing signature" } };
    const expected = `sha256=${createHmac("sha256", endpoint.signing_secret).update(rawBody).digest("hex")}`;
    try {
      const sigBuf = Buffer.from(signature);
      const expBuf = Buffer.from(expected);
      if (sigBuf.length !== expBuf.length || !timingSafeEqual(sigBuf, expBuf)) {
        return { status: 401, body: { error: "Invalid signature" } };
      }
    } catch { return { status: 401, body: { error: "Invalid signature" } }; }
  }
  // Non-JSON bodies are accepted and wrapped rather than rejected
  let payload: Record<string, unknown>;
  try { payload = JSON.parse(rawBody); } catch { payload = { raw: rawBody }; }
  if (endpoint.payload_transform && typeof endpoint.payload_transform === "object") {
    payload = resolveTemplate(endpoint.payload_transform, { steps: {}, trigger: payload }) as Record<string, unknown>;
  }
  // Update stats
  await supabase.from("webhook_endpoints").update({
    last_received_at: new Date().toISOString(),
    total_received: (endpoint.total_received || 0) + 1,
  }).eq("id", endpoint.id);
  // Audit (fire-and-forget)
  await supabase.from("audit_logs").insert({
    action: "webhook.received", severity: "info", store_id: endpoint.store_id,
    resource_type: "webhook_endpoint", resource_id: endpoint.id, source: "webhook",
    details: { slug, workflow_id: endpoint.workflow_id },
  }).then(() => {});
  // Start workflow
  const { data: startResult } = await supabase.rpc("start_workflow_run", {
    p_workflow_id: endpoint.workflow_id, p_store_id: endpoint.store_id,
    p_trigger_type: "webhook", p_trigger_payload: payload,
  });
  if (!startResult?.success) {
    return { status: 422, body: { error: startResult?.error || "Failed to start workflow" } };
  }
  const runId = startResult.run_id;
  // Sync response — poll until workflow reaches a terminal status or timeout
  if (endpoint.sync_response) {
    const timeoutMs = (endpoint.sync_timeout_seconds || 30) * 1000;
    const deadline = Date.now() + timeoutMs;
    while (Date.now() < deadline) {
      await new Promise(r => setTimeout(r, 500));
      const { data: run } = await supabase.from("workflow_runs")
        .select("status, step_outputs, error_message").eq("id", runId).single();
      if (run?.status === "success") {
        return { status: 200, body: { success: true, run_id: runId, output: run.step_outputs } };
      }
      if (run?.status === "failed") {
        return { status: 422, body: { success: false, run_id: runId, error: run.error_message, output: run.step_outputs } };
      }
      // FIX: cancelled/timed_out are terminal too (see completeWorkflowRun's status
      // union) — previously these kept polling and were misreported as 202 "in progress"
      if (run?.status === "cancelled" || run?.status === "timed_out") {
        return { status: 422, body: { success: false, run_id: runId, error: run.error_message || run.status, output: run.step_outputs } };
      }
    }
    return { status: 202, body: { success: true, run_id: runId, status: "running", message: "Workflow still in progress" } };
  }
  return { status: 200, body: { success: true, run_id: runId, deduplicated: startResult.deduplicated || false } };
}