// server/handlers/workflows.ts — Workflow CRUD & management
//
// This file is the entry point for the workflow engine. It re-exports the step
// execution engine from ./workflow-steps.ts and contains the handleWorkflows()
// CRUD handler for the MCP tool interface.
//
// Step execution, inline chains, cron/schedule processing, event triggers,
// webhook ingestion, circuit breakers, and all step-type executors live in
// ./workflow-steps.ts.
import { randomUUID } from "node:crypto";
import type { SupabaseClient } from "@supabase/supabase-js";
// Re-export everything from workflow-steps so existing imports from workflows.ts still work
export {
// Injected executor setters
setToolExecutor,
setAgentExecutor,
setTokenBroadcaster,
// Core engine
processWorkflowSteps,
processWaitingSteps,
executeAndAdvance,
executeInlineChain,
// Guest approval
generateGuestApprovalUrl,
verifyGuestApprovalSignature,
// Schedule / timeout / events
processScheduleTriggers,
enforceWorkflowTimeouts,
processEventTriggers,
// Webhook ingestion
handleWebhookIngestion,
// Worker pool management
initWorkerPool,
getPoolStats,
shutdownPool,
// Cron parser (used by CRUD)
getNextCronTime,
// Event journal (used by CRUD replay)
logWorkflowEvent,
// Types
type StepClaim,
type StepResult,
// Run completion (used by CRUD start/cancel)
completeWorkflowRun,
} from "./workflow-steps.js";
import {
executeInlineChain,
getNextCronTime,
completeWorkflowRun,
logWorkflowEvent,
} from "./workflow-steps.js";
// ============================================================================
// CRUD HANDLER — MCP tool interface
// ============================================================================
export async function handleWorkflows(
supabase: SupabaseClient, args: Record<string, unknown>, storeId?: string,
): Promise<{ success: boolean; data?: unknown; error?: string }> {
const action = args.action as string;
const sid = storeId as string;
switch (action) {
case "list": {
let q = supabase.from("workflows")
.select("id, name, description, icon, status, is_active, trigger_type, max_concurrent_runs, version, last_run_at, created_at, circuit_breaker_state")
.eq("store_id", sid).order("created_at", { ascending: false });
if (args.status) q = q.eq("status", args.status as string);
if (args.trigger_type) q = q.eq("trigger_type", args.trigger_type as string);
const { data, error } = await q.limit((args.limit as number) || 50);
return error ? { success: false, error: error.message } : { success: true, data };
}
case "get": {
const { data: wf, error } = await supabase.from("workflows")
.select("*, workflow_steps(*)").eq("id", args.workflow_id as string).eq("store_id", sid).single();
if (error) return { success: false, error: error.message };
const { data: runs } = await supabase.from("workflow_runs")
.select("id, status, trigger_type, started_at, completed_at, duration_ms, error_message, error_step_key")
.eq("workflow_id", args.workflow_id as string).order("created_at", { ascending: false }).limit(10);
return { success: true, data: { ...wf, recent_runs: runs || [] } };
}
case "create": {
// Compute next_run_at from cron expression — check both top-level and trigger_config
const tc = args.trigger_config as Record<string, unknown> | undefined;
const cronExpr = (args.cron_expression as string)
|| (tc?.cron as string) || (tc?.cron_expression as string) || null;
let nextRunAt: string | null = null;
if (cronExpr) {
const next = getNextCronTime(cronExpr);
if (!next) return { success: false, error: `Invalid cron expression: ${cronExpr}` };
nextRunAt = next.toISOString();
}
// Auto-extract timezone from trigger_config if not top-level
const tz = (args.timezone as string) || (tc?.timezone as string) || "UTC";
const { data, error } = await supabase.from("workflows").insert({
store_id: sid,
name: args.name as string,
description: args.description as string || null,
icon: args.icon as string || null,
status: (args.status as string) || "draft",
is_active: args.status === "active",
trigger_type: (args.trigger_type as string) || (cronExpr ? "schedule" : "manual"),
trigger_config: args.trigger_config || {},
max_concurrent_runs: (args.max_concurrent_runs as number) || 1,
max_run_duration_seconds: (args.max_run_duration_seconds as number) || 3600,
max_steps_per_run: (args.max_steps_per_run as number) || 50,
max_retries_per_step: (args.max_retries_per_step as number) || 3,
on_error_webhook_url: args.on_error_webhook_url as string || null,
on_error_email: args.on_error_email as string || null,
cron_expression: cronExpr,
next_run_at: nextRunAt,
timezone: tz,
multitask_strategy: (args.multitask_strategy as string) || "allow",
}).select().single();
if (error) return { success: false, error: error.message };
if (Array.isArray(args.steps)) {
const steps = (args.steps as Array<Record<string, unknown>>).map((s, i) => ({
workflow_id: data.id,
step_key: s.step_key as string,
step_type: s.step_type as string,
is_entry_point: s.is_entry_point ?? (i === 0),
on_success: s.on_success as string || null,
on_failure: s.on_failure as string || null,
step_config: s.step_config || {},
max_retries: (s.max_retries as number) || 3,
retry_delay_seconds: (s.retry_delay_seconds as number) || 10,
timeout_seconds: (s.timeout_seconds as number) || 60,
input_schema: s.input_schema || null,
position_x: (s.position_x as number) || 0,
position_y: (s.position_y as number) || i * 100,
}));
const { error: stepsErr } = await supabase.from("workflow_steps").insert(steps);
if (stepsErr) return { success: false, error: `Workflow created but steps failed: ${stepsErr.message}` };
}
return { success: true, data };
}
case "update": {
const updates: Record<string, unknown> = {};
const allowed = ["name", "description", "icon", "status", "trigger_type", "trigger_config",
"max_concurrent_runs", "max_run_duration_seconds", "max_steps_per_run", "max_retries_per_step",
"on_error_webhook_url", "on_error_email", "multitask_strategy", "timezone"];
for (const k of allowed) { if (args[k] !== undefined) updates[k] = args[k]; }
if (args.status !== undefined) updates.is_active = args.status === "active";
// Handle cron_expression update — check both top-level and trigger_config
const utc = args.trigger_config as Record<string, unknown> | undefined;
const cronFromConfig = (utc?.cron as string) || (utc?.cron_expression as string) || null;
const cronExplicit = args.cron_expression !== undefined ? (args.cron_expression as string | null) : null;
const cronExpr = cronExplicit || cronFromConfig;
if (cronExpr !== null && (args.cron_expression !== undefined || cronFromConfig)) {
updates.cron_expression = cronExpr;
const next = getNextCronTime(cronExpr);
if (!next) return { success: false, error: `Invalid cron expression: ${cronExpr}` };
updates.next_run_at = next.toISOString();
// Auto-set trigger_type to schedule if cron is provided
if (!updates.trigger_type) updates.trigger_type = "schedule";
// Extract timezone from trigger_config if not top-level
if (!updates.timezone && utc?.timezone) updates.timezone = utc.timezone;
} else if (args.cron_expression === null) {
// Explicitly clearing cron
updates.cron_expression = null;
updates.next_run_at = null;
}
const { data, error } = await supabase.from("workflows")
.update(updates).eq("id", args.workflow_id as string).eq("store_id", sid).select().maybeSingle();
if (error) return { success: false, error: error.message };
if (!data) return { success: false, error: "Workflow not found or store mismatch" };
return { success: true, data };
}
case "delete": {
const { error } = await supabase.from("workflows").delete()
.eq("id", args.workflow_id as string).eq("store_id", sid);
return error ? { success: false, error: error.message } : { success: true, data: { deleted: true } };
}
case "add_step": {
// H9 FIX: Verify workflow belongs to this store before adding step
const { data: wfCheck } = await supabase.from("workflows")
.select("id").eq("id", args.workflow_id as string).eq("store_id", sid).single();
if (!wfCheck) return { success: false, error: "Workflow not found in this store" };
const { data, error } = await supabase.from("workflow_steps").insert({
workflow_id: args.workflow_id as string,
step_key: args.step_key as string, step_type: args.step_type as string,
is_entry_point: args.is_entry_point ?? false,
on_success: args.on_success as string || null, on_failure: args.on_failure as string || null,
step_config: args.step_config || {},
max_retries: (args.max_retries as number) || 3,
timeout_seconds: (args.timeout_seconds as number) || 60,
input_schema: args.input_schema || null,
}).select().single();
return error ? { success: false, error: error.message } : { success: true, data };
}
case "update_step": {
// H9 FIX: Verify step belongs to a workflow owned by this store
const { data: stepCheck } = await supabase.from("workflow_steps")
.select("id, workflow_id, workflows!inner(store_id)")
.eq("id", args.step_id as string).single();
if (!stepCheck || (stepCheck as any).workflows?.store_id !== sid) {
return { success: false, error: "Step not found in this store's workflows" };
}
const su: Record<string, unknown> = {};
for (const k of ["step_key", "step_type", "is_entry_point", "on_success", "on_failure",
"step_config", "max_retries", "retry_delay_seconds", "timeout_seconds", "input_schema",
"position_x", "position_y"]) {
if (args[k] !== undefined) {
// Treat empty string as null for nullable fields (on_success, on_failure)
su[k] = (args[k] === "" && (k === "on_success" || k === "on_failure")) ? null : args[k];
}
}
const { data, error } = await supabase.from("workflow_steps")
.update(su).eq("id", args.step_id as string).select().single();
return error ? { success: false, error: error.message } : { success: true, data };
}
case "delete_step": {
// H9 FIX: Verify step belongs to a workflow owned by this store
const { data: stepCheck } = await supabase.from("workflow_steps")
.select("id, workflow_id, workflows!inner(store_id)")
.eq("id", args.step_id as string).single();
if (!stepCheck || (stepCheck as any).workflows?.store_id !== sid) {
return { success: false, error: "Step not found in this store's workflows" };
}
const { error } = await supabase.from("workflow_steps").delete().eq("id", args.step_id as string);
return error ? { success: false, error: error.message } : { success: true, data: { deleted: true } };
}
case "start": {
const wfId = args.workflow_id as string;
// Auto-activate workflow if needed (start implies intent to run)
await supabase.from("workflows").update({ is_active: true, status: "active" })
.eq("id", wfId).eq("store_id", sid).in("status", ["draft", "paused"]);
// Load workflow config for strategy + concurrency + versioning
const { data: wfConfig } = await supabase.from("workflows")
.select("multitask_strategy, published_version_id, max_concurrent_runs").eq("id", wfId).single();
const strategy = (wfConfig?.multitask_strategy as string) || "allow";
// Workflow-level concurrency check
const maxConcurrent = (wfConfig?.max_concurrent_runs as number) || 0;
if (maxConcurrent > 0) {
const { count: activeCount } = await supabase.from("workflow_runs")
.select("id", { count: "exact", head: true })
.eq("workflow_id", wfId).eq("store_id", sid)
.in("status", ["running", "pending"]);
if ((activeCount || 0) >= maxConcurrent) {
return { success: false, error: `Workflow concurrency limit reached (${activeCount}/${maxConcurrent} active runs)`, data: { concurrent_runs: activeCount, max_concurrent_runs: maxConcurrent } };
}
}
if (strategy !== "allow") {
// Check for in-flight runs
const { data: activeRuns } = await supabase.from("workflow_runs")
.select("id, status, created_at")
.eq("workflow_id", wfId).eq("store_id", sid)
.in("status", ["running", "pending"])
.order("created_at", { ascending: false })
.limit(10);
if (activeRuns?.length) {
switch (strategy) {
case "reject":
return { success: false, error: `Workflow already has ${activeRuns.length} active run(s). Strategy: reject concurrent runs.`, data: { strategy: "reject", active_runs: activeRuns.length } };
case "enqueue":
// Let it through — the run will be created and the worker picks it up in order
// Set priority lower so existing runs finish first
break;
case "interrupt":
// Cancel all active runs before starting new one
for (const run of activeRuns) {
await completeWorkflowRun(supabase, run.id, wfId, sid, "cancelled", "Interrupted by new run (multitask_strategy: interrupt)");
}
break;
case "replace":
// Cancel all active runs AND delete their step runs
for (const run of activeRuns) {
await completeWorkflowRun(supabase, run.id, wfId, sid, "cancelled", "Replaced by new run (multitask_strategy: replace)");
}
break;
}
}
}
const { data, error } = await supabase.rpc("start_workflow_run", {
p_workflow_id: wfId, p_store_id: sid,
p_trigger_type: (args.trigger_type as string) || "manual",
p_trigger_payload: args.trigger_payload || {},
p_idempotency_key: args.idempotency_key as string || null,
});
if (error) return { success: false, error: error.message };
if (!data?.success) return { success: false, error: data?.error || "Failed" };
// Generate trace_id for distributed tracing
const traceId = data.run_id ? randomUUID() : undefined;
// Set version_id, trace_id, priority
if (data.run_id && !data.deduplicated) {
const runUpdates: Record<string, unknown> = { trace_id: traceId };
if (wfConfig?.published_version_id) runUpdates.version_id = wfConfig.published_version_id;
if (strategy === "enqueue") runUpdates.priority = 3;
await supabase.from("workflow_runs").update(runUpdates).eq("id", data.run_id);
// Phase 1: Inline execution — execute first step immediately
try {
await executeInlineChain(supabase, data.run_id);
} catch (err) {
console.error("[workflow-inline] Error in inline chain:", (err as Error).message);
// Non-fatal — worker will pick up remaining steps
}
}
return { success: true, data: { ...data, strategy, trace_id: traceId } };
}
case "pause": {
const { error } = await supabase.from("workflow_runs").update({ status: "paused" })
.eq("id", args.run_id as string).eq("store_id", sid).eq("status", "running");
return error ? { success: false, error: error.message } : { success: true, data: { paused: true } };
}
case "resume": {
const { error } = await supabase.from("workflow_runs").update({ status: "running" })
.eq("id", args.run_id as string).eq("store_id", sid).eq("status", "paused");
return error ? { success: false, error: error.message } : { success: true, data: { resumed: true } };
}
case "cancel": {
const { data: run } = await supabase.from("workflow_runs")
.select("workflow_id, store_id").eq("id", args.run_id as string).eq("store_id", sid).single();
if (!run) return { success: false, error: "Run not found or access denied" };
await completeWorkflowRun(supabase, args.run_id as string, run.workflow_id, run.store_id, "cancelled", "Cancelled by user");
return { success: true, data: { cancelled: true } };
}
case "runs": {
let q = supabase.from("workflow_runs")
.select("id, workflow_id, status, trigger_type, trigger_payload, current_step_key, error_message, error_step_key, started_at, completed_at, duration_ms, created_at")
.eq("store_id", sid).order("created_at", { ascending: false });
if (args.workflow_id) q = q.eq("workflow_id", args.workflow_id as string);
if (args.status) q = q.eq("status", args.status as string);
const { data, error } = await q.limit((args.limit as number) || 25);
return error ? { success: false, error: error.message } : { success: true, data };
}
case "step_runs": {
const { data, error } = await supabase.from("workflow_step_runs")
.select("id, step_key, step_type, status, input, output, error_message, attempt_count, started_at, completed_at, duration_ms, parent_step_run_id, child_run_id")
.eq("run_id", args.run_id as string).order("created_at", { ascending: true });
return error ? { success: false, error: error.message } : { success: true, data };
}
case "analytics": {
const { data, error } = await supabase.rpc("get_workflow_analytics", {
p_store_id: sid, p_days: (args.days as number) || 30,
});
return error ? { success: false, error: error.message } : { success: true, data };
}
case "create_webhook": {
// P1 FIX: Ensure slug is globally unique to prevent cross-store webhook interception.
// handleWebhookIngestion queries by slug without store_id, so duplicate slugs across
// stores would cause one store to intercept another's webhooks.
const { data: existingSlug } = await supabase
.from("webhook_endpoints")
.select("id")
.eq("slug", args.slug as string)
.eq("is_active", true)
.limit(1);
if (existingSlug?.length) {
return { success: false, error: "Webhook slug already in use. Choose a different slug." };
}
const { data, error } = await supabase.from("webhook_endpoints").insert({
store_id: sid, name: args.name as string, description: args.description as string || null,
slug: args.slug as string, workflow_id: args.workflow_id as string,
verify_signature: args.verify_signature ?? true,
max_requests_per_minute: (args.max_requests_per_minute as number) || 60,
payload_transform: args.payload_transform || null,
sync_response: args.sync_response ?? false,
sync_timeout_seconds: (args.sync_timeout_seconds as number) || 30,
}).select().single();
if (error) return { success: false, error: error.message };
return { success: true, data: { ...data, webhook_url: `https://whale-agent.fly.dev/webhooks/${data.slug}` } };
}
case "list_webhooks": {
const { data, error } = await supabase.from("webhook_endpoints")
.select("id, name, description, slug, workflow_id, is_active, verify_signature, sync_response, last_received_at, total_received, created_at")
.eq("store_id", sid).order("created_at", { ascending: false });
return error ? { success: false, error: error.message } : { success: true, data };
}
case "delete_webhook": {
const { error } = await supabase.from("webhook_endpoints").delete()
.eq("id", args.webhook_id as string).eq("store_id", sid);
return error ? { success: false, error: error.message } : { success: true, data: { deleted: true } };
}
// ================================================================
// PHASE 2: Approval actions
// ================================================================
case "list_approvals": {
let q = supabase.from("workflow_approval_requests")
.select("id, run_id, step_run_id, workflow_id, title, description, prompt, options, form_schema, assigned_to, assigned_role, status, response_data, responded_by, responded_at, expires_at, timeout_action, created_at")
.eq("store_id", sid).order("created_at", { ascending: false });
if (args.status) q = q.eq("status", args.status as string);
if (args.workflow_id) q = q.eq("workflow_id", args.workflow_id as string);
if (args.run_id) q = q.eq("run_id", args.run_id as string);
const { data, error } = await q.limit((args.limit as number) || 25);
return error ? { success: false, error: error.message } : { success: true, data };
}
case "respond_approval": {
if (!args.approval_id) return { success: false, error: "approval_id required" };
if (!args.response_status) return { success: false, error: "response_status required (approved/rejected)" };
const { data, error } = await supabase.rpc("respond_to_approval", {
p_approval_id: args.approval_id as string,
p_store_id: sid,
p_status: args.response_status as string,
p_response_data: args.response_data || {},
p_responded_by: args.responded_by || null,
});
if (error) return { success: false, error: error.message };
return data?.success ? { success: true, data } : { success: false, error: data?.error || "Failed" };
}
// ================================================================
// PHASE 4: Versioning actions
// ================================================================
case "publish": {
if (!args.workflow_id) return { success: false, error: "workflow_id required" };
const { data, error } = await supabase.rpc("publish_workflow_version", {
p_workflow_id: args.workflow_id as string,
p_store_id: sid,
p_changelog: args.changelog as string || null,
p_published_by: args.published_by || null,
});
if (error) return { success: false, error: error.message };
if (!data?.success) return { success: false, error: data?.error || "Failed" };
// Auto-activate on publish — publishing implies ready to run
await supabase.from("workflows").update({ is_active: true, status: "active" })
.eq("id", args.workflow_id as string).eq("store_id", sid);
return { success: true, data };
}
case "versions": {
if (!args.workflow_id) return { success: false, error: "workflow_id required" };
const { data, error } = await supabase.from("workflow_versions")
.select("id, version, name, description, trigger_type, published_by, published_at, changelog")
.eq("workflow_id", args.workflow_id as string)
.order("version", { ascending: false })
.limit((args.limit as number) || 25);
return error ? { success: false, error: error.message } : { success: true, data };
}
case "rollback": {
if (!args.workflow_id || !args.version_id) return { success: false, error: "workflow_id and version_id required" };
// Verify version belongs to this workflow
const { data: version, error: verErr } = await supabase.from("workflow_versions")
.select("id, version").eq("id", args.version_id as string)
.eq("workflow_id", args.workflow_id as string).single();
if (verErr || !version) return { success: false, error: "Version not found for this workflow" };
const { error } = await supabase.from("workflows").update({
published_version_id: version.id,
}).eq("id", args.workflow_id as string).eq("store_id", sid);
return error ? { success: false, error: error.message } : { success: true, data: { rolled_back_to: version.version, version_id: version.id } };
}
// ================================================================
// PHASE 5: Template actions
// ================================================================
case "list_templates": {
let q = supabase.from("workflows")
.select("id, name, description, icon, trigger_type, template_category, template_tags, clone_count, created_at")
.eq("is_template", true).order("clone_count", { ascending: false });
if (args.category) q = q.eq("template_category", args.category as string);
const { data, error } = await q.limit((args.limit as number) || 50);
return error ? { success: false, error: error.message } : { success: true, data };
}
case "clone_template": {
if (!args.template_id) return { success: false, error: "template_id required" };
const { data, error } = await supabase.rpc("clone_workflow_template", {
p_template_id: args.template_id as string,
p_store_id: sid,
p_name: args.name as string || null,
});
if (error) return { success: false, error: error.message };
return data?.success ? { success: true, data } : { success: false, error: data?.error || "Failed" };
}
// ================================================================
// Checkpoint / replay — time-travel debugging
// ================================================================
case "checkpoints": {
if (!args.run_id) return { success: false, error: "run_id required" };
const { data, error } = await supabase.from("workflow_checkpoints")
.select("id, step_key, step_run_id, sequence_number, created_at")
.eq("run_id", args.run_id as string)
.order("sequence_number", { ascending: true });
return error ? { success: false, error: error.message } : { success: true, data };
}
case "replay": {
if (!args.run_id) return { success: false, error: "run_id required" };
const fromStepKey = args.from_step as string;
// Load the checkpoint to replay from
let checkpointQuery = supabase.from("workflow_checkpoints")
.select("*").eq("run_id", args.run_id as string);
if (fromStepKey) {
checkpointQuery = checkpointQuery.eq("step_key", fromStepKey);
} else {
// Default: replay from last successful checkpoint
checkpointQuery = checkpointQuery.order("sequence_number", { ascending: false }).limit(1);
}
const { data: checkpoint } = await checkpointQuery.single();
if (!checkpoint) return { success: false, error: "No checkpoint found" };
// Get original run's workflow
const { data: origRun } = await supabase.from("workflow_runs")
.select("workflow_id, trigger_type, trigger_payload, version_id")
.eq("id", args.run_id as string).single();
if (!origRun) return { success: false, error: "Original run not found" };
// Start a new run with checkpoint state pre-loaded
const { data: newRun, error: startErr } = await supabase.rpc("start_workflow_run", {
p_workflow_id: origRun.workflow_id,
p_store_id: sid,
p_trigger_type: "replay",
p_trigger_payload: checkpoint.trigger_payload || origRun.trigger_payload || {},
p_idempotency_key: null,
});
if (startErr || !newRun?.success) return { success: false, error: startErr?.message || "Failed to start replay run" };
// Pre-load step_outputs from checkpoint
await supabase.from("workflow_runs").update({
step_outputs: checkpoint.step_outputs,
version_id: origRun.version_id,
}).eq("id", newRun.run_id);
// Cancel the auto-created entry step runs (we'll create our own starting from the checkpoint step)
await supabase.from("workflow_step_runs").update({ status: "cancelled" })
.eq("run_id", newRun.run_id).eq("status", "pending");
// Find what step comes AFTER the checkpoint step
const { data: checkpointStepDef } = await supabase.from("workflow_steps")
.select("on_success").eq("workflow_id", origRun.workflow_id)
.eq("step_key", checkpoint.step_key).single();
if (checkpointStepDef?.on_success) {
// createNextStepRunByKey is internal to workflow-steps — use direct insert
const { data: nextStepDef } = await supabase.from("workflow_steps")
.select("id, step_key, step_type, max_retries")
.eq("workflow_id", origRun.workflow_id).eq("step_key", checkpointStepDef.on_success).single();
if (nextStepDef) {
await supabase.from("workflow_step_runs").insert({
run_id: newRun.run_id, step_id: nextStepDef.id, step_key: nextStepDef.step_key,
step_type: nextStepDef.step_type, status: "pending", max_attempts: nextStepDef.max_retries ?? 3,
});
}
try { await executeInlineChain(supabase, newRun.run_id); } catch (err) { console.error("[workflow] Inline chain failed for replay run", newRun.run_id, ":", (err as Error).message); }
}
logWorkflowEvent(supabase, newRun.run_id, "run_replayed", {
original_run_id: args.run_id, from_step: checkpoint.step_key, checkpoint_id: checkpoint.id,
});
return { success: true, data: { run_id: newRun.run_id, replayed_from: checkpoint.step_key, original_run_id: args.run_id } };
}
// ================================================================
// Event journal — time-travel debugging
// ================================================================
case "events": {
if (!args.run_id) return { success: false, error: "run_id required" };
const { data, error } = await supabase.from("workflow_events")
.select("id, event_type, step_run_id, payload, created_at")
.eq("run_id", args.run_id as string)
.order("created_at", { ascending: true })
.limit((args.limit as number) || 200);
return error ? { success: false, error: error.message } : { success: true, data };
}
// ================================================================
// Waitpoint actions
// ================================================================
case "list_waitpoints": {
let q = supabase.from("waitpoint_tokens")
.select("id, token, run_id, step_run_id, label, status, expires_at, completed_at, created_at")
.eq("store_id", sid).order("created_at", { ascending: false });
if (args.run_id) q = q.eq("run_id", args.run_id as string);
if (args.status) q = q.eq("status", args.status as string);
const { data, error } = await q.limit((args.limit as number) || 25);
return error ? { success: false, error: error.message } : { success: true, data };
}
case "complete_waitpoint": {
if (!args.token) return { success: false, error: "token required" };
// Find the waitpoint
const { data: wp, error: wpErr } = await supabase.from("waitpoint_tokens")
.select("id, run_id, step_run_id, store_id, expires_at, status")
.eq("token", args.token as string).eq("store_id", sid).single();
if (wpErr || !wp) return { success: false, error: "Waitpoint token not found" };
if (wp.status === "completed") return { success: false, error: "Waitpoint already completed" };
if (new Date(wp.expires_at) < new Date()) return { success: false, error: "Waitpoint expired" };
// Mark completed
await supabase.from("waitpoint_tokens").update({
status: "completed", completion_data: args.data || {}, completed_at: new Date().toISOString(),
}).eq("id", wp.id);
// Resume the waiting step
await supabase.from("workflow_step_runs").update({
status: "pending", input: { waitpoint_completed: true, waitpoint_data: args.data || {} },
}).eq("id", wp.step_run_id).eq("status", "waiting");
// Inline resume
try { await executeInlineChain(supabase, wp.run_id); } catch (err) { console.error("[workflow] Inline chain failed after waitpoint:", (err as Error).message); }
return { success: true, data: { completed: true, run_id: wp.run_id } };
}
// ================================================================
// Dead Letter Queue actions
// ================================================================
case "dlq": {
let q = supabase.from("workflow_dlq")
.select("id, workflow_id, workflow_name, run_id, error_message, error_step_key, trigger_type, status, run_duration_ms, created_at")
.eq("store_id", sid).order("created_at", { ascending: false });
if (args.status) q = q.eq("status", args.status as string);
if (args.workflow_id) q = q.eq("workflow_id", args.workflow_id as string);
const { data, error } = await q.limit((args.limit as number) || 25);
return error ? { success: false, error: error.message } : { success: true, data };
}
case "dlq_retry": {
if (!args.dlq_id) return { success: false, error: "dlq_id required" };
const { data: dlqEntry } = await supabase.from("workflow_dlq")
.select("*").eq("id", args.dlq_id as string).eq("store_id", sid).single();
if (!dlqEntry) return { success: false, error: "DLQ entry not found" };
if (dlqEntry.status !== "pending") return { success: false, error: `DLQ entry already ${dlqEntry.status}` };
// Start a fresh run of the same workflow with the same trigger
const { data: result, error: startErr } = await supabase.rpc("start_workflow_run", {
p_workflow_id: dlqEntry.workflow_id,
p_store_id: sid,
p_trigger_type: dlqEntry.trigger_type || "dlq_retry",
p_trigger_payload: { ...(dlqEntry.trigger_payload || {}), dlq_retry: true, original_run_id: dlqEntry.run_id },
p_idempotency_key: null,
});
if (startErr || !result?.success) return { success: false, error: startErr?.message || result?.error || "Retry failed" };
// Update DLQ entry
await supabase.from("workflow_dlq").update({
status: "retried", retried_run_id: result.run_id,
attempt_count: (dlqEntry.attempt_count || 1) + 1,
}).eq("id", dlqEntry.id);
// Inline execution for the retry
try { await executeInlineChain(supabase, result.run_id); } catch (err) { console.error("[workflow] Inline chain failed for DLQ retry run", result.run_id, ":", (err as Error).message); }
return { success: true, data: { retried: true, new_run_id: result.run_id, dlq_id: dlqEntry.id } };
}
case "dlq_dismiss": {
if (!args.dlq_id) return { success: false, error: "dlq_id required" };
const { error } = await supabase.from("workflow_dlq").update({
status: "dismissed",
dismissed_by: (args.dismissed_by as string) || null,
dismissed_at: new Date().toISOString(),
notes: (args.notes as string) || null,
}).eq("id", args.dlq_id as string).eq("store_id", sid);
return error ? { success: false, error: error.message } : { success: true, data: { dismissed: true } };
}
// ================================================================
// Enhanced metrics + DAG visualization
// ================================================================
case "metrics": {
const { data, error } = await supabase.rpc("get_workflow_metrics", {
p_store_id: sid, p_days: (args.days as number) || 30,
});
return error ? { success: false, error: error.message } : { success: true, data };
}
case "graph": {
// Build DAG visualization data (nodes + edges) for a workflow's steps,
// optionally overlaying live per-step status from a specific run.
if (!args.workflow_id) return { success: false, error: "workflow_id required" };
const { data: steps, error: stepsErr } = await supabase.from("workflow_steps")
.select("id, step_key, step_type, is_entry_point, on_success, on_failure, position_x, position_y, step_config, timeout_seconds, max_retries")
.eq("workflow_id", args.workflow_id as string)
.order("position_y", { ascending: true });
if (stepsErr) return { success: false, error: stepsErr.message };
const stepRows = steps || [];
const nodes = stepRows.map(row => {
const conf = row.step_config as any;
// Per-type config summary surfaced in the UI next to each node.
const summary: Record<string, unknown> = { timeout: row.timeout_seconds, max_retries: row.max_retries };
if (row.step_type === "condition") summary.expression = conf?.expression;
if (row.step_type === "tool") summary.tool_name = conf?.tool_name;
if (row.step_type === "delay") summary.seconds = conf?.seconds;
return {
id: row.step_key,
step_id: row.id, // UUID for update_step/delete_step
type: row.step_type,
label: row.step_key,
is_entry_point: row.is_entry_point,
on_success: row.on_success || null,
on_failure: row.on_failure || null,
max_retries: row.max_retries,
timeout_seconds: row.timeout_seconds,
step_config: row.step_config || {},
position: { x: row.position_x || 0, y: row.position_y || 0 },
config_summary: summary,
};
});
const edges: Array<{ from: string; to: string; type: string }> = [];
for (const row of stepRows) {
const conf = row.step_config as any;
if (row.on_success) edges.push({ from: row.step_key, to: row.on_success, type: "success" });
if (row.on_failure) edges.push({ from: row.step_key, to: row.on_failure, type: "failure" });
// Condition steps branch via on_true/on_false inside their step_config.
if (row.step_type === "condition") {
if (conf?.on_true) edges.push({ from: row.step_key, to: conf.on_true, type: "true" });
if (conf?.on_false) edges.push({ from: row.step_key, to: conf.on_false, type: "false" });
}
// Parallel steps fan out to each child step key (either config field name is accepted).
const children = conf?.step_keys || conf?.child_steps;
if (row.step_type === "parallel" && Array.isArray(children)) {
for (const child of children) edges.push({ from: row.step_key, to: child, type: "parallel" });
}
}
// Optional live-status overlay keyed by step_key when a run_id is supplied.
// NOTE(review): a query error here is silently ignored — overlay is best-effort.
const nodeStatus: Record<string, { status: string; duration_ms: number | null; error?: string }> = {};
if (args.run_id) {
const { data: stepRuns } = await supabase.from("workflow_step_runs")
.select("step_key, status, duration_ms, error_message")
.eq("run_id", args.run_id as string);
for (const run of stepRuns || []) {
nodeStatus[run.step_key] = run.error_message
? { status: run.status, duration_ms: run.duration_ms, error: run.error_message }
: { status: run.status, duration_ms: run.duration_ms };
}
}
const hasStatus = Object.keys(nodeStatus).length > 0;
return { success: true, data: { nodes, edges, node_status: hasStatus ? nodeStatus : undefined } };
}
// ================================================================
// Schedule management
// ================================================================
case "set_schedule": {
// Three modes: one-time (run_at), recurring (cron_expression), or clear (neither).
// When both run_at and cron_expression are supplied, run_at wins and the cron
// expression is cleared.
if (!args.workflow_id) return { success: false, error: "workflow_id required" };
const cronExpr = args.cron_expression as string | null;
const runAt = args.run_at as string | null;
const wfId = args.workflow_id as string;
const tz = (args.timezone as string) || "UTC";
if (runAt) {
// One-time schedule: validate the datetime and arm the workflow for a single future run.
const when = new Date(runAt);
if (isNaN(when.getTime())) return { success: false, error: `Invalid run_at datetime: ${runAt}` };
if (when.getTime() <= Date.now()) return { success: false, error: "run_at must be in the future" };
const { error } = await supabase.from("workflows").update({
cron_expression: null,
next_run_at: when.toISOString(),
trigger_type: "schedule",
timezone: tz,
is_active: true,
status: "active",
}).eq("id", wfId).eq("store_id", sid);
if (error) return { success: false, error: error.message };
return { success: true, data: { one_time: true, run_at: when.toISOString() } };
}
if (cronExpr) {
// Recurring schedule: compute the next fire time up front so a bad expression fails fast.
// NOTE(review): getNextCronTime is not passed the stored timezone — presumably it
// evaluates in UTC; confirm against its implementation in workflow-steps.ts.
const next = getNextCronTime(cronExpr);
if (!next) return { success: false, error: `Invalid cron expression: ${cronExpr}` };
const { error } = await supabase.from("workflows").update({
cron_expression: cronExpr,
next_run_at: next.toISOString(),
trigger_type: "schedule",
timezone: tz,
}).eq("id", wfId).eq("store_id", sid);
if (error) return { success: false, error: error.message };
return { success: true, data: { cron: cronExpr, next_run_at: next.toISOString() } };
}
// Neither provided: clear any existing schedule (trigger_type is left untouched).
const { error } = await supabase.from("workflows").update({
cron_expression: null, next_run_at: null,
}).eq("id", wfId).eq("store_id", sid);
if (error) return { success: false, error: error.message };
return { success: true, data: { schedule_cleared: true } };
}
// ================================================================
// Event bus — fire events & manage subscriptions
// ================================================================
case "fire_event": {
// Publish an event onto the store's automation bus via the fire_event RPC;
// returns the new event id on success.
if (!args.event_type) return { success: false, error: "event_type required" };
const payload = (args.event_payload as Record<string, unknown>) || {};
const { data: evtId, error: evtErr } = await supabase.rpc("fire_event", {
p_store_id: sid,
p_event_type: args.event_type as string,
p_event_payload: payload,
p_source: (args.source as string) || "workflow_tool",
});
if (evtErr) return { success: false, error: evtErr.message };
return { success: true, data: { event_id: evtId } };
}
case "list_subscriptions": {
// List event subscriptions for this store, newest first, with optional
// event_type / workflow_id filters and a caller-controlled limit (default 50).
let query = supabase.from("workflow_event_subscriptions")
.select("id, store_id, workflow_id, event_type, filter_expression, is_active, created_at")
.eq("store_id", sid)
.order("created_at", { ascending: false });
if (args.event_type) query = query.eq("event_type", args.event_type as string);
if (args.workflow_id) query = query.eq("workflow_id", args.workflow_id as string);
const { data, error: listErr } = await query.limit((args.limit as number) || 50);
if (listErr) return { success: false, error: listErr.message };
return { success: true, data };
}
case "create_subscription": {
// Subscribe a workflow to an event type. is_active defaults to true unless
// explicitly passed as false; filter_expression is optional.
if (!args.workflow_id) return { success: false, error: "workflow_id required" };
if (!args.event_type) return { success: false, error: "event_type required" };
const row = {
store_id: sid,
workflow_id: args.workflow_id as string,
event_type: args.event_type as string,
filter_expression: (args.filter_expression as string) || null,
is_active: args.is_active !== false,
};
const { data: sub, error: subErr } = await supabase.from("workflow_event_subscriptions")
.insert(row)
.select()
.single();
if (subErr) return { success: false, error: subErr.message };
return { success: true, data: sub };
}
case "delete_subscription": {
// Hard-delete a subscription, scoped to this store.
if (!args.subscription_id) return { success: false, error: "subscription_id required" };
const { error: delErr } = await supabase.from("workflow_event_subscriptions")
.delete()
.eq("id", args.subscription_id as string)
.eq("store_id", sid);
if (delErr) return { success: false, error: delErr.message };
return { success: true, data: { deleted: true } };
}
case "list_events": {
// List automation events for this store, newest first, with optional
// event_type / status filters and a caller-controlled limit (default 50).
let query = supabase.from("automation_events")
.select("id, store_id, event_type, event_payload, source, status, processed_at, error_message, created_at")
.eq("store_id", sid)
.order("created_at", { ascending: false });
if (args.event_type) query = query.eq("event_type", args.event_type as string);
if (args.status) query = query.eq("status", args.status as string);
const { data, error: listErr } = await query.limit((args.limit as number) || 50);
if (listErr) return { success: false, error: listErr.message };
return { success: true, data };
}
default:
// Unrecognized action — surface it to the caller instead of failing silently.
return { success: false, error: `Unknown workflow action: ${action}` };
}
}