-- Migration: Workflow Engine v5.4 — Enterprise Features
-- Features: double-texting strategy, per-tenant round-robin, checkpoint/replay, retry policies
-- Date: 2026-02-13
-- ============================================================================
-- 1. DOUBLE-TEXTING / MULTITASK STRATEGY
-- Controls what happens when a new trigger fires while a run is in-flight
-- ============================================================================
-- Per-workflow policy applied when a trigger fires while a run of the same
-- workflow is still active. The inline CHECK limits non-NULL values to the five
-- strategies below; the column default fills 'allow' for existing and new rows.
ALTER TABLE workflows
    ADD COLUMN IF NOT EXISTS multitask_strategy TEXT
        DEFAULT 'allow'
        CHECK (multitask_strategy IN ('allow', 'reject', 'enqueue', 'interrupt', 'replace'));
COMMENT ON COLUMN workflows.multitask_strategy IS 'Controls concurrent run behavior: allow (default), reject (block), enqueue (lower priority), interrupt (cancel active), replace (cancel + restart)';
-- ============================================================================
-- 2. PER-TENANT ROUND-ROBIN FAIRNESS in claim_pending_steps
-- Prevents one store from starving others in the worker loop
-- ============================================================================
-- Drop the existing RPC first: CREATE OR REPLACE cannot change a function's
-- result columns, and the RETURNS TABLE signature may differ from older versions.
DROP FUNCTION IF EXISTS claim_pending_steps(integer);
-- Claims up to batch_size runnable steps and marks them 'running'.
--   * Eligible: step status 'pending', or 'retrying' with next_retry_at <= now(),
--     whose workflow run has status 'running'.
--   * Fairness: steps are ranked within each store_id (COALESCE(priority, 5)
--     ascending, then created_at, then id), and the batch is filled by that rank
--     across stores (all rank-1 steps first, then rank-2, ...), so one store with
--     a deep backlog cannot fill the whole batch.
--   * Concurrency: the chosen rows are locked with FOR UPDATE ... SKIP LOCKED, so
--     concurrent workers take disjoint rows instead of blocking on each other and
--     returning short batches.
--   * Each claimed step gets started_at = now() and attempt_count + 1.
-- Returns one row per claimed step, joined with its run, step definition and
-- workflow, with defaults substituted for unset optional settings.
CREATE OR REPLACE FUNCTION claim_pending_steps(batch_size INTEGER DEFAULT 5)
RETURNS TABLE(
    step_run_id UUID, run_id UUID, workflow_id UUID, store_id UUID,
    step_id UUID, step_key TEXT, step_type TEXT, step_config JSONB,
    on_success TEXT, on_failure TEXT, timeout_seconds INTEGER,
    input_schema JSONB, step_outputs JSONB, trigger_payload JSONB,
    attempt_count INTEGER, max_attempts INTEGER, max_steps_per_run INTEGER,
    input JSONB, parent_step_run_id UUID, retry_delay_seconds INTEGER,
    concurrency_limit INTEGER, concurrency_key TEXT,
    rate_limit INTEGER, rate_window_seconds INTEGER,
    priority INTEGER
) AS $$
BEGIN
    -- Column references are table-qualified (and CTE columns are '_'-prefixed)
    -- so they never collide with the RETURNS TABLE output variables.
    RETURN QUERY
    WITH eligible_steps AS (
        -- Rank every runnable step within its store. No LIMIT here: truncating
        -- before the fairness ORDER BY would keep an arbitrary subset that may
        -- come entirely from a single store.
        SELECT sr.id AS _sr_id,
               wr.store_id AS _store_id,
               ROW_NUMBER() OVER (
                   PARTITION BY wr.store_id
                   ORDER BY COALESCE(wr.priority, 5), sr.created_at, sr.id
               ) AS rn_within_store
        FROM workflow_step_runs sr
        JOIN workflow_runs wr ON wr.id = sr.run_id AND wr.status = 'running'
        WHERE sr.status = 'pending'
           OR (sr.status = 'retrying' AND sr.next_retry_at <= now())
    ),
    -- Interleave stores by rank, then lock the chosen rows. SKIP LOCKED passes
    -- over rows another worker is claiming; LIMIT counts only rows actually
    -- locked, and the status predicate is re-checked on the locked row version.
    fair_sample AS (
        SELECT sr1.id AS _sr_id
        FROM workflow_step_runs sr1
        JOIN eligible_steps es ON es._sr_id = sr1.id
        WHERE sr1.status = 'pending'
           OR (sr1.status = 'retrying' AND sr1.next_retry_at <= now())
        ORDER BY es.rn_within_store, es._store_id
        LIMIT batch_size
        FOR UPDATE OF sr1 SKIP LOCKED
    ),
    claimed AS (
        UPDATE workflow_step_runs sr2
        SET status = 'running', started_at = now(), attempt_count = sr2.attempt_count + 1
        WHERE sr2.id IN (SELECT _sr_id FROM fair_sample)
          -- Defensive re-check: only transition rows that are still claimable.
          AND (sr2.status = 'pending' OR (sr2.status = 'retrying' AND sr2.next_retry_at <= now()))
        RETURNING sr2.*
    )
    SELECT
        c.id, c.run_id, wr.workflow_id, wr.store_id,
        c.step_id, c.step_key, c.step_type,
        ws.step_config, ws.on_success, ws.on_failure, COALESCE(ws.timeout_seconds, 60),
        ws.input_schema, wr.step_outputs, wr.trigger_payload,
        c.attempt_count, c.max_attempts, COALESCE(w.max_steps_per_run, 50),
        c.input, c.parent_step_run_id, COALESCE(ws.retry_delay_seconds, 10),
        COALESCE(ws.concurrency_limit, 0), COALESCE(ws.concurrency_key, c.step_key),
        COALESCE(ws.rate_limit, 0), COALESCE(ws.rate_window_seconds, 60),
        COALESCE(wr.priority, 5)
    FROM claimed c
    JOIN workflow_runs wr ON wr.id = c.run_id
    JOIN workflow_steps ws ON ws.id = c.step_id
    JOIN workflows w ON w.id = wr.workflow_id;
END;
$$ LANGUAGE plpgsql;
-- ============================================================================
-- 3. CHECKPOINT / REPLAY TABLE
-- Snapshots step_outputs at each successful step for replay-from-checkpoint
-- ============================================================================
-- One row per checkpoint: a copy of a run's step_outputs and trigger_payload
-- recorded for a given step run, ordered within the run by sequence_number.
-- Rows are removed with their run or step run (ON DELETE CASCADE).
CREATE TABLE IF NOT EXISTS workflow_checkpoints (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    run_id UUID NOT NULL REFERENCES workflow_runs(id) ON DELETE CASCADE,
    step_run_id UUID NOT NULL REFERENCES workflow_step_runs(id) ON DELETE CASCADE,
    step_key TEXT NOT NULL,
    step_outputs JSONB NOT NULL DEFAULT '{}'::JSONB,
    trigger_payload JSONB NOT NULL DEFAULT '{}'::JSONB,
    sequence_number INTEGER NOT NULL DEFAULT 1,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
ALTER TABLE workflow_checkpoints ENABLE ROW LEVEL SECURITY;
-- Service-role access policy. It must be scoped with TO: a policy without a TO
-- clause applies to PUBLIC, and USING (true) / WITH CHECK (true) would then give
-- every role holding table privileges full read/write access to all checkpoints.
-- Dropped and re-created (instead of created only when missing) so databases
-- that already carry the unscoped version of this policy are corrected.
-- NOTE(review): assumes the Supabase-style `service_role` role exists.
DROP POLICY IF EXISTS checkpoints_service_role ON workflow_checkpoints;
CREATE POLICY checkpoints_service_role ON workflow_checkpoints
    FOR ALL
    TO service_role
    USING (true)
    WITH CHECK (true);
-- Indexes
CREATE INDEX IF NOT EXISTS idx_checkpoints_run ON workflow_checkpoints(run_id, sequence_number);
CREATE INDEX IF NOT EXISTS idx_checkpoints_step ON workflow_checkpoints(run_id, step_key);
-- step_run_id is an ON DELETE CASCADE foreign key; without an index, every
-- delete from workflow_step_runs must scan this table for dependent rows.
CREATE INDEX IF NOT EXISTS idx_checkpoints_step_run ON workflow_checkpoints(step_run_id);
-- ============================================================================
-- 4. RETRY POLICY COLUMN
-- Allows per-step configurable retry behavior beyond simple max_retries
-- ============================================================================
-- Optional per-step retry configuration; NULL when a step has no policy.
-- The expected JSON shape is documented in the column comment only; the
-- database does not validate it.
ALTER TABLE workflow_steps
    ADD COLUMN IF NOT EXISTS retry_policy JSONB
        DEFAULT NULL;
COMMENT ON COLUMN workflow_steps.retry_policy IS 'Optional retry policy: {backoff_type: "fixed"|"exponential"|"linear", backoff_base_seconds: N, max_backoff_seconds: N, retry_on: ["pattern1", "pattern2"]}';
-- ============================================================================
-- 5. ADDITIONAL INDEXES for new features
-- ============================================================================
-- Partial index covering only in-flight runs (status 'running' or 'pending'),
-- keyed by workflow and store, for the double-texting (multitask_strategy)
-- lookup of an already-active run.
CREATE INDEX IF NOT EXISTS idx_workflow_runs_active
    ON workflow_runs (workflow_id, store_id, status)
    WHERE status IN ('running', 'pending');
-- Checkpoint lookup led by step_key, then run_id (for replay).
-- NOTE(review): (run_id, step_key) is already indexed as idx_checkpoints_step;
-- this index only helps queries filtering on step_key without run_id. Confirm
-- such a query exists, otherwise it is pure write overhead.
CREATE INDEX IF NOT EXISTS idx_checkpoints_step_key
    ON workflow_checkpoints (step_key, run_id);
-- ============================================================================
-- 6. UPGRADE step_type CHECK constraint for any new types
-- (Future-proofing: no constraint changes needed for v5.4 since all step types
-- were added in v5.3 migration)
-- ============================================================================
-- Re-create workflow_steps_step_type_check with the full 14-type list, but only
-- when the constraint exists and its definition does not mention 'waitpoint'.
-- Only 'waitpoint' is probed: a constraint that already mentions it is left as
-- is, and a missing constraint is not created.
DO $$
DECLARE
    v_def TEXT;
BEGIN
    SELECT pg_get_constraintdef(c.oid)
      INTO v_def
      FROM pg_constraint AS c
     WHERE c.conname = 'workflow_steps_step_type_check'
       AND c.conrelid = 'workflow_steps'::regclass;

    -- Nothing to do: constraint absent, or it already lists 'waitpoint'.
    IF v_def IS NULL OR v_def LIKE '%waitpoint%' THEN
        RETURN;
    END IF;

    ALTER TABLE workflow_steps DROP CONSTRAINT IF EXISTS workflow_steps_step_type_check;
    ALTER TABLE workflow_steps
        ADD CONSTRAINT workflow_steps_step_type_check
        CHECK (step_type IN ('tool', 'condition', 'transform', 'delay', 'agent', 'sub_workflow',
                             'webhook_out', 'parallel', 'for_each', 'code', 'noop', 'approval',
                             'custom', 'waitpoint'));
END $$;