-- ============================================================================
-- Workflow Engine — Phase 1: Core Schema
-- Deploy via run-sql edge function or psql
-- ============================================================================
-- 1. workflows — Workflow definitions
CREATE TABLE IF NOT EXISTS workflows (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
store_id UUID NOT NULL REFERENCES stores(id) ON DELETE CASCADE,
name TEXT NOT NULL,
description TEXT,
icon TEXT,
-- Status
status TEXT NOT NULL DEFAULT 'draft' CHECK (status IN ('draft', 'active', 'paused', 'archived')),
is_active BOOLEAN NOT NULL DEFAULT false,
-- Trigger
trigger_type TEXT NOT NULL DEFAULT 'manual' CHECK (trigger_type IN ('event', 'schedule', 'condition', 'webhook', 'manual', 'api')),
trigger_config JSONB DEFAULT '{}',
-- Limits
max_concurrent_runs INTEGER NOT NULL DEFAULT 1,
max_run_duration_seconds INTEGER NOT NULL DEFAULT 3600,
max_steps_per_run INTEGER NOT NULL DEFAULT 50,
max_retries_per_step INTEGER NOT NULL DEFAULT 3,
-- Circuit breaker (workflow-level)
circuit_breaker_threshold INTEGER NOT NULL DEFAULT 5,
circuit_breaker_window_seconds INTEGER NOT NULL DEFAULT 3600,
circuit_breaker_cooldown_seconds INTEGER NOT NULL DEFAULT 300,
circuit_breaker_state TEXT NOT NULL DEFAULT 'closed' CHECK (circuit_breaker_state IN ('closed', 'open', 'half_open')),
circuit_breaker_failures INTEGER NOT NULL DEFAULT 0,
circuit_breaker_tripped_at TIMESTAMPTZ,
-- Metadata
version INTEGER NOT NULL DEFAULT 1,
created_by UUID,
last_run_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_workflows_store ON workflows(store_id);
CREATE INDEX IF NOT EXISTS idx_workflows_status ON workflows(store_id, status) WHERE is_active = true;
CREATE INDEX IF NOT EXISTS idx_workflows_trigger ON workflows(trigger_type) WHERE is_active = true;
-- 2. workflow_steps — DAG nodes
CREATE TABLE IF NOT EXISTS workflow_steps (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_id UUID NOT NULL REFERENCES workflows(id) ON DELETE CASCADE,
step_key TEXT NOT NULL,
step_type TEXT NOT NULL CHECK (step_type IN ('tool', 'condition', 'transform', 'delay', 'agent', 'sub_workflow', 'webhook_out', 'parallel', 'noop')),
-- DAG routing
is_entry_point BOOLEAN NOT NULL DEFAULT false,
on_success TEXT, -- step_key to go to on success (NULL = end workflow)
on_failure TEXT, -- step_key to go to on failure (NULL = fail workflow)
-- Type-specific configuration
step_config JSONB NOT NULL DEFAULT '{}',
-- Retry
max_retries INTEGER NOT NULL DEFAULT 3,
retry_delay_seconds INTEGER NOT NULL DEFAULT 10,
timeout_seconds INTEGER NOT NULL DEFAULT 60,
-- Validation
input_schema JSONB, -- Optional JSON Schema for input validation
-- Visual editor position
position_x INTEGER DEFAULT 0,
position_y INTEGER DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE(workflow_id, step_key)
);
CREATE INDEX IF NOT EXISTS idx_workflow_steps_workflow ON workflow_steps(workflow_id);
CREATE INDEX IF NOT EXISTS idx_workflow_steps_entry ON workflow_steps(workflow_id) WHERE is_entry_point = true;
-- 3. workflow_runs — Execution instances
CREATE TABLE IF NOT EXISTS workflow_runs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_id UUID NOT NULL REFERENCES workflows(id) ON DELETE CASCADE,
store_id UUID NOT NULL REFERENCES stores(id) ON DELETE CASCADE,
-- Status
status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'running', 'success', 'failed', 'cancelled', 'timed_out', 'paused')),
-- Trigger context
trigger_type TEXT,
trigger_payload JSONB DEFAULT '{}',
-- Idempotency
idempotency_key TEXT,
-- Progress
current_step_key TEXT,
step_outputs JSONB DEFAULT '{}', -- accumulated: { "step_key": { output, status, duration_ms } }
-- Error
error_message TEXT,
error_step_key TEXT,
-- Timing
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
duration_ms INTEGER,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Partial unique index for idempotency (only for non-null keys within a workflow)
CREATE UNIQUE INDEX IF NOT EXISTS idx_workflow_runs_idempotency
ON workflow_runs(workflow_id, idempotency_key) WHERE idempotency_key IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow ON workflow_runs(workflow_id);
CREATE INDEX IF NOT EXISTS idx_workflow_runs_store ON workflow_runs(store_id);
CREATE INDEX IF NOT EXISTS idx_workflow_runs_status ON workflow_runs(status) WHERE status IN ('pending', 'running');
CREATE INDEX IF NOT EXISTS idx_workflow_runs_created ON workflow_runs(created_at DESC);
-- 4. workflow_step_runs — Per-step execution state
CREATE TABLE IF NOT EXISTS workflow_step_runs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
run_id UUID NOT NULL REFERENCES workflow_runs(id) ON DELETE CASCADE,
step_id UUID NOT NULL REFERENCES workflow_steps(id) ON DELETE CASCADE,
step_key TEXT NOT NULL,
step_type TEXT NOT NULL,
-- Status
status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'running', 'success', 'failed', 'skipped', 'cancelled', 'retrying', 'waiting')),
-- Data
input JSONB,
output JSONB,
error_message TEXT,
-- Retry
attempt_count INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
next_retry_at TIMESTAMPTZ,
-- Timing
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
duration_ms INTEGER,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_step_runs_run ON workflow_step_runs(run_id);
CREATE INDEX IF NOT EXISTS idx_step_runs_pending ON workflow_step_runs(status, next_retry_at)
WHERE status IN ('pending', 'retrying', 'waiting');
CREATE INDEX IF NOT EXISTS idx_step_runs_running ON workflow_step_runs(status, started_at)
WHERE status = 'running';
-- 5. webhook_endpoints — Registered webhook URLs
CREATE TABLE IF NOT EXISTS webhook_endpoints (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
store_id UUID NOT NULL REFERENCES stores(id) ON DELETE CASCADE,
name TEXT NOT NULL,
description TEXT,
-- URL routing
slug TEXT NOT NULL,
-- Security
signing_secret TEXT NOT NULL DEFAULT encode(gen_random_bytes(32), 'hex'),
signing_algorithm TEXT NOT NULL DEFAULT 'hmac-sha256',
verify_signature BOOLEAN NOT NULL DEFAULT true,
-- What to trigger
workflow_id UUID NOT NULL REFERENCES workflows(id) ON DELETE CASCADE,
-- Transform incoming body
payload_transform JSONB,
-- Rate limiting
max_requests_per_minute INTEGER NOT NULL DEFAULT 60,
-- Metadata
is_active BOOLEAN NOT NULL DEFAULT true,
last_received_at TIMESTAMPTZ,
total_received INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_webhook_slug ON webhook_endpoints(store_id, slug);
CREATE INDEX IF NOT EXISTS idx_webhook_workflow ON webhook_endpoints(workflow_id);
CREATE INDEX IF NOT EXISTS idx_webhook_active ON webhook_endpoints(is_active) WHERE is_active = true;
-- ============================================================================
-- RPC: start_workflow_run — Creates a run + first step_run(s)
-- ============================================================================
CREATE OR REPLACE FUNCTION start_workflow_run(
p_workflow_id UUID,
p_store_id UUID,
p_trigger_type TEXT DEFAULT 'manual',
p_trigger_payload JSONB DEFAULT '{}',
p_idempotency_key TEXT DEFAULT NULL
) RETURNS JSONB AS $$
DECLARE
v_workflow RECORD;
v_run_id UUID;
v_concurrent_count INTEGER;
v_step RECORD;
v_entry_count INTEGER := 0;
BEGIN
-- Load workflow
SELECT * INTO v_workflow FROM workflows
WHERE id = p_workflow_id AND store_id = p_store_id AND is_active = true;
IF NOT FOUND THEN
RETURN jsonb_build_object('success', false, 'error', 'Workflow not found or inactive');
END IF;
-- Check circuit breaker
IF v_workflow.circuit_breaker_state = 'open' THEN
IF v_workflow.circuit_breaker_tripped_at + (v_workflow.circuit_breaker_cooldown_seconds || ' seconds')::interval > now() THEN
RETURN jsonb_build_object('success', false, 'error', 'Circuit breaker is open — workflow is temporarily disabled');
END IF;
-- Past cooldown — transition to half_open
UPDATE workflows SET circuit_breaker_state = 'half_open', updated_at = now() WHERE id = p_workflow_id;
END IF;
-- Check concurrency limit
SELECT COUNT(*) INTO v_concurrent_count FROM workflow_runs
WHERE workflow_id = p_workflow_id AND status IN ('pending', 'running');
IF v_concurrent_count >= v_workflow.max_concurrent_runs THEN
RETURN jsonb_build_object('success', false, 'error',
format('Concurrency limit reached (%s/%s active runs)', v_concurrent_count, v_workflow.max_concurrent_runs));
END IF;
-- Idempotency check
IF p_idempotency_key IS NOT NULL THEN
SELECT id INTO v_run_id FROM workflow_runs
WHERE workflow_id = p_workflow_id AND idempotency_key = p_idempotency_key;
IF FOUND THEN
RETURN jsonb_build_object('success', true, 'run_id', v_run_id, 'deduplicated', true);
END IF;
END IF;
-- Create run
INSERT INTO workflow_runs (workflow_id, store_id, status, trigger_type, trigger_payload, idempotency_key, started_at)
VALUES (p_workflow_id, p_store_id, 'running', p_trigger_type, p_trigger_payload, p_idempotency_key, now())
RETURNING id INTO v_run_id;
-- Create step_runs for entry points
FOR v_step IN SELECT * FROM workflow_steps WHERE workflow_id = p_workflow_id AND is_entry_point = true ORDER BY step_key LOOP
INSERT INTO workflow_step_runs (run_id, step_id, step_key, step_type, status, max_attempts)
VALUES (v_run_id, v_step.id, v_step.step_key, v_step.step_type, 'pending', v_step.max_retries);
v_entry_count := v_entry_count + 1;
END LOOP;
IF v_entry_count = 0 THEN
UPDATE workflow_runs SET status = 'failed', error_message = 'No entry point steps defined', completed_at = now()
WHERE id = v_run_id;
RETURN jsonb_build_object('success', false, 'error', 'No entry point steps defined', 'run_id', v_run_id);
END IF;
-- Update workflow last_run_at
UPDATE workflows SET last_run_at = now(), updated_at = now() WHERE id = p_workflow_id;
RETURN jsonb_build_object('success', true, 'run_id', v_run_id, 'entry_steps', v_entry_count);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
-- ============================================================================
-- RPC: claim_pending_steps — Atomically claim pending/retrying steps
-- ============================================================================
CREATE OR REPLACE FUNCTION claim_pending_steps(
p_batch_size INTEGER DEFAULT 10
) RETURNS JSONB AS $$
DECLARE
v_claimed UUID[];
v_step RECORD;
v_result JSONB := '[]'::JSONB;
BEGIN
-- Claim pending and retrying steps (that have passed their retry time)
-- Uses FOR UPDATE SKIP LOCKED to prevent double-claiming
FOR v_step IN
SELECT sr.id, sr.run_id, sr.step_id, sr.step_key, sr.step_type, sr.attempt_count, sr.max_attempts,
ws.step_config, ws.on_success, ws.on_failure, ws.timeout_seconds, ws.input_schema,
wr.workflow_id, wr.store_id, wr.step_outputs, wr.trigger_payload, wr.status as run_status,
w.max_steps_per_run
FROM workflow_step_runs sr
JOIN workflow_runs wr ON wr.id = sr.run_id
JOIN workflow_steps ws ON ws.id = sr.step_id
JOIN workflows w ON w.id = wr.workflow_id
WHERE sr.status IN ('pending', 'retrying')
AND (sr.next_retry_at IS NULL OR sr.next_retry_at <= now())
AND wr.status = 'running'
AND w.is_active = true
ORDER BY sr.created_at ASC
LIMIT p_batch_size
FOR UPDATE OF sr SKIP LOCKED
LOOP
-- Mark as running
UPDATE workflow_step_runs
SET status = 'running', started_at = now(), attempt_count = attempt_count + 1, updated_at = now()
WHERE id = v_step.id;
-- Update run's current_step_key
UPDATE workflow_runs SET current_step_key = v_step.step_key, updated_at = now()
WHERE id = v_step.run_id;
v_result := v_result || jsonb_build_object(
'step_run_id', v_step.id,
'run_id', v_step.run_id,
'workflow_id', v_step.workflow_id,
'store_id', v_step.store_id,
'step_id', v_step.id,
'step_key', v_step.step_key,
'step_type', v_step.step_type,
'step_config', v_step.step_config,
'on_success', v_step.on_success,
'on_failure', v_step.on_failure,
'timeout_seconds', v_step.timeout_seconds,
'input_schema', v_step.input_schema,
'step_outputs', v_step.step_outputs,
'trigger_payload', v_step.trigger_payload,
'attempt_count', v_step.attempt_count + 1,
'max_attempts', v_step.max_attempts,
'max_steps_per_run', v_step.max_steps_per_run
);
END LOOP;
-- Also: recover stuck steps (running but timed out)
UPDATE workflow_step_runs sr
SET status = CASE
WHEN sr.attempt_count < sr.max_attempts THEN 'retrying'
ELSE 'failed'
END,
error_message = 'Step timed out (recovered by worker)',
next_retry_at = CASE
WHEN sr.attempt_count < sr.max_attempts THEN now() + interval '10 seconds'
ELSE NULL
END,
updated_at = now()
FROM workflow_steps ws
WHERE sr.step_id = ws.id
AND sr.status = 'running'
AND sr.started_at < now() - (ws.timeout_seconds || ' seconds')::interval;
RETURN v_result;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
-- ============================================================================
-- RPC: get_workflow_analytics — Analytics for workflow runs
-- ============================================================================
CREATE OR REPLACE FUNCTION get_workflow_analytics(
p_store_id UUID,
p_days INTEGER DEFAULT 30
) RETURNS JSONB AS $$
DECLARE
v_result JSONB;
BEGIN
SELECT jsonb_build_object(
'total_runs', COUNT(*),
'success_count', COUNT(*) FILTER (WHERE status = 'success'),
'failed_count', COUNT(*) FILTER (WHERE status = 'failed'),
'cancelled_count', COUNT(*) FILTER (WHERE status = 'cancelled'),
'timed_out_count', COUNT(*) FILTER (WHERE status = 'timed_out'),
'running_count', COUNT(*) FILTER (WHERE status = 'running'),
'success_rate', ROUND(
CASE WHEN COUNT(*) FILTER (WHERE status IN ('success', 'failed')) > 0
THEN COUNT(*) FILTER (WHERE status = 'success')::NUMERIC / COUNT(*) FILTER (WHERE status IN ('success', 'failed')) * 100
ELSE 0
END, 1
),
'avg_duration_ms', ROUND(AVG(duration_ms) FILTER (WHERE duration_ms IS NOT NULL)),
'p95_duration_ms', ROUND(PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration_ms) FILTER (WHERE duration_ms IS NOT NULL)),
'by_workflow', (
SELECT COALESCE(jsonb_agg(jsonb_build_object(
'workflow_id', wr.workflow_id,
'workflow_name', w.name,
'runs', COUNT(*),
'successes', COUNT(*) FILTER (WHERE wr.status = 'success'),
'failures', COUNT(*) FILTER (WHERE wr.status = 'failed'),
'avg_duration_ms', ROUND(AVG(wr.duration_ms) FILTER (WHERE wr.duration_ms IS NOT NULL))
)), '[]'::jsonb)
FROM workflow_runs wr
JOIN workflows w ON w.id = wr.workflow_id
WHERE wr.store_id = p_store_id AND wr.created_at >= now() - (p_days || ' days')::interval
GROUP BY wr.workflow_id, w.name
),
'by_trigger_type', (
SELECT COALESCE(jsonb_agg(jsonb_build_object(
'trigger_type', trigger_type,
'count', COUNT(*)
)), '[]'::jsonb)
FROM workflow_runs
WHERE store_id = p_store_id AND created_at >= now() - (p_days || ' days')::interval
GROUP BY trigger_type
),
'period_days', p_days
) INTO v_result
FROM workflow_runs
WHERE store_id = p_store_id AND created_at >= now() - (p_days || ' days')::interval;
RETURN v_result;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
-- ============================================================================
-- RLS Policies
-- ============================================================================
ALTER TABLE workflows ENABLE ROW LEVEL SECURITY;
ALTER TABLE workflow_steps ENABLE ROW LEVEL SECURITY;
ALTER TABLE workflow_runs ENABLE ROW LEVEL SECURITY;
ALTER TABLE workflow_step_runs ENABLE ROW LEVEL SECURITY;
ALTER TABLE webhook_endpoints ENABLE ROW LEVEL SECURITY;
-- Workflows: store members can read, service role can write
CREATE POLICY workflows_select ON workflows FOR SELECT
USING (store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid()));
CREATE POLICY workflows_insert ON workflows FOR INSERT
WITH CHECK (store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid()));
CREATE POLICY workflows_update ON workflows FOR UPDATE
USING (store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid()));
CREATE POLICY workflows_delete ON workflows FOR DELETE
USING (store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid()));
-- Workflow steps: inherit workflow access
CREATE POLICY workflow_steps_select ON workflow_steps FOR SELECT
USING (workflow_id IN (SELECT id FROM workflows WHERE store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid())));
CREATE POLICY workflow_steps_insert ON workflow_steps FOR INSERT
WITH CHECK (workflow_id IN (SELECT id FROM workflows WHERE store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid())));
CREATE POLICY workflow_steps_update ON workflow_steps FOR UPDATE
USING (workflow_id IN (SELECT id FROM workflows WHERE store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid())));
CREATE POLICY workflow_steps_delete ON workflow_steps FOR DELETE
USING (workflow_id IN (SELECT id FROM workflows WHERE store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid())));
-- Workflow runs: store-scoped
CREATE POLICY workflow_runs_select ON workflow_runs FOR SELECT
USING (store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid()));
CREATE POLICY workflow_runs_insert ON workflow_runs FOR INSERT
WITH CHECK (store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid()));
CREATE POLICY workflow_runs_update ON workflow_runs FOR UPDATE
USING (store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid()));
-- Step runs: inherit run access
CREATE POLICY step_runs_select ON workflow_step_runs FOR SELECT
USING (run_id IN (SELECT id FROM workflow_runs WHERE store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid())));
CREATE POLICY step_runs_insert ON workflow_step_runs FOR INSERT
WITH CHECK (run_id IN (SELECT id FROM workflow_runs WHERE store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid())));
CREATE POLICY step_runs_update ON workflow_step_runs FOR UPDATE
USING (run_id IN (SELECT id FROM workflow_runs WHERE store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid())));
-- Webhook endpoints: store-scoped
CREATE POLICY webhooks_select ON webhook_endpoints FOR SELECT
USING (store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid()));
CREATE POLICY webhooks_insert ON webhook_endpoints FOR INSERT
WITH CHECK (store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid()));
CREATE POLICY webhooks_update ON webhook_endpoints FOR UPDATE
USING (store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid()));
CREATE POLICY webhooks_delete ON webhook_endpoints FOR DELETE
USING (store_id IN (SELECT store_id FROM store_members WHERE user_id = auth.uid()));
-- Updated_at trigger
CREATE OR REPLACE FUNCTION update_updated_at() RETURNS TRIGGER AS $$
BEGIN NEW.updated_at = now(); RETURN NEW; END;
$$ LANGUAGE plpgsql;
DO $$ BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_workflows_updated_at') THEN
CREATE TRIGGER trg_workflows_updated_at BEFORE UPDATE ON workflows FOR EACH ROW EXECUTE FUNCTION update_updated_at();
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_workflow_steps_updated_at') THEN
CREATE TRIGGER trg_workflow_steps_updated_at BEFORE UPDATE ON workflow_steps FOR EACH ROW EXECUTE FUNCTION update_updated_at();
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_workflow_runs_updated_at') THEN
CREATE TRIGGER trg_workflow_runs_updated_at BEFORE UPDATE ON workflow_runs FOR EACH ROW EXECUTE FUNCTION update_updated_at();
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_step_runs_updated_at') THEN
CREATE TRIGGER trg_step_runs_updated_at BEFORE UPDATE ON workflow_step_runs FOR EACH ROW EXECUTE FUNCTION update_updated_at();
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_webhook_endpoints_updated_at') THEN
CREATE TRIGGER trg_webhook_endpoints_updated_at BEFORE UPDATE ON webhook_endpoints FOR EACH ROW EXECUTE FUNCTION update_updated_at();
END IF;
END $$;