-- ============================================================================
-- Workflow Engine — Phase 2: Webhooks, Schedule, Condition Triggers
-- ============================================================================
-- pg_net supplies net.http_post, which the trigger processors and cron jobs
-- later in this migration use to make outbound HTTP calls from Postgres.
CREATE EXTENSION IF NOT EXISTS pg_net
    WITH SCHEMA extensions;
-- Add workflow_id to user_triggers for multi-step workflow support.
-- ADD COLUMN IF NOT EXISTS keeps the migration re-runnable and tests the exact
-- table being altered; an information_schema lookup by table name alone would
-- also match a same-named table in any other schema and wrongly skip the ALTER.
ALTER TABLE user_triggers
    ADD COLUMN IF NOT EXISTS workflow_id UUID REFERENCES workflows(id) ON DELETE SET NULL;

-- Re-applying the same comment on every run is harmless.
COMMENT ON COLUMN user_triggers.workflow_id IS
    'If set, trigger starts a workflow run instead of executing a single tool';
-- Store Fly.io internal secret for pg_net auth
-- Set via: ALTER DATABASE postgres SET app.fly_internal_secret = '<secret>';
-- Also set as FLY_INTERNAL_SECRET env var on Fly.io
-- ============================================================================
-- Schedule trigger processor — called by pg_cron every 60 seconds
-- ============================================================================
-- Helper: does one cron field (e.g. '*', '*/5', '0,30', '9-17', '10-50/10')
-- match the given value?
-- Supports lists, ranges and steps. Names such as 'MON' are not supported and
-- raise a cast error, which the caller treats as an invalid configuration.
CREATE OR REPLACE FUNCTION _cron_field_matches(p_field TEXT, p_value INTEGER) RETURNS boolean AS $$
DECLARE
    v_item  TEXT;
    v_range TEXT;
    v_step  INTEGER;
    v_lo    INTEGER;
    v_hi    INTEGER;
BEGIN
    FOREACH v_item IN ARRAY string_to_array(p_field, ',') LOOP
        -- Split "<range>/<step>"; a missing step means 1.
        v_range := split_part(v_item, '/', 1);
        v_step  := COALESCE(NULLIF(split_part(v_item, '/', 2), '')::INTEGER, 1);
        IF v_step < 1 THEN
            RAISE EXCEPTION 'invalid cron step in "%"', v_item;
        END IF;

        IF v_range = '*' THEN
            -- Open range: anything from 0 upward is a candidate.
            v_lo := 0;
            v_hi := p_value;
        ELSIF v_range LIKE '%-%' THEN
            v_lo := split_part(v_range, '-', 1)::INTEGER;
            v_hi := split_part(v_range, '-', 2)::INTEGER;
        ELSE
            v_lo := v_range::INTEGER;
            -- "N/step" means "from N upward"; a bare "N" means exactly N.
            v_hi := CASE WHEN v_item LIKE '%/%' THEN p_value ELSE v_lo END;
        END IF;

        IF p_value BETWEEN v_lo AND v_hi AND (p_value - v_lo) % v_step = 0 THEN
            RETURN true;
        END IF;
    END LOOP;

    RETURN false;
END;
$$ LANGUAGE plpgsql IMMUTABLE;

-- Fires every active 'schedule' trigger that is due in the current minute.
--
-- trigger_config forms:
--   { "interval_minutes": 5 }  fires when minutes-since-epoch is a multiple of
--                              the interval, so intervals that do not divide 60
--                              (or exceed 60) keep an even cadence.
--   { "cron": "*/5 9-17 * * *" }  only the minute and hour fields are evaluated
--                              (MVP); day/month/weekday fields are ignored.
--                              Minute and hour come from now() in the session
--                              time zone (NOTE(review): presumably UTC; confirm).
--   neither key                fires on every run (unchanged behaviour).
-- If both keys are present, interval_minutes wins.
--
-- A trigger whose config cannot be parsed is skipped with a WARNING so that one
-- bad row cannot abort the run for every other trigger.
--
-- Workflow triggers are POSTed to the Fly.io service via pg_net; legacy
-- single-tool triggers are inserted into trigger_queue. Does nothing when
-- app.fly_internal_secret is unset.
CREATE OR REPLACE FUNCTION process_schedule_triggers() RETURNS void AS $$
DECLARE
    v_trigger         RECORD;
    v_now             TIMESTAMPTZ := now();
    v_minute          INTEGER;
    v_hour            INTEGER;
    v_epoch_minute    BIGINT;
    v_interval        INTEGER;
    v_cron_fields     TEXT[];
    v_is_due          BOOLEAN;
    v_idempotency_key TEXT;
    v_fly_url         TEXT := 'https://whale-agent.fly.dev';
    v_fly_secret      TEXT;
BEGIN
    v_fly_secret := current_setting('app.fly_internal_secret', true);
    IF v_fly_secret IS NULL OR v_fly_secret = '' THEN
        RAISE NOTICE 'app.fly_internal_secret not set — skipping schedule triggers';
        RETURN;
    END IF;

    v_minute       := EXTRACT(MINUTE FROM v_now)::INTEGER;
    v_hour         := EXTRACT(HOUR FROM v_now)::INTEGER;
    v_epoch_minute := floor(EXTRACT(EPOCH FROM v_now) / 60)::BIGINT;

    FOR v_trigger IN
        SELECT
            ut.id,
            ut.store_id,
            ut.workflow_id,
            ut.trigger_config,
            w.store_id AS wf_store_id
        FROM user_triggers AS ut
        LEFT JOIN workflows AS w
            ON w.id = ut.workflow_id
        WHERE ut.trigger_type = 'schedule'
          AND ut.is_active = true
          AND (ut.workflow_id IS NOT NULL OR ut.tool_id IS NOT NULL)
    LOOP
        -- Decide whether this trigger is due. The sub-block confines parse
        -- errors (bad casts, zero interval, malformed cron) to this trigger.
        BEGIN
            IF v_trigger.trigger_config ? 'interval_minutes' THEN
                v_interval := (v_trigger.trigger_config->>'interval_minutes')::INTEGER;
                IF v_interval IS NULL OR v_interval < 1 THEN
                    RAISE EXCEPTION 'interval_minutes must be a positive integer';
                END IF;
                v_is_due := (v_epoch_minute % v_interval = 0);
            ELSIF v_trigger.trigger_config ? 'cron' THEN
                v_cron_fields := regexp_split_to_array(btrim(v_trigger.trigger_config->>'cron'), '\s+');
                IF COALESCE(array_length(v_cron_fields, 1), 0) < 2 THEN
                    RAISE EXCEPTION 'cron expression needs at least minute and hour fields';
                END IF;
                v_is_due := _cron_field_matches(v_cron_fields[1], v_minute)
                        AND _cron_field_matches(v_cron_fields[2], v_hour);
            ELSE
                v_is_due := true;
            END IF;
        EXCEPTION WHEN OTHERS THEN
            RAISE WARNING 'schedule trigger % skipped: invalid trigger_config (%)', v_trigger.id, SQLERRM;
            CONTINUE;
        END;

        IF NOT v_is_due THEN
            CONTINUE;
        END IF;

        -- Idempotency: prevent double-fire within same minute
        v_idempotency_key := v_trigger.id::TEXT || '_' || to_char(v_now, 'YYYY-MM-DD-HH24-MI');

        IF v_trigger.workflow_id IS NOT NULL THEN
            -- Fire workflow via pg_net
            PERFORM net.http_post(
                url := v_fly_url || '/workflows/start',
                headers := jsonb_build_object(
                    'Content-Type', 'application/json',
                    'Authorization', 'Bearer ' || v_fly_secret
                ),
                body := jsonb_build_object(
                    'workflow_id', v_trigger.workflow_id,
                    'store_id', v_trigger.wf_store_id,
                    'trigger_type', 'schedule',
                    'trigger_payload', jsonb_build_object('trigger_id', v_trigger.id, 'fired_at', v_now),
                    'idempotency_key', v_idempotency_key
                )
            );
        ELSE
            -- Legacy single-tool trigger — enqueue.
            -- NOTE(review): ON CONFLICT DO NOTHING only deduplicates if
            -- trigger_queue has a unique constraint covering idempotency_key.
            INSERT INTO trigger_queue (trigger_id, store_id, payload, idempotency_key)
            VALUES (v_trigger.id, v_trigger.store_id, jsonb_build_object('fired_at', v_now), v_idempotency_key)
            ON CONFLICT DO NOTHING;
        END IF;
    END LOOP;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER
-- Pin the search path so a SECURITY DEFINER function cannot be hijacked through
-- objects in a caller-controlled schema.
-- NOTE(review): assumes the referenced tables live in "public"; confirm.
SET search_path = public, pg_temp;
-- ============================================================================
-- Condition trigger processor — called by pg_cron every 60 seconds
-- ============================================================================
-- Evaluates every active 'condition' trigger whose check interval has elapsed
-- and fires those whose condition_sql returns true.
--
-- trigger_config keys read here:
--   check_interval_seconds  minimum seconds between checks. Missing or
--                           non-integer values are treated as 0 (check on every
--                           run), so a trigger without the key is not checked
--                           once and then silently abandoned.
--   condition_sql           SQL text executed as-is; its first column of the
--                           first row is read as a boolean.
--
-- last_checked_at is stamped before the condition runs, so a failing condition
-- is retried only after its interval has elapsed. Failures are written to
-- audit_logs and do not stop the remaining triggers.
--
-- Workflow triggers are POSTed to the Fly.io service via pg_net; legacy
-- single-tool triggers are inserted into trigger_queue. Does nothing when
-- app.fly_internal_secret is unset.
--
-- SECURITY: condition_sql is executed verbatim with the privileges of this
-- function's owner (SECURITY DEFINER).
-- NOTE(review): make sure only trusted roles can write
-- user_triggers.trigger_config and that untrusted roles cannot EXECUTE this
-- function.
CREATE OR REPLACE FUNCTION process_condition_triggers() RETURNS void AS $$
DECLARE
    v_trigger         RECORD;
    v_now             TIMESTAMPTZ := now();
    v_result          BOOLEAN;
    v_idempotency_key TEXT;
    v_fly_url         TEXT := 'https://whale-agent.fly.dev';
    v_fly_secret      TEXT;
BEGIN
    v_fly_secret := current_setting('app.fly_internal_secret', true);
    IF v_fly_secret IS NULL OR v_fly_secret = '' THEN
        RAISE NOTICE 'app.fly_internal_secret not set — skipping condition triggers';
        RETURN;
    END IF;

    FOR v_trigger IN
        SELECT
            ut.id,
            ut.store_id,
            ut.workflow_id,
            ut.trigger_config,
            w.store_id AS wf_store_id
        FROM user_triggers AS ut
        LEFT JOIN workflows AS w
            ON w.id = ut.workflow_id
        WHERE ut.trigger_type = 'condition'
          AND ut.is_active = true
          AND (ut.workflow_id IS NOT NULL OR ut.tool_id IS NOT NULL)
          -- Respect check interval. The CASE guards the cast so one malformed
          -- row cannot raise and abort the query for every trigger; the
          -- fallback of 0 avoids comparing against a NULL interval.
          AND (
              ut.last_checked_at IS NULL
              OR ut.last_checked_at < v_now - make_interval(
                  secs => CASE
                      WHEN ut.trigger_config->>'check_interval_seconds' ~ '^[0-9]{1,9}$'
                          THEN (ut.trigger_config->>'check_interval_seconds')::INTEGER
                      ELSE 0
                  END
              )
          )
    LOOP
        -- Stamp the check time first so it persists even if the condition fails.
        UPDATE user_triggers SET last_checked_at = v_now WHERE id = v_trigger.id;

        -- Run the stored condition inside its own sub-block so an error only
        -- affects this trigger. See the SECURITY note in the header.
        BEGIN
            EXECUTE v_trigger.trigger_config->>'condition_sql' INTO v_result;
        EXCEPTION WHEN OTHERS THEN
            -- Log error but continue
            INSERT INTO audit_logs (action, severity, store_id, resource_type, resource_id, error_message, source)
            VALUES ('trigger.condition.error', 'error', v_trigger.store_id, 'user_trigger', v_trigger.id::TEXT,
                    'Condition SQL failed: ' || SQLERRM, 'pg_cron');
            CONTINUE;
        END;

        -- NULL (including "no rows") counts as not met.
        IF NOT COALESCE(v_result, false) THEN
            CONTINUE;
        END IF;

        -- Condition is true — fire. The key has hour granularity, so the same
        -- trigger produces the same key for every firing within a clock hour.
        v_idempotency_key := v_trigger.id::TEXT || '_' || to_char(v_now, 'YYYY-MM-DD-HH24');

        IF v_trigger.workflow_id IS NOT NULL THEN
            PERFORM net.http_post(
                url := v_fly_url || '/workflows/start',
                headers := jsonb_build_object(
                    'Content-Type', 'application/json',
                    'Authorization', 'Bearer ' || v_fly_secret
                ),
                body := jsonb_build_object(
                    'workflow_id', v_trigger.workflow_id,
                    'store_id', v_trigger.wf_store_id,
                    'trigger_type', 'condition',
                    'trigger_payload', jsonb_build_object('trigger_id', v_trigger.id, 'condition_met_at', v_now),
                    'idempotency_key', v_idempotency_key
                )
            );
        ELSE
            -- NOTE(review): ON CONFLICT DO NOTHING only deduplicates if
            -- trigger_queue has a unique constraint covering idempotency_key.
            INSERT INTO trigger_queue (trigger_id, store_id, payload, idempotency_key)
            VALUES (v_trigger.id, v_trigger.store_id, jsonb_build_object('condition_met_at', v_now), v_idempotency_key)
            ON CONFLICT DO NOTHING;
        END IF;
    END LOOP;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER
-- Pin the search path so a SECURITY DEFINER function cannot be hijacked through
-- objects in a caller-controlled schema. condition_sql is resolved against this
-- path as well.
-- NOTE(review): assumes the referenced tables live in "public"; confirm.
SET search_path = public, pg_temp;
-- ============================================================================
-- pg_cron jobs (requires pg_cron extension — available on Supabase Pro+)
-- ============================================================================
-- Worker loop: process pending workflow steps every 10 seconds
-- This pokes the Fly.io server to claim and execute steps
-- Registers the three pg_cron jobs when pg_cron is installed.
-- The outer body and the job commands use distinct dollar-quote tags
-- ($migration$ / $cron$): a nested untagged $$ would close the outer DO body
-- early and make the whole statement a syntax error.
DO $migration$
BEGIN
    IF EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_cron') THEN
        -- Worker poke every 10 seconds. The WHERE guard skips the request when
        -- the secret is unset, matching the trigger processors, so no request
        -- is sent with a null Authorization header.
        PERFORM cron.schedule('workflow_worker', '10 seconds',
            $cron$SELECT net.http_post(
                url := 'https://whale-agent.fly.dev/workflows/process',
                headers := jsonb_build_object(
                    'Content-Type', 'application/json',
                    'Authorization', 'Bearer ' || current_setting('app.fly_internal_secret', true)
                ),
                body := '{"batch_size": 10}'::jsonb
            )
            WHERE COALESCE(current_setting('app.fly_internal_secret', true), '') <> ''$cron$
        );

        -- Schedule triggers every minute
        PERFORM cron.schedule('schedule_triggers', '* * * * *',
            $cron$SELECT process_schedule_triggers()$cron$
        );

        -- Condition triggers every minute
        PERFORM cron.schedule('condition_triggers', '* * * * *',
            $cron$SELECT process_condition_triggers()$cron$
        );
    ELSE
        RAISE NOTICE 'pg_cron extension not installed — skipping cron job registration';
    END IF;
END
$migration$;
-- ============================================================================
-- Phase 3: Circuit Breaker on user_tools, enhanced audit, observability
-- ============================================================================
-- Add circuit breaker columns to user_tools.
-- Each column is guarded individually so a partially applied state (some
-- columns present, others missing) is completed and does not fail on a
-- duplicate column. The guard also applies to the exact table being altered,
-- not to any same-named table in another schema.
ALTER TABLE user_tools
    ADD COLUMN IF NOT EXISTS circuit_breaker_failures INTEGER NOT NULL DEFAULT 0,
    ADD COLUMN IF NOT EXISTS circuit_breaker_state TEXT NOT NULL DEFAULT 'closed'
        CHECK (circuit_breaker_state IN ('closed', 'open', 'half_open')),
    ADD COLUMN IF NOT EXISTS circuit_breaker_tripped_at TIMESTAMPTZ,
    ADD COLUMN IF NOT EXISTS circuit_breaker_threshold INTEGER NOT NULL DEFAULT 5,
    ADD COLUMN IF NOT EXISTS circuit_breaker_cooldown_seconds INTEGER NOT NULL DEFAULT 300;
-- Add last_checked_at to user_triggers; process_condition_triggers() reads and
-- writes it to enforce each trigger's check interval.
-- ADD COLUMN IF NOT EXISTS keeps the migration re-runnable and tests the exact
-- table being altered, not any same-named table in another schema.
ALTER TABLE user_triggers
    ADD COLUMN IF NOT EXISTS last_checked_at TIMESTAMPTZ;