-- ============================================================================
-- Workflow Engine v5.2 — Close All Gaps vs n8n/Zapier/LangGraph
-- Phases 1-5, 7: Inline execution, approvals, SSE, versioning, templates, code
-- ============================================================================
-- ============================================================================
-- PHASE 1: Inline Execution Infrastructure
-- pg_notify triggers for real-time step pickup
-- ============================================================================
-- Notify on new pending step_runs (used by inline execution + SSE)
CREATE OR REPLACE FUNCTION notify_pending_step() RETURNS trigger AS $$
BEGIN
IF NEW.status = 'pending' THEN
PERFORM pg_notify('workflow_step_pending', json_build_object(
'step_run_id', NEW.id,
'run_id', NEW.run_id,
'step_key', NEW.step_key,
'step_type', NEW.step_type
)::text);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS trg_workflow_step_pending ON workflow_step_runs;
CREATE TRIGGER trg_workflow_step_pending
AFTER INSERT ON workflow_step_runs
FOR EACH ROW EXECUTE FUNCTION notify_pending_step();
-- ============================================================================
-- PHASE 2: Human-in-the-Loop (Approval Step)
-- ============================================================================
-- Add 'approval' to allowed step types
ALTER TABLE workflow_steps DROP CONSTRAINT IF EXISTS workflow_steps_step_type_check;
ALTER TABLE workflow_steps ADD CONSTRAINT workflow_steps_step_type_check
CHECK (step_type IN ('tool', 'condition', 'transform', 'delay', 'agent', 'sub_workflow',
'webhook_out', 'parallel', 'for_each', 'code', 'noop', 'approval'));
-- Add 'for_each' and 'code' if missing from step_runs (was added in code but not DB)
ALTER TABLE workflow_step_runs DROP CONSTRAINT IF EXISTS workflow_step_runs_status_check;
ALTER TABLE workflow_step_runs ADD CONSTRAINT workflow_step_runs_status_check
CHECK (status IN ('pending', 'running', 'success', 'failed', 'skipped', 'cancelled', 'retrying', 'waiting'));
CREATE TABLE IF NOT EXISTS workflow_approval_requests (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
store_id UUID NOT NULL REFERENCES stores(id) ON DELETE CASCADE,
run_id UUID NOT NULL REFERENCES workflow_runs(id) ON DELETE CASCADE,
step_run_id UUID NOT NULL REFERENCES workflow_step_runs(id) ON DELETE CASCADE,
workflow_id UUID NOT NULL REFERENCES workflows(id) ON DELETE CASCADE,
-- Request
title TEXT NOT NULL,
description TEXT,
prompt TEXT,
options JSONB DEFAULT '["approve","reject"]',
form_schema JSONB,
-- Assignment
assigned_to UUID,
assigned_role TEXT,
-- Response
status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'approved', 'rejected', 'expired', 'cancelled')),
response_data JSONB,
responded_by UUID,
responded_at TIMESTAMPTZ,
-- Timeout
expires_at TIMESTAMPTZ,
timeout_action TEXT DEFAULT 'fail' CHECK (timeout_action IN ('fail', 'approve', 'reject', 'skip')),
-- Notifications
notification_channels JSONB DEFAULT '["push"]',
reminder_interval_seconds INTEGER DEFAULT 3600,
last_reminder_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_approval_store ON workflow_approval_requests(store_id);
CREATE INDEX IF NOT EXISTS idx_approval_run ON workflow_approval_requests(run_id);
CREATE INDEX IF NOT EXISTS idx_approval_pending ON workflow_approval_requests(status, expires_at) WHERE status = 'pending';
CREATE INDEX IF NOT EXISTS idx_approval_step_run ON workflow_approval_requests(step_run_id);
ALTER TABLE workflow_approval_requests ENABLE ROW LEVEL SECURITY;
CREATE POLICY approval_select ON workflow_approval_requests FOR SELECT
USING (store_id IN (SELECT get_store_id_from_jwt()));
CREATE POLICY approval_insert ON workflow_approval_requests FOR INSERT
WITH CHECK (store_id IN (SELECT get_store_id_from_jwt()));
CREATE POLICY approval_update ON workflow_approval_requests FOR UPDATE
USING (store_id IN (SELECT get_store_id_from_jwt()));
-- RPC: respond_to_approval — handles approval response + resumes workflow
CREATE OR REPLACE FUNCTION respond_to_approval(
p_approval_id UUID,
p_store_id UUID,
p_status TEXT,
p_response_data JSONB DEFAULT '{}',
p_responded_by UUID DEFAULT NULL
) RETURNS JSONB AS $$
DECLARE
v_approval RECORD;
BEGIN
-- Validate status
IF p_status NOT IN ('approved', 'rejected') THEN
RETURN jsonb_build_object('success', false, 'error', 'Status must be approved or rejected');
END IF;
-- Load and lock approval
SELECT * INTO v_approval FROM workflow_approval_requests
WHERE id = p_approval_id AND store_id = p_store_id AND status = 'pending'
FOR UPDATE;
IF NOT FOUND THEN
RETURN jsonb_build_object('success', false, 'error', 'Approval not found or already responded');
END IF;
-- Update approval
UPDATE workflow_approval_requests SET
status = p_status,
response_data = p_response_data,
responded_by = p_responded_by,
responded_at = now(),
updated_at = now()
WHERE id = p_approval_id;
-- Resume the step — set back to pending with approval data as input
UPDATE workflow_step_runs SET
status = 'pending',
input = jsonb_build_object(
'approval_status', p_status,
'approval_data', p_response_data,
'responded_by', p_responded_by,
'responded_at', now()
),
updated_at = now()
WHERE id = v_approval.step_run_id AND status = 'waiting';
RETURN jsonb_build_object('success', true, 'approval_id', p_approval_id, 'status', p_status, 'run_id', v_approval.run_id);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
-- RPC: expire_pending_approvals — handles timeout actions
CREATE OR REPLACE FUNCTION expire_pending_approvals() RETURNS JSONB AS $$
DECLARE
v_approval RECORD;
v_expired_count INTEGER := 0;
BEGIN
FOR v_approval IN
SELECT * FROM workflow_approval_requests
WHERE status = 'pending' AND expires_at IS NOT NULL AND expires_at <= now()
FOR UPDATE SKIP LOCKED
LOOP
v_expired_count := v_expired_count + 1;
IF v_approval.timeout_action IN ('approve', 'reject') THEN
-- Auto-respond
UPDATE workflow_approval_requests SET
status = CASE WHEN v_approval.timeout_action = 'approve' THEN 'approved' ELSE 'rejected' END,
response_data = jsonb_build_object('auto_expired', true, 'timeout_action', v_approval.timeout_action),
responded_at = now(), updated_at = now()
WHERE id = v_approval.id;
-- Resume step
UPDATE workflow_step_runs SET
status = 'pending',
input = jsonb_build_object(
'approval_status', v_approval.timeout_action,
'approval_data', jsonb_build_object('auto_expired', true),
'responded_at', now()
),
updated_at = now()
WHERE id = v_approval.step_run_id AND status = 'waiting';
ELSIF v_approval.timeout_action = 'skip' THEN
UPDATE workflow_approval_requests SET status = 'expired', updated_at = now() WHERE id = v_approval.id;
UPDATE workflow_step_runs SET
status = 'skipped', error_message = 'Approval timed out (skipped)',
completed_at = now(), updated_at = now()
WHERE id = v_approval.step_run_id AND status = 'waiting';
ELSE
-- Default: fail
UPDATE workflow_approval_requests SET status = 'expired', updated_at = now() WHERE id = v_approval.id;
UPDATE workflow_step_runs SET
status = 'failed', error_message = 'Approval timed out',
completed_at = now(), updated_at = now()
WHERE id = v_approval.step_run_id AND status = 'waiting';
END IF;
END LOOP;
RETURN jsonb_build_object('expired_count', v_expired_count);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
-- ============================================================================
-- PHASE 3: SSE Streaming — pg_notify on status changes
-- ============================================================================
-- Notify on step status changes (for SSE streaming)
CREATE OR REPLACE FUNCTION notify_step_status_change() RETURNS trigger AS $$
BEGIN
IF OLD.status IS DISTINCT FROM NEW.status THEN
PERFORM pg_notify('workflow_step_event', json_build_object(
'step_run_id', NEW.id,
'run_id', NEW.run_id,
'step_key', NEW.step_key,
'step_type', NEW.step_type,
'old_status', OLD.status,
'new_status', NEW.status,
'duration_ms', NEW.duration_ms,
'error_message', NEW.error_message
)::text);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS trg_step_status_change ON workflow_step_runs;
CREATE TRIGGER trg_step_status_change
AFTER UPDATE OF status ON workflow_step_runs
FOR EACH ROW EXECUTE FUNCTION notify_step_status_change();
-- Notify on run status changes
CREATE OR REPLACE FUNCTION notify_run_status_change() RETURNS trigger AS $$
BEGIN
IF OLD.status IS DISTINCT FROM NEW.status THEN
PERFORM pg_notify('workflow_run_event', json_build_object(
'run_id', NEW.id,
'workflow_id', NEW.workflow_id,
'old_status', OLD.status,
'new_status', NEW.status,
'duration_ms', NEW.duration_ms,
'error_message', NEW.error_message,
'error_step_key', NEW.error_step_key
)::text);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS trg_run_status_change ON workflow_runs;
CREATE TRIGGER trg_run_status_change
AFTER UPDATE OF status ON workflow_runs
FOR EACH ROW EXECUTE FUNCTION notify_run_status_change();
-- ============================================================================
-- PHASE 4: Workflow Versioning
-- ============================================================================
CREATE TABLE IF NOT EXISTS workflow_versions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_id UUID NOT NULL REFERENCES workflows(id) ON DELETE CASCADE,
version INTEGER NOT NULL,
-- Immutable snapshot
name TEXT,
description TEXT,
trigger_type TEXT,
trigger_config JSONB,
steps JSONB NOT NULL, -- Array of step definitions (denormalized snapshot)
-- Metadata
published_by UUID,
published_at TIMESTAMPTZ DEFAULT now(),
changelog TEXT,
UNIQUE(workflow_id, version)
);
CREATE INDEX IF NOT EXISTS idx_workflow_versions_workflow ON workflow_versions(workflow_id);
ALTER TABLE workflow_versions ENABLE ROW LEVEL SECURITY;
CREATE POLICY workflow_versions_select ON workflow_versions FOR SELECT
USING (workflow_id IN (SELECT id FROM workflows WHERE store_id IN (SELECT get_store_id_from_jwt())));
CREATE POLICY workflow_versions_insert ON workflow_versions FOR INSERT
WITH CHECK (workflow_id IN (SELECT id FROM workflows WHERE store_id IN (SELECT get_store_id_from_jwt())));
-- Add versioning columns to workflows
DO $$ BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'workflows' AND column_name = 'current_version') THEN
ALTER TABLE workflows ADD COLUMN current_version INTEGER DEFAULT 0;
END IF;
IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'workflows' AND column_name = 'published_version_id') THEN
ALTER TABLE workflows ADD COLUMN published_version_id UUID REFERENCES workflow_versions(id);
END IF;
END $$;
-- Add version_id to runs
DO $$ BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'workflow_runs' AND column_name = 'version_id') THEN
ALTER TABLE workflow_runs ADD COLUMN version_id UUID REFERENCES workflow_versions(id);
END IF;
END $$;
-- RPC: publish_workflow_version — snapshots steps into immutable version
CREATE OR REPLACE FUNCTION publish_workflow_version(
p_workflow_id UUID,
p_store_id UUID,
p_changelog TEXT DEFAULT NULL,
p_published_by UUID DEFAULT NULL
) RETURNS JSONB AS $$
DECLARE
v_workflow RECORD;
v_steps JSONB;
v_new_version INTEGER;
v_version_id UUID;
BEGIN
SELECT * INTO v_workflow FROM workflows
WHERE id = p_workflow_id AND store_id = p_store_id;
IF NOT FOUND THEN
RETURN jsonb_build_object('success', false, 'error', 'Workflow not found');
END IF;
-- Snapshot all steps
SELECT COALESCE(jsonb_agg(jsonb_build_object(
'id', ws.id, 'step_key', ws.step_key, 'step_type', ws.step_type,
'is_entry_point', ws.is_entry_point, 'on_success', ws.on_success,
'on_failure', ws.on_failure, 'step_config', ws.step_config,
'max_retries', ws.max_retries, 'retry_delay_seconds', ws.retry_delay_seconds,
'timeout_seconds', ws.timeout_seconds, 'input_schema', ws.input_schema,
'position_x', ws.position_x, 'position_y', ws.position_y
) ORDER BY ws.step_key), '[]'::jsonb)
INTO v_steps
FROM workflow_steps ws WHERE ws.workflow_id = p_workflow_id;
v_new_version := COALESCE(v_workflow.current_version, 0) + 1;
INSERT INTO workflow_versions (workflow_id, version, name, description, trigger_type, trigger_config, steps, published_by, changelog)
VALUES (p_workflow_id, v_new_version, v_workflow.name, v_workflow.description, v_workflow.trigger_type, v_workflow.trigger_config, v_steps, p_published_by, p_changelog)
RETURNING id INTO v_version_id;
UPDATE workflows SET
current_version = v_new_version,
published_version_id = v_version_id,
updated_at = now()
WHERE id = p_workflow_id;
RETURN jsonb_build_object('success', true, 'version', v_new_version, 'version_id', v_version_id);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
-- ============================================================================
-- PHASE 5: Pre-Built Integration Templates
-- ============================================================================
-- Add template columns to workflows
DO $$ BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'workflows' AND column_name = 'is_template') THEN
ALTER TABLE workflows ADD COLUMN is_template BOOLEAN DEFAULT false;
END IF;
IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'workflows' AND column_name = 'template_category') THEN
ALTER TABLE workflows ADD COLUMN template_category TEXT;
END IF;
IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'workflows' AND column_name = 'template_tags') THEN
ALTER TABLE workflows ADD COLUMN template_tags TEXT[];
END IF;
IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'workflows' AND column_name = 'clone_count') THEN
ALTER TABLE workflows ADD COLUMN clone_count INTEGER DEFAULT 0;
END IF;
END $$;
-- Allow NULL store_id for global templates
ALTER TABLE workflows ALTER COLUMN store_id DROP NOT NULL;
-- RPC: clone_workflow_template — deep-copies a template to a store
CREATE OR REPLACE FUNCTION clone_workflow_template(
p_template_id UUID,
p_store_id UUID,
p_name TEXT DEFAULT NULL
) RETURNS JSONB AS $$
DECLARE
v_template RECORD;
v_new_wf_id UUID;
v_step RECORD;
v_step_count INTEGER := 0;
BEGIN
SELECT * INTO v_template FROM workflows
WHERE id = p_template_id AND is_template = true;
IF NOT FOUND THEN
RETURN jsonb_build_object('success', false, 'error', 'Template not found');
END IF;
-- Clone workflow
INSERT INTO workflows (store_id, name, description, icon, status, trigger_type, trigger_config,
max_concurrent_runs, max_run_duration_seconds, max_steps_per_run, max_retries_per_step)
VALUES (p_store_id, COALESCE(p_name, v_template.name), v_template.description, v_template.icon,
'draft', v_template.trigger_type, v_template.trigger_config,
v_template.max_concurrent_runs, v_template.max_run_duration_seconds,
v_template.max_steps_per_run, v_template.max_retries_per_step)
RETURNING id INTO v_new_wf_id;
-- Clone steps
FOR v_step IN SELECT * FROM workflow_steps WHERE workflow_id = p_template_id ORDER BY step_key LOOP
INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, on_failure,
step_config, max_retries, retry_delay_seconds, timeout_seconds, input_schema, position_x, position_y)
VALUES (v_new_wf_id, v_step.step_key, v_step.step_type, v_step.is_entry_point, v_step.on_success, v_step.on_failure,
v_step.step_config, v_step.max_retries, v_step.retry_delay_seconds, v_step.timeout_seconds,
v_step.input_schema, v_step.position_x, v_step.position_y);
v_step_count := v_step_count + 1;
END LOOP;
-- Increment clone count
UPDATE workflows SET clone_count = COALESCE(clone_count, 0) + 1 WHERE id = p_template_id;
RETURN jsonb_build_object('success', true, 'workflow_id', v_new_wf_id, 'steps_cloned', v_step_count);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
-- ============================================================================
-- PHASE 5: Seed Templates (6 pre-built workflows)
-- ============================================================================
-- 1. Low Stock Alert
INSERT INTO workflows (store_id, name, description, icon, status, is_active, trigger_type, trigger_config, is_template, template_category, template_tags, max_steps_per_run)
VALUES (NULL, 'Low Stock Alert', 'Check inventory velocity daily and send email alerts for products below reorder threshold.', 'exclamationmark.triangle', 'active', true, 'schedule', '{"interval_minutes": 1440}', true, 'inventory', ARRAY['inventory', 'alerts', 'email'], 10)
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, step_config, position_y)
SELECT w.id, 'check_velocity', 'tool', true, 'check_threshold',
'{"tool_name": "inventory", "args_template": {"action": "velocity", "days": 7, "limit": 100}}',
0
FROM workflows w WHERE w.name = 'Low Stock Alert' AND w.is_template = true
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, on_success, on_failure, step_config, position_y)
SELECT w.id, 'check_threshold', 'condition', 'send_alert', NULL,
'{"expression": "{{steps.check_velocity.output}} exists", "on_true": "send_alert", "on_false": null}',
100
FROM workflows w WHERE w.name = 'Low Stock Alert' AND w.is_template = true
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, step_config, position_y)
SELECT w.id, 'send_alert', 'tool', false, NULL,
'{"tool_name": "email", "args_template": {"action": "send", "to": "{{trigger.alert_email}}", "subject": "Low Stock Alert", "text": "The following products are running low on inventory. Please review and reorder as needed."}}',
200
FROM workflows w WHERE w.name = 'Low Stock Alert' AND w.is_template = true
ON CONFLICT DO NOTHING;
-- 2. Daily Sales Report
INSERT INTO workflows (store_id, name, description, icon, status, is_active, trigger_type, trigger_config, is_template, template_category, template_tags, max_steps_per_run)
VALUES (NULL, 'Daily Sales Report', 'Generate and email a daily sales summary every morning.', 'chart.bar', 'active', true, 'schedule', '{"interval_minutes": 1440}', true, 'analytics', ARRAY['analytics', 'reports', 'email'], 10)
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, step_config, position_y)
SELECT w.id, 'get_summary', 'tool', true, 'format_report',
'{"tool_name": "analytics", "args_template": {"action": "summary", "period": "yesterday"}}',
0
FROM workflows w WHERE w.name = 'Daily Sales Report' AND w.is_template = true
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, on_success, step_config, position_y)
SELECT w.id, 'format_report', 'transform', false, 'send_email',
'{"mapping": {"total_revenue": "{{steps.get_summary.output.total_revenue}}", "total_orders": "{{steps.get_summary.output.total_orders}}", "avg_order": "{{steps.get_summary.output.avg_order_value}}", "date": "{{today}}"}}',
100
FROM workflows w WHERE w.name = 'Daily Sales Report' AND w.is_template = true
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, step_config, position_y)
SELECT w.id, 'send_email', 'tool', false, NULL,
'{"tool_name": "email", "args_template": {"action": "send", "to": "{{trigger.report_email}}", "subject": "Daily Sales Report — {{steps.format_report.output.date}}", "text": "Revenue: ${{steps.format_report.output.total_revenue}}\nOrders: {{steps.format_report.output.total_orders}}\nAvg Order: ${{steps.format_report.output.avg_order}}"}}',
200
FROM workflows w WHERE w.name = 'Daily Sales Report' AND w.is_template = true
ON CONFLICT DO NOTHING;
-- 3. New Customer Welcome
INSERT INTO workflows (store_id, name, description, icon, status, is_active, trigger_type, is_template, template_category, template_tags, max_steps_per_run)
VALUES (NULL, 'New Customer Welcome', 'Send a welcome email when a new customer is created via webhook.', 'person.badge.plus', 'active', true, 'webhook', true, 'crm', ARRAY['customers', 'email', 'onboarding'], 5)
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, step_config, position_y)
SELECT w.id, 'extract_info', 'transform', true, 'send_welcome',
'{"mapping": {"name": "{{trigger.first_name}}", "email": "{{trigger.email}}"}}',
0
FROM workflows w WHERE w.name = 'New Customer Welcome' AND w.is_template = true
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, step_config, position_y)
SELECT w.id, 'send_welcome', 'tool', false, NULL,
'{"tool_name": "email", "args_template": {"action": "send", "to": "{{steps.extract_info.output.email}}", "subject": "Welcome, {{steps.extract_info.output.name}}!", "text": "Thank you for joining us! We are excited to have you as a customer."}}',
100
FROM workflows w WHERE w.name = 'New Customer Welcome' AND w.is_template = true
ON CONFLICT DO NOTHING;
-- 4. Inventory Reorder with Approval
INSERT INTO workflows (store_id, name, description, icon, status, is_active, trigger_type, trigger_config, is_template, template_category, template_tags, max_steps_per_run)
VALUES (NULL, 'Inventory Reorder with Approval', 'Check inventory weekly, create PO for low stock items, and wait for manager approval before submitting.', 'arrow.triangle.2.circlepath', 'active', true, 'schedule', '{"interval_minutes": 10080}', true, 'inventory', ARRAY['inventory', 'purchasing', 'approval'], 20)
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, step_config, position_y)
SELECT w.id, 'check_stock', 'tool', true, 'needs_reorder',
'{"tool_name": "inventory", "args_template": {"action": "velocity", "days": 14, "limit": 50}}',
0
FROM workflows w WHERE w.name = 'Inventory Reorder with Approval' AND w.is_template = true
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, on_success, step_config, position_y)
SELECT w.id, 'needs_reorder', 'condition', false, 'create_po',
'{"expression": "{{steps.check_stock.output}} exists", "on_true": "create_po", "on_false": null}',
100
FROM workflows w WHERE w.name = 'Inventory Reorder with Approval' AND w.is_template = true
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, on_success, step_config, position_y)
SELECT w.id, 'create_po', 'tool', false, 'approve_po',
'{"tool_name": "supply_chain", "args_template": {"action": "po_create", "notes": "Auto-generated reorder from low stock workflow"}}',
200
FROM workflows w WHERE w.name = 'Inventory Reorder with Approval' AND w.is_template = true
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, on_success, on_failure, step_config, position_y)
SELECT w.id, 'approve_po', 'approval', false, 'submit_po', 'notify_rejected',
'{"title": "Approve Reorder PO", "prompt": "Review the auto-generated purchase order for low stock items.", "options": ["approve", "reject"], "timeout_seconds": 86400, "timeout_action": "fail"}',
300
FROM workflows w WHERE w.name = 'Inventory Reorder with Approval' AND w.is_template = true
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, step_config, position_y)
SELECT w.id, 'submit_po', 'tool', false, NULL,
'{"tool_name": "supply_chain", "args_template": {"action": "po_approve", "purchase_order_id": "{{steps.create_po.output.id}}"}}',
400
FROM workflows w WHERE w.name = 'Inventory Reorder with Approval' AND w.is_template = true
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, step_config, position_y)
SELECT w.id, 'notify_rejected', 'tool', false, NULL,
'{"tool_name": "email", "args_template": {"action": "send", "to": "{{trigger.alert_email}}", "subject": "PO Rejected", "text": "The auto-generated purchase order was rejected."}}',
400
FROM workflows w WHERE w.name = 'Inventory Reorder with Approval' AND w.is_template = true
ON CONFLICT DO NOTHING;
-- 5. Order Status Webhook (Slack/Discord)
INSERT INTO workflows (store_id, name, description, icon, status, is_active, trigger_type, is_template, template_category, template_tags, max_steps_per_run)
VALUES (NULL, 'Order Status Webhook', 'Forward order status changes to Slack or Discord via webhook.', 'bell.badge', 'active', true, 'webhook', true, 'integrations', ARRAY['orders', 'slack', 'discord', 'webhook'], 5)
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, step_config, position_y)
SELECT w.id, 'format_message', 'transform', true, 'send_webhook',
'{"mapping": {"text": "Order #{{trigger.order_number}} status changed to {{trigger.status}}", "username": "OrderBot"}}',
0
FROM workflows w WHERE w.name = 'Order Status Webhook' AND w.is_template = true
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, step_config, position_y)
SELECT w.id, 'send_webhook', 'webhook_out', false, NULL,
'{"url": "{{trigger.webhook_url}}", "method": "POST", "body_template": {"content": "{{steps.format_message.output.text}}", "username": "{{steps.format_message.output.username}}"}}',
100
FROM workflows w WHERE w.name = 'Order Status Webhook' AND w.is_template = true
ON CONFLICT DO NOTHING;
-- 6. Stripe Payment Handler
INSERT INTO workflows (store_id, name, description, icon, status, is_active, trigger_type, is_template, template_category, template_tags, max_steps_per_run)
VALUES (NULL, 'Stripe Payment Handler', 'Process Stripe webhook events: update order status and send confirmation email.', 'creditcard', 'active', true, 'webhook', true, 'integrations', ARRAY['stripe', 'payments', 'orders', 'email'], 10)
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, step_config, position_y)
SELECT w.id, 'check_event', 'condition', true, NULL,
'{"expression": "{{trigger.type}} == ''payment_intent.succeeded''", "on_true": "update_order", "on_false": "log_event"}',
0
FROM workflows w WHERE w.name = 'Stripe Payment Handler' AND w.is_template = true
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, on_success, step_config, position_y)
SELECT w.id, 'update_order', 'tool', false, 'send_confirmation',
'{"tool_name": "orders", "args_template": {"action": "find", "status": "pending"}}',
100
FROM workflows w WHERE w.name = 'Stripe Payment Handler' AND w.is_template = true
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, step_config, position_y)
SELECT w.id, 'send_confirmation', 'tool', false, NULL,
'{"tool_name": "email", "args_template": {"action": "send", "to": "{{trigger.receipt_email}}", "subject": "Payment Confirmed", "text": "Your payment of ${{trigger.amount}} has been confirmed. Thank you!"}}',
200
FROM workflows w WHERE w.name = 'Stripe Payment Handler' AND w.is_template = true
ON CONFLICT DO NOTHING;
INSERT INTO workflow_steps (workflow_id, step_key, step_type, step_config, position_y)
SELECT w.id, 'log_event', 'noop', false, NULL,
'{}',
100
FROM workflows w WHERE w.name = 'Stripe Payment Handler' AND w.is_template = true
ON CONFLICT DO NOTHING;
-- ============================================================================
-- Updated_at triggers for new tables
-- ============================================================================
DO $$ BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'trg_approval_requests_updated_at') THEN
CREATE TRIGGER trg_approval_requests_updated_at BEFORE UPDATE ON workflow_approval_requests
FOR EACH ROW EXECUTE FUNCTION update_updated_at();
END IF;
END $$;
-- ============================================================================
-- Update tool registry with new workflow actions
-- ============================================================================
UPDATE ai_tool_registry SET
description = 'Manage automation workflows: create multi-step DAGs with branching, webhooks, conditions, approvals, versioning, and data flow between steps. Actions: list, get, create, update, delete (workflow CRUD); add_step, update_step, delete_step (step management); start, pause, resume, cancel (run control); runs, step_runs (execution history); analytics (performance metrics); create_webhook, list_webhooks, delete_webhook (webhook endpoints); list_approvals, respond_approval (human-in-the-loop); publish, versions, rollback (versioning); list_templates, clone_template (pre-built templates).',
definition = jsonb_set(
definition,
'{input_schema,properties,action,enum}',
'["list", "get", "create", "update", "delete", "add_step", "update_step", "delete_step", "start", "pause", "resume", "cancel", "runs", "step_runs", "analytics", "create_webhook", "list_webhooks", "delete_webhook", "list_approvals", "respond_approval", "publish", "versions", "rollback", "list_templates", "clone_template"]'
)
WHERE name = 'workflows';