-- Migration: Workflow Engine v5.5 — Enterprise Completeness
-- Features: cron/schedule triggers, workflow timeout enforcement, DLQ,
-- workflow-level concurrency, priority enforcement, trace IDs, DAG export
-- Date: 2026-02-14
-- ============================================================================
-- 1. CRON / SCHEDULE TRIGGERS
-- Stores cron expression + next_run_at for the scheduler worker to pick up
-- ============================================================================
-- Schedule metadata lives directly on the workflows row: a workflow is
-- "scheduled" iff cron_expression is non-NULL; the scheduler worker polls
-- next_run_at. All ALTERs are idempotent (IF NOT EXISTS) so this migration
-- can be re-run safely.
ALTER TABLE workflows ADD COLUMN IF NOT EXISTS cron_expression TEXT DEFAULT NULL;
ALTER TABLE workflows ADD COLUMN IF NOT EXISTS next_run_at TIMESTAMPTZ DEFAULT NULL;
-- NOTE(review): timezone is free text; an invalid IANA name will only surface
-- when the worker evaluates the cron expression — consider validating upstream.
ALTER TABLE workflows ADD COLUMN IF NOT EXISTS timezone TEXT DEFAULT 'UTC';
ALTER TABLE workflows ADD COLUMN IF NOT EXISTS last_scheduled_at TIMESTAMPTZ DEFAULT NULL;
COMMENT ON COLUMN workflows.cron_expression IS 'Cron expression (5-field: min hour dom mon dow). NULL = not scheduled.';
COMMENT ON COLUMN workflows.next_run_at IS 'Next scheduled run time. Updated after each schedule trigger.';
COMMENT ON COLUMN workflows.timezone IS 'IANA timezone for cron evaluation (default: UTC)';
COMMENT ON COLUMN workflows.last_scheduled_at IS 'Last time this workflow was triggered by the scheduler';
-- Index for scheduler: find workflows whose next_run_at has passed
-- Partial index keeps the scheduler's poll (next_run_at <= now()) cheap by
-- indexing only active, scheduled workflows.
-- NOTE(review): assumes is_active and status columns already exist on
-- workflows — confirm against earlier migrations.
CREATE INDEX IF NOT EXISTS idx_workflows_schedule
ON workflows(next_run_at)
WHERE next_run_at IS NOT NULL AND is_active = true AND status = 'active';
-- ============================================================================
-- 2. DEAD LETTER QUEUE (DLQ)
-- Archives permanently failed workflow runs for investigation + replay
-- ============================================================================
-- Dead letter queue: one row per permanently-failed workflow run, carrying
-- enough context (trigger payload, completed step outputs, error details)
-- to investigate the failure and optionally replay the run.
CREATE TABLE IF NOT EXISTS workflow_dlq (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    store_id UUID NOT NULL,  -- tenant scope; intentionally no FK? TODO confirm against schema conventions
    run_id UUID NOT NULL REFERENCES workflow_runs(id) ON DELETE CASCADE,
    workflow_id UUID NOT NULL REFERENCES workflows(id) ON DELETE CASCADE,
    workflow_name TEXT,       -- denormalized snapshot for display after rename/delete
    error_message TEXT,
    error_step_key TEXT,      -- step that produced the terminal failure
    trigger_type TEXT,
    trigger_payload JSONB DEFAULT '{}'::JSONB,  -- original trigger input, required for replay
    step_outputs JSONB DEFAULT '{}'::JSONB,     -- outputs of the steps that did complete
    run_duration_ms INTEGER,
    attempt_count INTEGER NOT NULL DEFAULT 1,
    -- DLQ lifecycle. NOT NULL matters here: CHECK (status IN (...)) passes
    -- when status IS NULL, so without it a NULL row would be invisible to
    -- "WHERE status = 'pending'" dashboards. Constraint named for greppable
    -- error messages.
    status TEXT NOT NULL DEFAULT 'pending'
        CONSTRAINT workflow_dlq_status_check
        CHECK (status IN ('pending', 'retried', 'dismissed', 'escalated')),
    retried_run_id UUID,      -- run created by a replay; presumably no FK so the replay can outlive this row — TODO confirm
    dismissed_by TEXT,
    dismissed_at TIMESTAMPTZ,
    notes TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Enable RLS so direct client connections cannot read the DLQ by default.
ALTER TABLE workflow_dlq ENABLE ROW LEVEL SECURITY;
-- Idempotent policy creation: CREATE POLICY has no IF NOT EXISTS, so guard
-- via pg_policies to keep the migration re-runnable.
DO $$ BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_policies WHERE tablename = 'workflow_dlq' AND policyname = 'dlq_service_role'
) THEN
-- NOTE(review): despite the name, this policy has no TO clause, so it applies
-- to PUBLIC — USING (true) / WITH CHECK (true) grants every role full access,
-- effectively neutralizing RLS on this table. If only the service role should
-- pass, add "TO service_role" (or rely on that role's RLS bypass) — confirm intent.
CREATE POLICY dlq_service_role ON workflow_dlq FOR ALL
USING (true) WITH CHECK (true);
END IF;
END $$;
-- Hot paths: store dashboard (filter by status, newest first), per-workflow
-- failure history, and lookup by originating run.
CREATE INDEX IF NOT EXISTS idx_dlq_store ON workflow_dlq(store_id, status, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_dlq_workflow ON workflow_dlq(workflow_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_dlq_run ON workflow_dlq(run_id);
-- ============================================================================
-- 3. TRACE IDS — for structured logging / distributed tracing
-- ============================================================================
-- trace_id is assigned by the engine at run start and copied onto each step
-- run so logs can be correlated across steps and downstream services.
ALTER TABLE workflow_runs ADD COLUMN IF NOT EXISTS trace_id TEXT DEFAULT NULL;
ALTER TABLE workflow_step_runs ADD COLUMN IF NOT EXISTS trace_id TEXT DEFAULT NULL;
COMMENT ON COLUMN workflow_runs.trace_id IS 'Distributed trace ID (UUID) for correlating logs across steps and services';
COMMENT ON COLUMN workflow_step_runs.trace_id IS 'Inherited trace ID from parent run for step-level correlation';
-- Partial index: rows predating this column stay NULL, so excluding NULLs
-- keeps the index small while still serving trace lookups.
CREATE INDEX IF NOT EXISTS idx_runs_trace ON workflow_runs(trace_id) WHERE trace_id IS NOT NULL;
-- ============================================================================
-- 4. WORKFLOW-LEVEL CONCURRENCY ENFORCEMENT
-- max_concurrent_runs already exists on workflows table.
-- Add index for fast count of active runs per workflow.
-- ============================================================================
-- Partial index covering only in-flight runs, so the concurrency gate's
-- "count active runs for workflow X" stays cheap no matter how much run
-- history accumulates.
CREATE INDEX IF NOT EXISTS idx_runs_workflow_active
ON workflow_runs(workflow_id, store_id)
WHERE status IN ('running', 'pending');
-- ============================================================================
-- 5. ENHANCED WORKFLOW ANALYTICS RPC
-- Replaces basic get_workflow_analytics with richer metrics
-- ============================================================================
-- Drop if return type changed
-- Drop first: CREATE OR REPLACE cannot change an existing function's return
-- type, and earlier versions of this function returned a different shape.
DROP FUNCTION IF EXISTS get_workflow_metrics(UUID, INTEGER);
-- get_workflow_metrics(p_store_id, p_days) -> JSONB
-- Per-store analytics over a rolling p_days window:
--   run_stats   one object per run status: count, avg/p50/p95/p99/min/max duration_ms
--   step_stats  one object per step_type: count, avg duration, failure/success totals
--   dlq_stats   DLQ row counts per lifecycle status
--   top_errors  ten most frequent (step_key, error_message) failure pairs
-- Read-only, hence STABLE so the planner may treat repeated calls within a
-- statement as a single evaluation.
CREATE OR REPLACE FUNCTION get_workflow_metrics(p_store_id UUID, p_days INTEGER DEFAULT 30)
RETURNS JSONB AS $$
DECLARE
v_result JSONB;
-- Window lower bound, computed once instead of rebuilding an interval from a
-- string ('N days')::INTERVAL in every CTE.
v_cutoff TIMESTAMPTZ := now() - make_interval(days => p_days);
BEGIN
WITH run_stats AS (
-- PERCENTILE_CONT / AVG ignore NULL duration_ms (e.g. runs still in flight).
SELECT
    status,
    COUNT(*) AS cnt,
    AVG(duration_ms) AS avg_duration,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY duration_ms) AS p50_duration,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration_ms) AS p95_duration,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY duration_ms) AS p99_duration,
    MIN(duration_ms) AS min_duration,
    MAX(duration_ms) AS max_duration
FROM workflow_runs
WHERE store_id = p_store_id
    AND created_at >= v_cutoff
GROUP BY status
),
step_stats AS (
SELECT
    sr.step_type,
    COUNT(*) AS cnt,
    AVG(sr.duration_ms) AS avg_duration,
    SUM(CASE WHEN sr.status = 'failed' THEN 1 ELSE 0 END) AS failures,
    SUM(CASE WHEN sr.status = 'success' THEN 1 ELSE 0 END) AS successes
FROM workflow_step_runs sr
JOIN workflow_runs wr ON wr.id = sr.run_id
WHERE wr.store_id = p_store_id
    AND sr.created_at >= v_cutoff
GROUP BY sr.step_type
),
dlq_stats AS (
SELECT
    status,
    COUNT(*) AS cnt
FROM workflow_dlq
WHERE store_id = p_store_id
    AND created_at >= v_cutoff
GROUP BY status
),
top_errors AS (
SELECT
    sr.step_key,
    sr.error_message,
    COUNT(*) AS cnt
FROM workflow_step_runs sr
JOIN workflow_runs wr ON wr.id = sr.run_id
WHERE wr.store_id = p_store_id
    AND sr.status = 'failed'
    AND sr.created_at >= v_cutoff
GROUP BY sr.step_key, sr.error_message
-- Secondary sort keys make the top-10 deterministic when counts tie.
ORDER BY cnt DESC, sr.step_key, sr.error_message
LIMIT 10
)
SELECT jsonb_build_object(
-- COALESCE: jsonb_agg over zero rows yields NULL, not an empty array.
-- to_jsonb aggregates records directly without a json -> jsonb round-trip.
'run_stats', COALESCE((SELECT jsonb_agg(to_jsonb(r)) FROM run_stats r), '[]'::JSONB),
'step_stats', COALESCE((SELECT jsonb_agg(to_jsonb(s)) FROM step_stats s), '[]'::JSONB),
'dlq_stats', COALESCE((SELECT jsonb_agg(to_jsonb(d)) FROM dlq_stats d), '[]'::JSONB),
'top_errors', COALESCE((SELECT jsonb_agg(to_jsonb(e)) FROM top_errors e), '[]'::JSONB),
'period_days', p_days
) INTO v_result;
RETURN v_result;
END;
$$ LANGUAGE plpgsql STABLE;
-- ============================================================================
-- 6. SCHEDULE TRIGGER FUNCTION
-- Called by the worker loop to fire due schedules
-- ============================================================================
-- fire_due_schedules(p_limit) -> (workflow_id, store_id, run_id)
-- Polling entry point for the scheduler worker: claims up to p_limit
-- workflows whose next_run_at has elapsed, starts a run for each via
-- start_workflow_run, stamps last_scheduled_at, and returns the successfully
-- started runs. FOR UPDATE SKIP LOCKED lets multiple worker instances poll
-- concurrently: each claims a disjoint set of rows, so no schedule double-fires.
CREATE OR REPLACE FUNCTION fire_due_schedules(p_limit INTEGER DEFAULT 10)
RETURNS TABLE(workflow_id UUID, store_id UUID, run_id UUID) AS $$
BEGIN
RETURN QUERY
-- due: lock the next p_limit due, active, cron-configured workflows.
WITH due AS (
SELECT w.id AS wf_id, w.store_id AS ws_id
FROM workflows w
WHERE w.next_run_at IS NOT NULL
AND w.next_run_at <= now()
AND w.is_active = true
AND w.status = 'active'
AND w.cron_expression IS NOT NULL
ORDER BY w.next_run_at
LIMIT p_limit
FOR UPDATE SKIP LOCKED
),
-- started: start_workflow_run is presumably volatile, which prevents this
-- CTE from being inlined, so each claimed workflow is started once.
-- run_result is expected to be JSONB with 'success' and 'run_id' keys —
-- confirm against start_workflow_run's definition.
started AS (
SELECT
d.wf_id,
d.ws_id,
start_workflow_run(d.wf_id, d.ws_id, 'schedule', '{}'::JSONB, NULL) AS run_result
FROM due d
),
-- updated: data-modifying CTE — executes exactly once even though its output
-- is never read. Clears next_run_at so the claimed schedules cannot re-fire.
-- NOTE(review): next_run_at is cleared even when start_workflow_run reports
-- success = 'false', so a failed tick is silently dropped unless the worker
-- recomputes next_run_at for claimed workflows unconditionally — confirm.
updated AS (
UPDATE workflows w
SET last_scheduled_at = now(),
next_run_at = NULL -- Caller must recompute next_run_at from cron_expression
FROM due d
WHERE w.id = d.wf_id
RETURNING w.id
)
-- Surface only runs that actually started.
SELECT s.wf_id, s.ws_id, (s.run_result->>'run_id')::UUID
FROM started s
WHERE s.run_result->>'success' = 'true';
END;
$$ LANGUAGE plpgsql;
-- ============================================================================
-- 7. TIMEOUT ENFORCEMENT FUNCTION
-- Cancels runs that exceeded max_run_duration_seconds
-- ============================================================================
-- enforce_workflow_timeouts() -> number of runs timed out this call
-- Housekeeping entry point: marks 'running' runs that exceeded their
-- workflow's max_run_duration_seconds as 'timed_out' and cancels their
-- in-flight step runs. Batched (50 runs per call) with SKIP LOCKED so
-- concurrent invocations cooperate instead of blocking each other.
CREATE OR REPLACE FUNCTION enforce_workflow_timeouts()
RETURNS INTEGER AS $$
DECLARE
v_count INTEGER := 0;
BEGIN
-- Single statement: the step-cancellation update is driven by the exact set
-- of runs marked in THIS invocation (RETURNING from "marked"), replacing the
-- previous "completed_at within the last 10 seconds" heuristic, which was
-- imprecise and could touch runs finalized by other invocations.
WITH timed_out AS (
-- Claim up to 50 expired runs; locked rows cannot change status under us.
SELECT wr.id AS run_id
FROM workflow_runs wr
JOIN workflows w ON w.id = wr.workflow_id
WHERE wr.status = 'running'
    AND wr.started_at IS NOT NULL
    AND w.max_run_duration_seconds IS NOT NULL
    AND w.max_run_duration_seconds > 0
    AND now() > wr.started_at + make_interval(secs => w.max_run_duration_seconds)
LIMIT 50
FOR UPDATE OF wr SKIP LOCKED
),
marked AS (
-- Finalize the claimed runs as timed out.
UPDATE workflow_runs wr2
SET status = 'timed_out',
    error_message = 'Workflow exceeded max_run_duration_seconds',
    completed_at = now(),
    duration_ms = EXTRACT(EPOCH FROM (now() - wr2.started_at)) * 1000
FROM timed_out t
WHERE wr2.id = t.run_id
RETURNING wr2.id
),
cancelled AS (
-- Cancel the in-flight steps of exactly those runs. Data-modifying CTEs
-- execute once even though "cancelled" is never read by the final SELECT.
UPDATE workflow_step_runs sr
SET status = 'cancelled'
FROM marked m
WHERE sr.run_id = m.id
    AND sr.status IN ('pending', 'running', 'retrying', 'waiting')
RETURNING sr.id
)
SELECT COUNT(*) INTO v_count FROM marked;
RETURN v_count;
END;
$$ LANGUAGE plpgsql;