-- Migration: Workflow Engine Security & Performance Fixes
-- Fixes: P3 (SQL injection), P4 (step outputs race), P7 (missing indexes)
-- Date: 2026-02-13
-- ============================================================================
-- P4 FIX: Atomic step output accumulation RPC
-- Prevents race conditions when concurrent steps write to step_outputs
-- ============================================================================
CREATE OR REPLACE FUNCTION accumulate_step_output(
    p_run_id UUID,
    p_step_key TEXT,
    p_step_output JSONB
) RETURNS void
LANGUAGE plpgsql
SECURITY DEFINER
-- SECURITY DEFINER runs with the owner's privileges, so unqualified names must not
-- resolve through the caller's search_path (a caller could otherwise shadow
-- workflow_runs with an object of their own). pg_temp is listed last for the same reason.
-- NOTE(review): assumes workflow_runs lives in the public schema — confirm.
-- NOTE(review): EXECUTE is not revoked here; functions are executable by PUBLIC by
-- default, and the body performs no ownership check, so any role with EXECUTE can
-- overwrite step outputs of any run. Consider REVOKE EXECUTE for non-worker roles.
SET search_path = public, pg_temp
AS $$
-- Merges {p_step_key: p_step_output} into workflow_runs.step_outputs for one run.
--   p_run_id      - workflow_runs.id to update; if no row matches this is a silent no-op.
--   p_step_key    - top-level key to set; must not be NULL (jsonb_build_object rejects
--                   NULL keys). An existing value under the same key is replaced,
--                   because jsonb || keeps the right-hand operand's value.
--   p_step_output - JSON value stored under p_step_key.
-- The merge is computed inside a single UPDATE rather than read-modify-write by the
-- caller: concurrent callers serialize on the row lock, and under READ COMMITTED the
-- waiting UPDATE re-evaluates the expression against the newly committed row, so no
-- writer's key is lost.
BEGIN
    UPDATE workflow_runs
    SET step_outputs = COALESCE(step_outputs, '{}'::JSONB)
                       || jsonb_build_object(p_step_key, p_step_output)
    WHERE id = p_run_id;
END;
$$;
-- ============================================================================
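-- P3 FIX: Harden condition-trigger evaluation
-- The old version did EXECUTE on raw user SQL — a full injection vector.
-- The new version still EXECUTEs user-authored SQL, but only as a single
-- expression wrapped in SELECT (...)::BOOLEAN, after rejecting input that matches
-- known-dangerous patterns, and with the transaction set read-only while the
-- expression is evaluated. The pattern checks are a denylist, not a guarantee.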
-- ============================================================================
CREATE OR REPLACE FUNCTION process_condition_triggers() RETURNS void AS $$
-- Polls active 'condition' triggers in user_triggers that are due for a check,
-- evaluates each trigger's trigger_config->>'condition_sql' as a boolean
-- expression and, when it is TRUE and the trigger has a workflow_id, POSTs a
-- start request to the Fly worker via net.http_post. Blocked, failed and fired
-- conditions are recorded in audit_logs. Returns immediately when the
-- app.fly_internal_secret setting is unset or empty.
--
-- NOTE(review): condition_sql is user-authored SQL executed with this function
-- owner's privileges (SECURITY DEFINER). The pattern checks below are a denylist,
-- and read-only mode only blocks writes made in this transaction; it does not
-- stop reads of any table the owner can read, nor calls to functions whose side
-- effects live outside this transaction. Treat condition authors as trusted or
-- move to a structured (non-SQL) condition format.
DECLARE
    v_trigger         RECORD;
    v_result          BOOLEAN;
    v_idempotency_key TEXT;
    v_fly_url         TEXT := 'https://whale-agent.fly.dev';
    v_fly_secret      TEXT;
    v_condition_sql   TEXT;
BEGIN
    v_fly_secret := current_setting('app.fly_internal_secret', true);
    IF v_fly_secret IS NULL OR v_fly_secret = '' THEN
        RAISE NOTICE 'app.fly_internal_secret not set — skipping condition triggers';
        RETURN;
    END IF;

    FOR v_trigger IN
        SELECT ut.*, w.id AS wf_id, w.store_id AS wf_store_id
        FROM user_triggers AS ut
        LEFT JOIN workflows AS w ON w.id = ut.workflow_id
        WHERE ut.trigger_type = 'condition'
          AND ut.is_active = true
          AND (ut.workflow_id IS NOT NULL OR ut.tool_id IS NOT NULL)
          AND (
              ut.last_checked_at IS NULL
              -- Without a fallback a missing check_interval_seconds makes this
              -- comparison NULL, so the trigger would never be checked again after
              -- its first check.
              -- NOTE(review): the 60-second fallback is a chosen default — confirm.
              OR ut.last_checked_at < now()
                 - COALESCE((ut.trigger_config->>'check_interval_seconds')::INTEGER, 60)
                   * interval '1 second'
          )
    LOOP
        -- Stamp the check up front so the interval applies even when the
        -- condition is empty, blocked or fails below.
        UPDATE user_triggers SET last_checked_at = now() WHERE id = v_trigger.id;

        v_condition_sql := v_trigger.trigger_config->>'condition_sql';
        IF v_condition_sql IS NULL OR v_condition_sql = '' THEN
            CONTINUE;
        END IF;

        -- Reject conditions that begin with a non-query keyword. \y is the
        -- PostgreSQL regex word-boundary escape, so an expression such as
        -- "downloads > 5" is not mistaken for DO.
        IF v_condition_sql ~* '^\s*(INSERT|UPDATE|DELETE|DROP|ALTER|TRUNCATE|CREATE|GRANT|REVOKE|COPY|EXECUTE|CALL|DO|SET|RESET|VACUUM|CLUSTER|REINDEX|COMMENT|SECURITY|REASSIGN|DISCARD)\y' THEN
            INSERT INTO audit_logs (action, severity, store_id, resource_type, resource_id, error_message, source)
            VALUES ('trigger.condition.blocked', 'error', v_trigger.store_id, 'user_trigger', v_trigger.id::TEXT,
                    'Blocked non-SELECT condition SQL: ' || left(v_condition_sql, 100), 'pg_cron');
            CONTINUE;
        END IF;

        -- A condition is a single expression, so it never needs a statement
        -- separator or comment marker: reject any ';' (EXECUTE would otherwise run
        -- extra statements, e.g. COPY ... TO PROGRAM, which read-only mode allows),
        -- any '--' or '/*', and UNION ... SELECT. Conditions containing ';' inside
        -- a string literal are rejected too; that is an accepted false positive.
        -- In PostgreSQL regexes \b means backspace, so word boundaries use \y.
        IF v_condition_sql ~* ';|--|/\*|\yunion\y.*\yselect\y' THEN
            INSERT INTO audit_logs (action, severity, store_id, resource_type, resource_id, error_message, source)
            VALUES ('trigger.condition.blocked', 'error', v_trigger.store_id, 'user_trigger', v_trigger.id::TEXT,
                    'Blocked suspicious condition SQL: ' || left(v_condition_sql, 100), 'pg_cron');
            CONTINUE;
        END IF;

        -- Evaluate the condition with the transaction in read-only mode.
        -- A SET LOCAL inside BEGIN ... EXCEPTION survives a *normal* exit from the
        -- block (its subtransaction commits and the setting lasts until the end of
        -- the outer transaction), which would make every later write here -
        -- UPDATE user_triggers, net.http_post, audit_logs - fail. So after
        -- evaluating we raise a private sentinel error: the subtransaction rolls
        -- back, restoring read-write mode and undoing any set_config the expression
        -- made, while the PL/pgSQL variable v_result keeps the value assigned
        -- before the error.
        v_result := NULL;
        BEGIN
            SET LOCAL transaction_read_only = on;
            EXECUTE 'SELECT (' || v_condition_sql || ')::BOOLEAN' INTO v_result;
            RAISE EXCEPTION 'condition evaluated' USING ERRCODE = 'WFC00';
        EXCEPTION
            WHEN SQLSTATE 'WFC00' THEN
                NULL;  -- expected path: read-only mode undone, v_result retained
            WHEN OTHERS THEN
                INSERT INTO audit_logs (action, severity, store_id, resource_type, resource_id, error_message, source)
                VALUES ('trigger.condition.error', 'error', v_trigger.store_id, 'user_trigger', v_trigger.id::TEXT,
                        'Condition SQL failed: ' || SQLERRM, 'pg_cron');
                CONTINUE;
        END;

        -- NULL (or a sentinel raised by the expression itself) counts as false.
        IF NOT COALESCE(v_result, false) THEN
            CONTINUE;
        END IF;

        -- Condition is true — fire.
        -- One key per trigger per clock hour; presumably lets the worker dedupe
        -- repeated fires within the hour — confirm against the /workflows/start handler.
        v_idempotency_key := v_trigger.id::TEXT || '_' || to_char(now(), 'YYYY-MM-DD-HH24');

        IF v_trigger.workflow_id IS NOT NULL THEN
            PERFORM net.http_post(
                url := v_fly_url || '/workflows/start',
                headers := jsonb_build_object(
                    'Content-Type', 'application/json',
                    'Authorization', 'Bearer ' || v_fly_secret
                ),
                body := jsonb_build_object(
                    'workflow_id', v_trigger.workflow_id,
                    'store_id', v_trigger.wf_store_id,
                    'trigger_type', 'condition',
                    'trigger_payload', jsonb_build_object(
                        'trigger_id', v_trigger.id,
                        'condition_sql', v_condition_sql,
                        'result', v_result
                    ),
                    'idempotency_key', v_idempotency_key
                )
            );
        END IF;

        -- Audit.
        -- NOTE(review): tool_id-only triggers reach this point and are logged as
        -- fired, although nothing is dispatched for them above — confirm intent.
        INSERT INTO audit_logs (action, severity, store_id, resource_type, resource_id, source, details)
        VALUES ('trigger.condition.fired', 'info', v_trigger.store_id, 'user_trigger', v_trigger.id::TEXT,
                'pg_cron', jsonb_build_object('condition_sql', left(v_condition_sql, 200)));
    END LOOP;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER
-- Pin name resolution for this SECURITY DEFINER function (pg_temp last) so a
-- caller's search_path cannot substitute the tables it reads and writes.
-- NOTE(review): assumes user_triggers, workflows and audit_logs are in public — confirm.
SET search_path = public, pg_temp;
-- ============================================================================
-- P7 FIX: Missing production indexes
-- These indexes are critical for workflow engine performance
-- ============================================================================
-- ---- workflow_step_runs ----------------------------------------------------
-- A run's steps narrowed by status: worker loop, completion check, inline chain.
CREATE INDEX IF NOT EXISTS idx_workflow_step_runs_run_status
    ON workflow_step_runs USING btree (run_id, status);

-- Partial index holding only rows in the listed statuses; serves the worker's
-- claim_pending_steps scan.
CREATE INDEX IF NOT EXISTS idx_workflow_step_runs_status
    ON workflow_step_runs USING btree (status)
    WHERE status IN ('pending', 'running', 'retrying', 'waiting');

-- Child steps of a parent step (processWaitingSteps for parallel / for_each);
-- rows without a parent are left out of the index.
CREATE INDEX IF NOT EXISTS idx_workflow_step_runs_parent
    ON workflow_step_runs USING btree (parent_step_run_id)
    WHERE parent_step_run_id IS NOT NULL;

-- Retrying steps ordered by when they may next run (worker loop).
CREATE INDEX IF NOT EXISTS idx_workflow_step_runs_retry
    ON workflow_step_runs USING btree (next_retry_at)
    WHERE status = 'retrying' AND next_retry_at IS NOT NULL;

-- ---- workflow_runs ---------------------------------------------------------
-- Runs of one workflow by status: listing, analytics, concurrency-limit check.
CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_status
    ON workflow_runs USING btree (workflow_id, status);

-- Newest-first run listing per store.
CREATE INDEX IF NOT EXISTS idx_workflow_runs_store_created
    ON workflow_runs USING btree (store_id, created_at DESC);

-- ---- workflows -------------------------------------------------------------
-- Active workflows per store: listing and trigger processing.
-- NOTE(review): is_active is constant (true) inside this partial index, so the
-- second key column adds no selectivity.
CREATE INDEX IF NOT EXISTS idx_workflows_store_active
    ON workflows USING btree (store_id, is_active)
    WHERE is_active = true;

-- ---- webhook_endpoints -----------------------------------------------------
-- Active endpoint lookup by slug during webhook ingestion.
CREATE INDEX IF NOT EXISTS idx_webhook_endpoints_slug_active
    ON webhook_endpoints USING btree (slug, is_active)
    WHERE is_active = true;

-- ---- workflow_approval_requests --------------------------------------------
-- Pending approvals per store (list_approvals). Note the key order is
-- (store_id, status) even though the index name reads status_store.
CREATE INDEX IF NOT EXISTS idx_approvals_status_store
    ON workflow_approval_requests USING btree (store_id, status)
    WHERE status = 'pending';

-- ---- workflow_versions -----------------------------------------------------
-- Versions of a workflow, highest version first (versioning lookups).
CREATE INDEX IF NOT EXISTS idx_workflow_versions_workflow
    ON workflow_versions USING btree (workflow_id, version DESC);