-- Migration: Workflow Engine v6.0 — Unified Event Bus
-- Bridges CRM automation rules with the workflow DAG engine.
-- Creates event queue, event→workflow subscriptions, DB triggers for automatic
-- event firing, and a migration RPC for existing marketing_automation_rules.
-- Date: 2026-02-14
-- ============================================================================
-- 1. AUTOMATION EVENTS — inbound event queue
-- ============================================================================
-- Inbound event queue: one row per event, tracked through its processing lifecycle.
CREATE TABLE IF NOT EXISTS automation_events (
    id            UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    store_id      UUID        NOT NULL,
    event_type    TEXT        NOT NULL,
    event_payload JSONB       NOT NULL DEFAULT '{}'::JSONB,
    -- Origin of the event. Documented values: 'db_trigger', 'api', 'crm', 'manual';
    -- rows inserted without an explicit source get 'system'.
    source        TEXT        DEFAULT 'system',
    -- Lifecycle state; new rows start as 'pending'.
    status        TEXT        NOT NULL DEFAULT 'pending'
                  CHECK (status IN ('pending', 'processing', 'processed', 'failed')),
    processed_at  TIMESTAMPTZ,
    error_message TEXT,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Enable row-level security on the event queue and install its policy once.
ALTER TABLE automation_events ENABLE ROW LEVEL SECURITY;

DO $$
BEGIN
    -- Look the policy up through the table's OID instead of the bare table
    -- name: pg_policies.tablename carries no schema, so a same-named table in
    -- another schema could otherwise make this check skip the CREATE POLICY.
    IF NOT EXISTS (
        SELECT 1
        FROM pg_policy
        WHERE polrelid = 'automation_events'::regclass
          AND polname = 'events_service_role'
    ) THEN
        -- NOTE(review): there is no TO clause, so this policy applies to every
        -- role and admits all rows for all commands. The name suggests it was
        -- meant for a service role only — confirm, and add `TO <role>` if so.
        CREATE POLICY events_service_role ON automation_events
            FOR ALL
            USING (true)
            WITH CHECK (true);
    END IF;
END $$;
-- Partial index covering only rows still waiting to be processed,
-- keyed by store and event type.
CREATE INDEX IF NOT EXISTS idx_automation_events_pending
    ON automation_events (store_id, event_type)
    WHERE status = 'pending';

-- Index on creation time; supports age-based deletes such as
-- cleanup_old_automation_events().
CREATE INDEX IF NOT EXISTS idx_automation_events_created
    ON automation_events (created_at);
-- Notify listeners immediately when a new event row is inserted.
-- Row-level AFTER INSERT trigger function: publishes a JSON text message
-- containing the new row's id, store_id and event_type on the
-- 'automation_event' channel, then returns NEW.
CREATE OR REPLACE FUNCTION notify_automation_event()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    v_message TEXT;
BEGIN
    v_message := json_build_object(
        'id', NEW.id,
        'store_id', NEW.store_id,
        'event_type', NEW.event_type
    )::text;

    PERFORM pg_notify('automation_event', v_message);
    RETURN NEW;
END;
$$;

-- Recreate the trigger so the migration can be re-run safely.
DROP TRIGGER IF EXISTS trg_notify_automation_event ON automation_events;
CREATE TRIGGER trg_notify_automation_event
    AFTER INSERT ON automation_events
    FOR EACH ROW
    EXECUTE FUNCTION notify_automation_event();
-- ============================================================================
-- 2. WORKFLOW EVENT SUBSCRIPTIONS — event→workflow mapping
-- ============================================================================
-- Maps an event type to the workflow that should run when it fires.
-- A workflow can subscribe to a given event type at most once.
CREATE TABLE IF NOT EXISTS workflow_event_subscriptions (
    id                UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    store_id          UUID        NOT NULL,
    -- Subscriptions are removed together with their workflow.
    workflow_id       UUID        NOT NULL REFERENCES workflows(id) ON DELETE CASCADE,
    event_type        TEXT        NOT NULL,
    -- Optional JS-like condition evaluated against the event payload.
    filter_expression TEXT,
    is_active         BOOLEAN     NOT NULL DEFAULT true,
    created_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- NOTE(review): nothing in this migration refreshes updated_at on UPDATE —
    -- confirm writers maintain it.
    updated_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (workflow_id, event_type)
);
-- Enable row-level security on subscriptions and install the policy once.
ALTER TABLE workflow_event_subscriptions ENABLE ROW LEVEL SECURITY;

DO $$
BEGIN
    -- Look the policy up through the table's OID instead of the bare table
    -- name: pg_policies.tablename carries no schema, so a same-named table in
    -- another schema could otherwise make this check skip the CREATE POLICY.
    IF NOT EXISTS (
        SELECT 1
        FROM pg_policy
        WHERE polrelid = 'workflow_event_subscriptions'::regclass
          AND polname = 'subs_service_role'
    ) THEN
        -- NOTE(review): there is no TO clause, so this policy applies to every
        -- role and admits all rows for all commands. The name suggests it was
        -- meant for a service role only — confirm, and add `TO <role>` if so.
        CREATE POLICY subs_service_role ON workflow_event_subscriptions
            FOR ALL
            USING (true)
            WITH CHECK (true);
    END IF;
END $$;
-- Partial index for resolving active subscriptions by store and event type.
CREATE INDEX IF NOT EXISTS idx_event_subs_lookup
    ON workflow_event_subscriptions (store_id, event_type)
    WHERE is_active = true;
-- ============================================================================
-- 3. fire_event RPC — programmatic event insertion
-- ============================================================================
-- fire_event: enqueue one event in automation_events and return its id.
--
-- Parameters:
--   p_store_id      store the event belongs to (NOT NULL in the table).
--   p_event_type    event name (NOT NULL in the table).
--   p_event_payload event data; defaults to '{}'. An explicit NULL is stored
--                   as '{}' instead of tripping the NOT NULL constraint.
--   p_source        origin label; defaults to 'api'.
-- Returns: id of the inserted automation_events row.
--
-- The function is SECURITY DEFINER, so its search_path is pinned to the one in
-- effect when this migration runs; otherwise unqualified names would resolve
-- against whatever search_path the caller has set.
-- NOTE(review): no store ownership check is performed here — confirm EXECUTE
-- on this function is granted only to roles that may enqueue for any store.
CREATE OR REPLACE FUNCTION fire_event(
    p_store_id      UUID,
    p_event_type    TEXT,
    p_event_payload JSONB DEFAULT '{}'::JSONB,
    p_source        TEXT DEFAULT 'api'
)
RETURNS UUID
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path FROM CURRENT
AS $$
DECLARE
    v_id UUID;
BEGIN
    INSERT INTO automation_events (store_id, event_type, event_payload, source)
    VALUES (
        p_store_id,
        p_event_type,
        COALESCE(p_event_payload, '{}'::JSONB),
        p_source
    )
    RETURNING id INTO v_id;

    RETURN v_id;
END;
$$;
-- ============================================================================
-- 4. DB TRIGGERS — automatic event firing from table changes
-- ============================================================================
-- 4a. Order completed → fire 'order_completed'
-- Trigger function for orders: when a row's status transitions to 'completed',
-- enqueue an 'order_completed' event carrying order_id, customer_id, total and
-- status. Always returns NEW.
--
-- SECURITY DEFINER with the search_path pinned to the one in effect when this
-- migration runs, so unqualified names cannot be redirected by the search_path
-- of whichever session updates orders.
-- NOTE(review): the payload has no customer email, yet the 'Purchase Thank You'
-- and 'Win-Back Campaign' templates in this file address mail to
-- {{trigger.customer_email}} — confirm where that value is expected to come from.
CREATE OR REPLACE FUNCTION fire_order_completed_event()
RETURNS TRIGGER
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path FROM CURRENT
AS $$
BEGIN
    -- Repeats the trigger's WHEN condition so the function stays correct even
    -- if it is attached to a trigger without that filter.
    IF (TG_OP = 'UPDATE' AND NEW.status = 'completed' AND OLD.status IS DISTINCT FROM 'completed') THEN
        INSERT INTO automation_events (store_id, event_type, event_payload, source)
        VALUES (
            NEW.store_id,
            'order_completed',
            jsonb_build_object(
                'order_id', NEW.id,
                'customer_id', NEW.customer_id,
                'total', NEW.total,
                'status', NEW.status
            ),
            'db_trigger'
        );
    END IF;
    RETURN NEW;
END;
$$;

-- Recreate the trigger so the migration can be re-run safely.
DROP TRIGGER IF EXISTS trg_order_completed_event ON orders;
CREATE TRIGGER trg_order_completed_event
    AFTER UPDATE ON orders
    FOR EACH ROW
    WHEN (NEW.status = 'completed' AND OLD.status IS DISTINCT FROM 'completed')
    EXECUTE FUNCTION fire_order_completed_event();
-- 4b. Customer created → fire 'customer_created'
-- Trigger function for store_customer_profiles: on every inserted row, enqueue
-- a 'customer_created' event with customer_id, email, first_name and last_name
-- (NULL text fields are sent as empty strings). Always returns NEW.
--
-- SECURITY DEFINER with the search_path pinned to the one in effect when this
-- migration runs, so unqualified names cannot be redirected by the search_path
-- of whichever session inserts the profile.
CREATE OR REPLACE FUNCTION fire_customer_created_event()
RETURNS TRIGGER
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path FROM CURRENT
AS $$
BEGIN
    INSERT INTO automation_events (store_id, event_type, event_payload, source)
    VALUES (
        NEW.store_id,
        'customer_created',
        jsonb_build_object(
            'customer_id', NEW.id,
            'email', COALESCE(NEW.email, ''),
            'first_name', COALESCE(NEW.first_name, ''),
            'last_name', COALESCE(NEW.last_name, '')
        ),
        'db_trigger'
    );
    RETURN NEW;
END;
$$;

-- Recreate the trigger so the migration can be re-run safely.
DROP TRIGGER IF EXISTS trg_customer_created_event ON store_customer_profiles;
CREATE TRIGGER trg_customer_created_event
    AFTER INSERT ON store_customer_profiles
    FOR EACH ROW
    EXECUTE FUNCTION fire_customer_created_event();
-- 4c. Loyalty tier change → fire 'loyalty_tier_change'
-- Trigger function for store_customer_profiles: when loyalty_tier changes
-- (NULL-safe comparison), enqueue a 'loyalty_tier_change' event with the
-- customer's id, email, names and the old/new tier. A NULL tier is reported
-- as 'none'. Always returns NEW.
--
-- SECURITY DEFINER with the search_path pinned to the one in effect when this
-- migration runs, so unqualified names cannot be redirected by the search_path
-- of whichever session updates the profile.
CREATE OR REPLACE FUNCTION fire_loyalty_tier_change_event()
RETURNS TRIGGER
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path FROM CURRENT
AS $$
BEGIN
    -- Repeats the trigger's WHEN condition so the function stays correct even
    -- if it is attached to a trigger without that filter.
    IF (TG_OP = 'UPDATE' AND NEW.loyalty_tier IS DISTINCT FROM OLD.loyalty_tier) THEN
        INSERT INTO automation_events (store_id, event_type, event_payload, source)
        VALUES (
            NEW.store_id,
            'loyalty_tier_change',
            jsonb_build_object(
                'customer_id', NEW.id,
                'email', COALESCE(NEW.email, ''),
                'from_tier', COALESCE(OLD.loyalty_tier, 'none'),
                'to_tier', COALESCE(NEW.loyalty_tier, 'none'),
                'first_name', COALESCE(NEW.first_name, ''),
                'last_name', COALESCE(NEW.last_name, '')
            ),
            'db_trigger'
        );
    END IF;
    RETURN NEW;
END;
$$;

-- Recreate the trigger so the migration can be re-run safely.
DROP TRIGGER IF EXISTS trg_loyalty_tier_change_event ON store_customer_profiles;
CREATE TRIGGER trg_loyalty_tier_change_event
    AFTER UPDATE ON store_customer_profiles
    FOR EACH ROW
    WHEN (NEW.loyalty_tier IS DISTINCT FROM OLD.loyalty_tier)
    EXECUTE FUNCTION fire_loyalty_tier_change_event();
-- ============================================================================
-- 5. MIGRATION RPC — convert marketing_automation_rules → workflows
-- ============================================================================
-- Add column to track migration (idempotent)
ALTER TABLE marketing_automation_rules
    ADD COLUMN IF NOT EXISTS migrated_workflow_id UUID;

-- migrate_automation_rules: convert a store's not-yet-migrated
-- marketing_automation_rules into workflows.
--
-- For each rule with migrated_workflow_id IS NULL this creates:
--   * one workflows row ('schedule' for birthday rules, 'event' otherwise),
--   * one entry-point workflow_steps row ('action_1') derived from action_type,
--   * an event subscription (non-birthday) or a daily 09:00 cron (birthday),
-- and then stamps the rule with the new workflow id.
--
-- Parameters:
--   p_store_id  store whose rules are migrated.
-- Returns: one row per rule — (rule_id, workflow_id, 'migrated') on success or
--   (rule_id, NULL, 'error: <message>') on failure. Each rule runs in its own
--   BEGIN/EXCEPTION block, so a failing rule is rolled back without affecting
--   the others.
--
-- SECURITY DEFINER with the search_path pinned to the one in effect when this
-- migration runs, so unqualified names cannot be redirected by the caller's
-- search_path.
-- NOTE(review): no check that the caller may act on p_store_id — confirm
-- EXECUTE is restricted appropriately.
CREATE OR REPLACE FUNCTION migrate_automation_rules(p_store_id UUID)
RETURNS TABLE(rule_id UUID, workflow_id UUID, status TEXT)
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path FROM CURRENT
AS $$
DECLARE
    r RECORD;
    v_wf_id UUID;
    v_step_config JSONB;
    v_step_type TEXT;
    v_event_type TEXT;
    v_trigger_config JSONB;
    v_is_birthday BOOLEAN;
BEGIN
    FOR r IN
        SELECT * FROM marketing_automation_rules
        WHERE store_id = p_store_id AND migrated_workflow_id IS NULL
    LOOP
        BEGIN
            -- NULL-safe: a rule with a NULL trigger_type is NOT a birthday rule.
            -- The previous `r.trigger_type != 'birthday'` evaluated to NULL for
            -- such rules and sent them down the birthday/cron branch even though
            -- the workflow had been created with trigger_type 'event'.
            v_is_birthday := r.trigger_type IS NOT DISTINCT FROM 'birthday';

            -- Map trigger_type to event_type (1:1 for most)
            v_event_type := r.trigger_type;

            -- Build trigger config from rule.
            -- NOTE(review): keys in the rule's trigger_config win on conflict,
            -- so a rule-supplied 'event_type' key overrides the mapped one.
            v_trigger_config := jsonb_build_object('event_type', v_event_type);
            IF r.trigger_config IS NOT NULL THEN
                v_trigger_config := v_trigger_config || r.trigger_config::JSONB;
            END IF;

            -- Create workflow
            INSERT INTO workflows (
                store_id, name, description, trigger_type, trigger_config,
                is_active, status, is_template
            ) VALUES (
                p_store_id,
                COALESCE(r.name, 'Migrated Rule'),
                COALESCE(r.description, 'Migrated from automation rules'),
                CASE WHEN v_is_birthday THEN 'schedule' ELSE 'event' END,
                v_trigger_config,
                COALESCE(r.is_active, false),
                CASE WHEN COALESCE(r.is_active, false) THEN 'active' ELSE 'draft' END,
                false
            ) RETURNING id INTO v_wf_id;

            -- Map action_type to workflow step. Every mapped action is a 'tool' step.
            -- NOTE(review): 'send_sms' is mapped onto the email tool addressed to
            -- {{trigger.email}}, and unknown action types become a 'noop' tool while
            -- the rule is still marked migrated — confirm both are intended.
            v_step_type := 'tool';
            v_step_config := CASE r.action_type
                WHEN 'send_email' THEN jsonb_build_object(
                    'tool_name', 'email',
                    'args_template', jsonb_build_object(
                        'action', 'send_template',
                        'to', '{{trigger.email}}',
                        'template', COALESCE((r.action_config::JSONB)->>'template_id', ''),
                        'subject', COALESCE((r.action_config::JSONB)->>'subject', '')
                    )
                )
                WHEN 'send_sms' THEN jsonb_build_object(
                    'tool_name', 'email',
                    'args_template', jsonb_build_object(
                        'action', 'send',
                        'to', '{{trigger.email}}',
                        'text', COALESCE((r.action_config::JSONB)->>'message', '')
                    )
                )
                WHEN 'award_points' THEN jsonb_build_object(
                    'tool_name', 'customers',
                    'args_template', jsonb_build_object(
                        'action', 'update',
                        'customer_id', '{{trigger.customer_id}}',
                        -- A non-integer 'points' value raises here and is reported
                        -- through the per-rule EXCEPTION handler below.
                        'loyalty_points', COALESCE(((r.action_config::JSONB)->>'points')::INT, 0)
                    )
                )
                WHEN 'add_to_segment' THEN jsonb_build_object(
                    'tool_name', 'customers',
                    'args_template', jsonb_build_object(
                        'action', 'add_note',
                        'customer_id', '{{trigger.customer_id}}',
                        'note', 'segment:' || COALESCE((r.action_config::JSONB)->>'segment_name', '')
                    )
                )
                WHEN 'tag' THEN jsonb_build_object(
                    'tool_name', 'customers',
                    'args_template', jsonb_build_object(
                        'action', 'add_note',
                        'customer_id', '{{trigger.customer_id}}',
                        'note', 'tag:' || COALESCE((r.action_config::JSONB)->>'tag_name', '')
                    )
                )
                WHEN 'send_wallet_notification' THEN jsonb_build_object(
                    'tool_name', 'email',
                    'args_template', jsonb_build_object(
                        'action', 'send',
                        'to', '{{trigger.email}}',
                        'subject', COALESCE((r.action_config::JSONB)->>'title', 'Notification'),
                        'text', COALESCE((r.action_config::JSONB)->>'body', '')
                    )
                )
                ELSE jsonb_build_object('tool_name', 'noop')
            END;

            -- Create the workflow step
            INSERT INTO workflow_steps (
                workflow_id, step_key, step_type, is_entry_point, step_config
            ) VALUES (
                v_wf_id, 'action_1', v_step_type, true, v_step_config
            );

            -- Create event subscription (skip for birthday — that's schedule-based).
            -- A NULL trigger_type reaches the INSERT, violates event_type NOT NULL,
            -- and is reported as an error row for that rule.
            IF NOT v_is_birthday THEN
                INSERT INTO workflow_event_subscriptions (
                    store_id, workflow_id, event_type, is_active
                ) VALUES (
                    p_store_id, v_wf_id, v_event_type, COALESCE(r.is_active, false)
                );
            ELSE
                -- Birthday: set up as daily cron at 9 AM.
                -- NOTE(review): next_run_at is always tomorrow 09:00 computed from
                -- CURRENT_DATE in the session time zone — confirm the scheduler's
                -- time zone expectations.
                UPDATE workflows SET
                    cron_expression = '0 9 * * *',
                    next_run_at = (CURRENT_DATE + INTERVAL '1 day' + INTERVAL '9 hours')
                WHERE id = v_wf_id;
            END IF;

            -- Mark rule as migrated
            UPDATE marketing_automation_rules
            SET migrated_workflow_id = v_wf_id
            WHERE id = r.id;

            rule_id := r.id;
            workflow_id := v_wf_id;
            status := 'migrated';
            RETURN NEXT;
        EXCEPTION WHEN OTHERS THEN
            -- Everything done for this rule has been rolled back; report and continue.
            rule_id := r.id;
            workflow_id := NULL;
            status := 'error: ' || SQLERRM;
            RETURN NEXT;
        END;
    END LOOP;
END;
$$;
-- ============================================================================
-- 6. CRM AUTOMATION TEMPLATES
-- Pre-built workflow templates for common CRM automation patterns
-- ============================================================================
-- Template: Welcome Email
-- Steps: 'delay_5m' (300-second delay, entry point) → 'send_welcome' (email to
-- {{trigger.email}}). Triggered by the 'customer_created' event.
-- NOTE(review): seeded with is_active = false but status = 'active' — confirm
-- this pairing is intended for templates.
DO $$
DECLARE
    v_wf_id UUID;
BEGIN
    -- Seed only once: without this guard every re-run of the migration would
    -- insert another copy of the template.
    IF EXISTS (
        SELECT 1
        FROM workflows
        WHERE is_template = true
          AND template_category = 'crm'
          AND name = 'Welcome Email'
    ) THEN
        RETURN;
    END IF;

    INSERT INTO workflows (name, description, trigger_type, trigger_config, is_active, status, is_template, template_category, template_tags, icon)
    VALUES ('Welcome Email', 'Send a welcome email 5 minutes after a new customer signs up', 'event', '{"event_type":"customer_created"}'::JSONB, false, 'active', true, 'crm', ARRAY['email','onboarding','customer'], 'envelope.badge.person.crop')
    RETURNING id INTO v_wf_id;

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, step_config)
    VALUES (v_wf_id, 'delay_5m', 'delay', true, 'send_welcome', '{"seconds":300}'::JSONB);

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, step_config)
    VALUES (v_wf_id, 'send_welcome', 'tool', false, '{"tool_name":"email","args_template":{"action":"send","to":"{{trigger.email}}","subject":"Welcome to our store!","html":"<h2>Welcome, {{trigger.first_name}}!</h2><p>Thanks for joining. We''re excited to have you.</p>"}}'::JSONB);
END $$;
-- Template: Purchase Thank You
-- Single step 'send_thanks': email sent when an 'order_completed' event fires.
-- NOTE(review): the step addresses mail to {{trigger.customer_email}}, but the
-- 'order_completed' payload built by fire_order_completed_event() contains only
-- order_id, customer_id, total and status — confirm how the address is resolved.
-- NOTE(review): seeded with is_active = false but status = 'active' — confirm
-- this pairing is intended for templates.
DO $$
DECLARE
    v_wf_id UUID;
BEGIN
    -- Seed only once: without this guard every re-run of the migration would
    -- insert another copy of the template.
    IF EXISTS (
        SELECT 1
        FROM workflows
        WHERE is_template = true
          AND template_category = 'crm'
          AND name = 'Purchase Thank You'
    ) THEN
        RETURN;
    END IF;

    INSERT INTO workflows (name, description, trigger_type, trigger_config, is_active, status, is_template, template_category, template_tags, icon)
    VALUES ('Purchase Thank You', 'Send a thank-you email after an order is completed', 'event', '{"event_type":"order_completed"}'::JSONB, false, 'active', true, 'crm', ARRAY['email','orders','retention'], 'bag.badge.checkmark')
    RETURNING id INTO v_wf_id;

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, step_config)
    VALUES (v_wf_id, 'send_thanks', 'tool', true, '{"tool_name":"email","args_template":{"action":"send","to":"{{trigger.customer_email}}","subject":"Thanks for your order!","html":"<h2>Order Confirmed</h2><p>Thank you for your purchase. Your order #{{trigger.order_id}} is on its way.</p>"}}'::JSONB);
END $$;
-- Template: Loyalty Upgrade
-- Steps: 'check_upgrade' (condition, entry point) → 'send_congrats' (email to
-- {{trigger.email}}). Triggered by the 'loyalty_tier_change' event.
-- NOTE(review): the condition only requires to_tier to differ from from_tier
-- and not be 'none', so a tier downgrade also sends "You've been upgraded!" —
-- confirm whether tier ordering should be checked.
-- NOTE(review): seeded with is_active = false but status = 'active' — confirm
-- this pairing is intended for templates.
DO $$
DECLARE
    v_wf_id UUID;
BEGIN
    -- Seed only once: without this guard every re-run of the migration would
    -- insert another copy of the template.
    IF EXISTS (
        SELECT 1
        FROM workflows
        WHERE is_template = true
          AND template_category = 'crm'
          AND name = 'Loyalty Upgrade Notification'
    ) THEN
        RETURN;
    END IF;

    INSERT INTO workflows (name, description, trigger_type, trigger_config, is_active, status, is_template, template_category, template_tags, icon)
    VALUES ('Loyalty Upgrade Notification', 'Congratulate customers when their loyalty tier changes', 'event', '{"event_type":"loyalty_tier_change"}'::JSONB, false, 'active', true, 'crm', ARRAY['loyalty','email','retention'], 'star.circle')
    RETURNING id INTO v_wf_id;

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, on_failure, step_config)
    VALUES (v_wf_id, 'check_upgrade', 'condition', true, 'send_congrats', null,
        '{"expression":"trigger.to_tier !== ''none'' && trigger.to_tier !== trigger.from_tier","on_true":"send_congrats","on_false":null}'::JSONB);

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, step_config)
    VALUES (v_wf_id, 'send_congrats', 'tool', false,
        '{"tool_name":"email","args_template":{"action":"send","to":"{{trigger.email}}","subject":"Congratulations on reaching {{trigger.to_tier}} tier!","html":"<h2>You''ve been upgraded!</h2><p>Hi {{trigger.first_name}}, you''re now a {{trigger.to_tier}} member. Enjoy your new benefits!</p>"}}'::JSONB);
END $$;
-- Template: Birthday Discount
-- Scheduled workflow (cron '0 9 * * *'). Steps: 'find_birthdays' (customers
-- tool, entry point) → 'send_each' (for_each over the lookup output) which runs
-- 'send_birthday_email' per item.
-- NOTE(review): next_run_at is computed as tomorrow 09:00 from CURRENT_DATE in
-- the session time zone at migration time — confirm the scheduler's time zone
-- expectations.
-- NOTE(review): seeded with is_active = false but status = 'active' — confirm
-- this pairing is intended for templates.
DO $$
DECLARE
    v_wf_id UUID;
BEGIN
    -- Seed only once: without this guard every re-run of the migration would
    -- insert another copy of the template.
    IF EXISTS (
        SELECT 1
        FROM workflows
        WHERE is_template = true
          AND template_category = 'crm'
          AND name = 'Birthday Discount'
    ) THEN
        RETURN;
    END IF;

    INSERT INTO workflows (name, description, trigger_type, trigger_config, is_active, status, is_template, template_category, template_tags, icon, cron_expression, next_run_at)
    VALUES ('Birthday Discount', 'Daily check for customer birthdays, send a discount email', 'schedule', '{"schedule":"daily_9am"}'::JSONB, false, 'active', true, 'crm', ARRAY['birthday','email','loyalty'], 'gift', '0 9 * * *', (CURRENT_DATE + INTERVAL '1 day' + INTERVAL '9 hours'))
    RETURNING id INTO v_wf_id;

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, step_config)
    VALUES (v_wf_id, 'find_birthdays', 'tool', true, 'send_each',
        '{"tool_name":"customers","args_template":{"action":"find","query":"birthday_today","limit":100}}'::JSONB);

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, step_config)
    VALUES (v_wf_id, 'send_each', 'for_each', false, null,
        '{"items_expression":"steps.find_birthdays.output.data","step_keys":["send_birthday_email"]}'::JSONB);

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, step_config)
    VALUES (v_wf_id, 'send_birthday_email', 'tool', false,
        '{"tool_name":"email","args_template":{"action":"send","to":"{{item.email}}","subject":"Happy Birthday, {{item.first_name}}! 🎂","html":"<h2>Happy Birthday!</h2><p>Enjoy a special discount on your next purchase.</p>"}}'::JSONB);
END $$;
-- Template: Win-Back Campaign
-- Steps: 'wait_30d' (2,592,000-second delay, entry point) → 'send_winback'
-- (email). Triggered by the 'order_completed' event.
-- NOTE(review): the workflow starts on every completed order and nothing in
-- these steps checks for a later purchase, so a customer who orders again
-- within 30 days still receives "We miss you!" — confirm intended.
-- NOTE(review): the step addresses mail to {{trigger.customer_email}}, but the
-- 'order_completed' payload built by fire_order_completed_event() contains only
-- order_id, customer_id, total and status — confirm how the address is resolved.
-- NOTE(review): seeded with is_active = false but status = 'active' — confirm
-- this pairing is intended for templates.
DO $$
DECLARE
    v_wf_id UUID;
BEGIN
    -- Seed only once: without this guard every re-run of the migration would
    -- insert another copy of the template.
    IF EXISTS (
        SELECT 1
        FROM workflows
        WHERE is_template = true
          AND template_category = 'crm'
          AND name = 'Win-Back Campaign'
    ) THEN
        RETURN;
    END IF;

    INSERT INTO workflows (name, description, trigger_type, trigger_config, is_active, status, is_template, template_category, template_tags, icon)
    VALUES ('Win-Back Campaign', 'Re-engage customers 30 days after their last purchase', 'event', '{"event_type":"order_completed"}'::JSONB, false, 'active', true, 'crm', ARRAY['email','retention','winback'], 'arrow.uturn.backward.circle')
    RETURNING id INTO v_wf_id;

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, step_config)
    VALUES (v_wf_id, 'wait_30d', 'delay', true, 'send_winback',
        '{"seconds":2592000}'::JSONB);

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, step_config)
    VALUES (v_wf_id, 'send_winback', 'tool', false,
        '{"tool_name":"email","args_template":{"action":"send","to":"{{trigger.customer_email}}","subject":"We miss you!","html":"<h2>It''s been a while!</h2><p>Come back and check out what''s new. Here''s a special offer just for you.</p>"}}'::JSONB);
END $$;
-- Template: Drip Campaign (3-email onboarding)
-- Steps: 'email_1' (entry point) → 'wait_3d' (259,200 s) → 'email_2' →
-- 'wait_7d' (604,800 s) → 'email_3'. Triggered by the 'customer_created' event.
-- NOTE(review): seeded with is_active = false but status = 'active' — confirm
-- this pairing is intended for templates.
DO $$
DECLARE
    v_wf_id UUID;
BEGIN
    -- Seed only once: without this guard every re-run of the migration would
    -- insert another copy of the template.
    IF EXISTS (
        SELECT 1
        FROM workflows
        WHERE is_template = true
          AND template_category = 'crm'
          AND name = 'Drip Campaign'
    ) THEN
        RETURN;
    END IF;

    INSERT INTO workflows (name, description, trigger_type, trigger_config, is_active, status, is_template, template_category, template_tags, icon)
    VALUES ('Drip Campaign', '3-email onboarding sequence for new customers: welcome, tips, offer', 'event', '{"event_type":"customer_created"}'::JSONB, false, 'active', true, 'crm', ARRAY['email','onboarding','drip','sequence'], 'drop.triangle')
    RETURNING id INTO v_wf_id;

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, step_config)
    VALUES (v_wf_id, 'email_1', 'tool', true, 'wait_3d',
        '{"tool_name":"email","args_template":{"action":"send","to":"{{trigger.email}}","subject":"Welcome aboard!","html":"<h2>Welcome!</h2><p>Thanks for signing up, {{trigger.first_name}}. Here''s what you can expect.</p>"}}'::JSONB);

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, step_config)
    VALUES (v_wf_id, 'wait_3d', 'delay', false, 'email_2', '{"seconds":259200}'::JSONB);

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, step_config)
    VALUES (v_wf_id, 'email_2', 'tool', false, 'wait_7d',
        '{"tool_name":"email","args_template":{"action":"send","to":"{{trigger.email}}","subject":"Tips to get the most out of your account","html":"<h2>Pro Tips</h2><p>Here are some features you might have missed.</p>"}}'::JSONB);

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, on_success, step_config)
    VALUES (v_wf_id, 'wait_7d', 'delay', false, 'email_3', '{"seconds":604800}'::JSONB);

    INSERT INTO workflow_steps (workflow_id, step_key, step_type, is_entry_point, step_config)
    VALUES (v_wf_id, 'email_3', 'tool', false,
        '{"tool_name":"email","args_template":{"action":"send","to":"{{trigger.email}}","subject":"A special offer for you","html":"<h2>Exclusive Deal</h2><p>As a thank you for being a member, here''s 15% off your first order.</p>"}}'::JSONB);
END $$;
-- ============================================================================
-- 7. CLEANUP — delete processed and failed events older than 7 days (this migration defines the function only; it does not schedule it)
-- ============================================================================
-- cleanup_old_automation_events: purge finished events from the queue.
--
-- Deletes automation_events rows whose status is 'processed' or 'failed' and
-- whose created_at is more than 7 days in the past. Age is measured from
-- creation, not from processed_at. 'pending' and 'processing' rows are never
-- touched.
-- Returns: number of rows deleted.
--
-- SECURITY DEFINER with the search_path pinned to the one in effect when this
-- migration runs, so the unqualified table name cannot be redirected by the
-- caller's search_path.
CREATE OR REPLACE FUNCTION cleanup_old_automation_events()
RETURNS INTEGER
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path FROM CURRENT
AS $$
DECLARE
    v_count INTEGER;
BEGIN
    DELETE FROM automation_events
    WHERE status IN ('processed', 'failed')
      AND created_at < now() - INTERVAL '7 days';

    GET DIAGNOSTICS v_count = ROW_COUNT;
    RETURN v_count;
END;
$$;