Skip to main content
Glama
MisterSandFR

Supabase MCP Server - Self-Hosted Edition

by MisterSandFR
analyze_rls_coverage.ts12.7 kB
import { Tool } from "@modelcontextprotocol/sdk/types.js"; import { z } from "zod"; import { ToolContext } from "./types.js"; import { executeSqlWithFallback } from "./utils.js"; const AnalyzeRlsCoverageInputSchema = z.object({ includeSystemTables: z.boolean().optional().default(false).describe("Include system tables in analysis"), suggestPolicies: z.boolean().optional().default(true).describe("Suggest policies for unprotected tables"), checkOrphans: z.boolean().optional().default(true).describe("Check for orphaned policies") }); type AnalyzeRlsCoverageInput = z.infer<typeof AnalyzeRlsCoverageInputSchema>; const AnalyzeRlsCoverageOutputSchema = z.object({ content: z.array(z.object({ type: z.literal("text"), text: z.string() })) }); export const analyzeRlsCoverageTool = { name: "analyze_rls_coverage", description: "Analyze RLS coverage, detect unprotected tables, and suggest security improvements", inputSchema: AnalyzeRlsCoverageInputSchema, mcpInputSchema: { type: "object", properties: { includeSystemTables: { type: "boolean", description: "Include system tables in analysis" }, suggestPolicies: { type: "boolean", description: "Suggest policies for unprotected tables" }, checkOrphans: { type: "boolean", description: "Check for orphaned policies" } } }, outputSchema: AnalyzeRlsCoverageOutputSchema, execute: async (input: unknown, context: ToolContext) => { const validatedInput = AnalyzeRlsCoverageInputSchema.parse(input || {}); // Get all tables with RLS status const tablesSql = ` SELECT t.schemaname, t.tablename, t.rowsecurity AS rls_enabled, t.forcerowsecurity AS force_rls, obj_description(c.oid) AS table_comment, COUNT(pol.policyname) AS policy_count FROM pg_tables t JOIN pg_class c ON c.relname = t.tablename AND c.relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = t.schemaname) LEFT JOIN pg_policies pol ON pol.schemaname = t.schemaname AND pol.tablename = t.tablename WHERE t.schemaname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') ${!validatedInput.includeSystemTables ? "AND t.schemaname NOT IN ('auth', 'storage', 'supabase_functions', 'extensions', 'graphql', 'graphql_public', 'pgbouncer', 'pgsodium', 'pgsodium_masks', 'realtime', 'supabase_migrations')" : ""} GROUP BY t.schemaname, t.tablename, t.rowsecurity, t.forcerowsecurity, c.oid ORDER BY t.schemaname, t.tablename `; const tables = await executeSqlWithFallback(tablesSql, context); // Analyze table columns to suggest policies const suggestions: any[] = []; if (validatedInput.suggestPolicies) { for (const table of tables.data) { if (!table.rls_enabled || table.policy_count === 0) { // Get columns to understand table structure const columnsSql = ` SELECT column_name, data_type, is_nullable, column_default FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 ORDER BY ordinal_position `; const columns = await executeSqlWithFallback( columnsSql, context, [table.schemaname, table.tablename] ); const columnNames = columns.data.map((c: any) => c.column_name); const suggestedPolicies = []; // Suggest policies based on column patterns if (columnNames.includes('user_id') || columnNames.includes('owner_id')) { suggestedPolicies.push({ type: 'user_owned', name: `${table.tablename}_user_policy`, command: 'ALL', using: `(auth.uid() = ${columnNames.includes('user_id') ? 'user_id' : 'owner_id'})`, withCheck: `(auth.uid() = ${columnNames.includes('user_id') ? 'user_id' : 'owner_id'})`, description: 'User can only access their own records' }); } if (columnNames.includes('tenant_id') || columnNames.includes('organization_id')) { suggestedPolicies.push({ type: 'tenant_isolation', name: `${table.tablename}_tenant_policy`, command: 'ALL', using: `(${columnNames.includes('tenant_id') ? 'tenant_id' : 'organization_id'} = auth.jwt() ->> '${columnNames.includes('tenant_id') ? 'tenant_id' : 'org_id'}')`, withCheck: `(${columnNames.includes('tenant_id') ? 'tenant_id' : 'organization_id'} = auth.jwt() ->> '${columnNames.includes('tenant_id') ? 'tenant_id' : 'org_id'}')`, description: 'Multi-tenant isolation' }); } if (columnNames.includes('is_public') || columnNames.includes('published')) { suggestedPolicies.push({ type: 'public_read', name: `${table.tablename}_public_read`, command: 'SELECT', using: `(${columnNames.includes('is_public') ? 'is_public' : 'published'} = true)`, description: 'Public read for published content' }); } if (columnNames.includes('team_id') || columnNames.includes('group_id')) { suggestedPolicies.push({ type: 'team_based', name: `${table.tablename}_team_policy`, command: 'ALL', using: `(${columnNames.includes('team_id') ? 'team_id' : 'group_id'} IN (SELECT ${columnNames.includes('team_id') ? 'team_id' : 'group_id'} FROM team_members WHERE user_id = auth.uid()))`, withCheck: `(${columnNames.includes('team_id') ? 'team_id' : 'group_id'} IN (SELECT ${columnNames.includes('team_id') ? 'team_id' : 'group_id'} FROM team_members WHERE user_id = auth.uid()))`, description: 'Team-based access control' }); } // Default suggestion if no patterns matched if (suggestedPolicies.length === 0) { suggestedPolicies.push({ type: 'authenticated_only', name: `${table.tablename}_authenticated`, command: 'ALL', using: `(auth.role() = 'authenticated')`, withCheck: `(auth.role() = 'authenticated')`, description: 'Authenticated users only' }); } suggestions.push({ table: `${table.schemaname}.${table.tablename}`, currentStatus: { rlsEnabled: table.rls_enabled, forceRls: table.force_rls, policyCount: table.policy_count }, columns: columnNames, suggestedPolicies }); } } } // Check for orphaned policies (policies on non-existent tables) let orphanedPolicies: any[] = []; if (validatedInput.checkOrphans) { const orphansSql = ` SELECT DISTINCT schemaname || '.' || tablename AS table_reference FROM pg_policies WHERE NOT EXISTS ( SELECT 1 FROM pg_tables t WHERE t.schemaname = pg_policies.schemaname AND t.tablename = pg_policies.tablename ) `; const orphans = await executeSqlWithFallback(orphansSql, context); orphanedPolicies = orphans.data; } // Generate security score const totalTables = tables.data.length; const tablesWithRls = tables.data.filter((t: any) => t.rls_enabled).length; const tablesWithPolicies = tables.data.filter((t: any) => t.policy_count > 0).length; const tablesWithForceRls = tables.data.filter((t: any) => t.force_rls).length; const securityScore = totalTables > 0 ? Math.round((tablesWithPolicies / totalTables) * 100) : 100; // Identify critical issues const criticalIssues = []; const unprotectedTables = tables.data.filter((t: any) => !t.rls_enabled); if (unprotectedTables.length > 0) { criticalIssues.push({ severity: 'HIGH', issue: 'Tables without RLS', affected: unprotectedTables.map((t: any) => `${t.schemaname}.${t.tablename}`), recommendation: 'Enable RLS on these tables immediately' }); } const rlsWithoutPolicies = tables.data.filter((t: any) => t.rls_enabled && t.policy_count === 0); if (rlsWithoutPolicies.length > 0) { criticalIssues.push({ severity: 'MEDIUM', issue: 'RLS enabled but no policies defined', affected: rlsWithoutPolicies.map((t: any) => `${t.schemaname}.${t.tablename}`), recommendation: 'These tables will deny all access. Add appropriate policies.' }); } const withoutForceRls = tables.data.filter((t: any) => t.rls_enabled && !t.force_rls); if (withoutForceRls.length > 0) { criticalIssues.push({ severity: 'LOW', issue: 'RLS not forced', affected: withoutForceRls.map((t: any) => `${t.schemaname}.${t.tablename}`), recommendation: 'Consider forcing RLS to ensure it applies to table owners' }); } return { content: [{ type: "text", text: JSON.stringify({ securityScore, summary: { totalTables, tablesWithRls, tablesWithPolicies, tablesWithForceRls, unprotectedTables: totalTables - tablesWithRls, orphanedPolicies: orphanedPolicies.length }, criticalIssues, tableDetails: tables.data.map((t: any) => ({ table: `${t.schemaname}.${t.tablename}`, rlsEnabled: t.rls_enabled, forceRls: t.force_rls, policyCount: t.policy_count, status: t.rls_enabled ? (t.policy_count > 0 ? '✅ Protected' : '⚠️ RLS without policies') : '❌ Unprotected' })), suggestions: validatedInput.suggestPolicies ? suggestions : undefined, orphanedPolicies: validatedInput.checkOrphans ? orphanedPolicies : undefined, recommendations: { immediate: criticalIssues.filter(i => i.severity === 'HIGH'), important: criticalIssues.filter(i => i.severity === 'MEDIUM'), suggested: criticalIssues.filter(i => i.severity === 'LOW') } }, null, 2) }] }; } };

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MisterSandFR/Supabase-MCP-SelfHosted'

If you have feedback or need assistance with the MCP directory API, please join our Discord server