Skip to main content
Glama
MisterSandFR

Supabase MCP Server - Self-Hosted Edition

by MisterSandFR
realtime_management.ts22.8 kB
import { Tool } from "@modelcontextprotocol/sdk/types.js"; import { z } from "zod"; import { ToolContext } from "./types.js"; import { executeSqlWithFallback } from "./utils.js"; const RealtimeManagementInputSchema = z.object({ action: z.enum(['enable_realtime', 'disable_realtime', 'list_publications', 'create_channel', 'manage_broadcast', 'configure_presence', 'analytics', 'test_connection', 'security_audit']).describe("Realtime management action"), tableName: z.string().optional().describe("Table name for realtime"), schemaName: z.string().optional().default('public').describe("Schema name"), channelName: z.string().optional().describe("Channel name"), eventTypes: z.array(z.enum(['INSERT', 'UPDATE', 'DELETE', '*'])).optional().default(['*']).describe("Event types to broadcast"), filters: z.array(z.object({ column: z.string(), operator: z.enum(['eq', 'neq', 'gt', 'gte', 'lt', 'lte', 'in']), value: z.string() })).optional().describe("Realtime filters"), broadcastConfig: z.object({ self: z.boolean().default(false), ack: z.boolean().default(false) }).optional().describe("Broadcast configuration"), presenceConfig: z.object({ key: z.string(), payload: z.record(z.any()).optional() }).optional().describe("Presence configuration"), rateLimiting: z.object({ maxEventsPerSecond: z.number().default(100), maxConnectionsPerChannel: z.number().default(1000) }).optional().describe("Rate limiting settings"), securityRules: z.array(z.object({ rule: z.string(), condition: z.string() })).optional().describe("Security rules for realtime"), environment: z.enum(['development', 'staging', 'production']).optional().describe("Environment"), monitoring: z.boolean().optional().default(true).describe("Enable monitoring") }); type RealtimeManagementInput = z.infer<typeof RealtimeManagementInputSchema>; const realtimeManagementOutputSchema = z.object({ content: z.array(z.object({ type: z.literal("text"), text: z.string() })) }); export const realtimeManagementTool = { name: "realtime_management", description: "Comprehensive Supabase Realtime management with channels, broadcasts, presence, and security controls", inputSchema: RealtimeManagementInputSchema, mcpInputSchema: { type: "object", properties: { action: { type: "string", enum: ['enable_realtime', 'disable_realtime', 'list_publications', 'create_channel', 'manage_broadcast', 'configure_presence', 'analytics', 'test_connection', 'security_audit'], description: "Realtime management action" }, tableName: { type: "string", description: "Table name for realtime" }, schemaName: { type: "string", description: "Schema name" }, channelName: { type: "string", description: "Channel name" }, eventTypes: { type: "array", items: { type: "string", enum: ["INSERT", "UPDATE", "DELETE", "*"] }, description: "Event types" }, filters: { type: "array", items: { type: "object", properties: { column: { type: "string" }, operator: { type: "string", enum: ["eq", "neq", "gt", "gte", "lt", "lte", "in"] }, value: { type: "string" } } } }, broadcastConfig: { type: "object", properties: { self: { type: "boolean" }, ack: { type: "boolean" } } }, presenceConfig: { type: "object", properties: { key: { type: "string" }, payload: { type: "object" } } }, rateLimiting: { type: "object", properties: { maxEventsPerSecond: { type: "number" }, maxConnectionsPerChannel: { type: "number" } } }, securityRules: { type: "array", items: { type: "object", properties: { rule: { type: "string" }, condition: { type: "string" } } } }, environment: { type: "string", enum: ["development", "staging", "production"] }, monitoring: { type: "boolean" } }, required: ["action"] }, outputSchema: realtimeManagementOutputSchema, execute: async (input: unknown, context: ToolContext) => { const validatedInput = RealtimeManagementInputSchema.parse(input); await ensureRealtimeInfrastructure(context); switch (validatedInput.action) { case 'enable_realtime': { if (!validatedInput.tableName) { throw new Error("Table name is required"); } const result = await enableRealtimeForTable(validatedInput, context); return { content: [{ type: "text", text: JSON.stringify(result, null, 2) }] }; } case 'list_publications': { const result = await listRealtimePublications(validatedInput, context); return { content: [{ type: "text", text: JSON.stringify(result, null, 2) }] }; } case 'analytics': { const result = await getRealtimeAnalytics(validatedInput, context); return { content: [{ type: "text", text: JSON.stringify(result, null, 2) }] }; } case 'security_audit': { const result = await auditRealtimeSecurity(validatedInput, context); return { content: [{ type: "text", text: JSON.stringify(result, null, 2) }] }; } default: throw new Error(`Unknown action: ${validatedInput.action}`); } } }; async function ensureRealtimeInfrastructure(context: ToolContext): Promise<void> { const sql = ` CREATE TABLE IF NOT EXISTS realtime_config ( id SERIAL PRIMARY KEY, table_name VARCHAR(255) NOT NULL, schema_name VARCHAR(255) DEFAULT 'public', publication_name VARCHAR(255), enabled BOOLEAN DEFAULT true, event_types TEXT[] DEFAULT ARRAY['*'], filters JSONB DEFAULT '[]', rate_limiting JSONB DEFAULT '{}', security_rules JSONB DEFAULT '[]', created_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMP DEFAULT NOW(), UNIQUE(schema_name, table_name) ); CREATE TABLE IF NOT EXISTS realtime_channels ( id SERIAL PRIMARY KEY, channel_name VARCHAR(255) UNIQUE NOT NULL, config JSONB DEFAULT '{}', active_connections INTEGER DEFAULT 0, total_messages BIGINT DEFAULT 0, created_at TIMESTAMP DEFAULT NOW(), last_activity TIMESTAMP DEFAULT NOW() ); CREATE TABLE IF NOT EXISTS realtime_metrics ( id SERIAL PRIMARY KEY, metric_type VARCHAR(50) NOT NULL, metric_value NUMERIC NOT NULL, metadata JSONB DEFAULT '{}', timestamp TIMESTAMP DEFAULT NOW() ); `; await executeSqlWithFallback(sql, context); } async function enableRealtimeForTable(input: RealtimeManagementInput, context: ToolContext): Promise<any> { const publicationName = `supabase_realtime_${input.schemaName}_${input.tableName}`; // Create publication for the table const createPublicationSql = ` CREATE PUBLICATION ${publicationName} FOR TABLE ${input.schemaName}.${input.tableName} `; try { await executeSqlWithFallback(createPublicationSql, context); } catch (error: any) { if (!error.message.includes('already exists')) { throw error; } } // Add table to realtime publication (Supabase specific) const addToRealtimeSql = ` ALTER PUBLICATION supabase_realtime ADD TABLE ${input.schemaName}.${input.tableName} `; try { await executeSqlWithFallback(addToRealtimeSql, context); } catch (error: any) { // Publication might not exist or table already added context.log(`Note: ${error.message}`, 'warn'); } // Register in our config table const registerSql = ` INSERT INTO realtime_config ( table_name, schema_name, publication_name, event_types, filters, rate_limiting, security_rules ) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (schema_name, table_name) DO UPDATE SET enabled = true, event_types = EXCLUDED.event_types, updated_at = NOW() RETURNING id `; const result = await executeSqlWithFallback(registerSql, context, [ input.tableName, input.schemaName, publicationName, input.eventTypes, JSON.stringify(input.filters || []), JSON.stringify(input.rateLimiting || {}), JSON.stringify(input.securityRules || []) ]); return { success: true, table: `${input.schemaName}.${input.tableName}`, publication_name: publicationName, event_types: input.eventTypes, config_id: result.data[0].id, client_usage: { javascript: ` const channel = supabase .channel('${input.tableName}_changes') .on('postgres_changes', { event: '*', schema: '${input.schemaName}', table: '${input.tableName}' }, (payload) => console.log(payload) ) .subscribe()`, filters_example: input.filters ? ` // With filters: .on('postgres_changes', { event: 'INSERT', schema: '${input.schemaName}', table: '${input.tableName}', filter: '${input.filters[0]?.column}=${input.filters[0]?.operator}.${input.filters[0]?.value}' })` : null } }; } async function listRealtimePublications(input: RealtimeManagementInput, context: ToolContext): Promise<any> { const publicationsSql = ` SELECT p.pubname as publication_name, p.puballtables as all_tables, p.pubinsert as publishes_insert, p.pubupdate as publishes_update, p.pubdelete as publishes_delete, p.pubtruncate as publishes_truncate, array_agg( CASE WHEN pt.schemaname IS NOT NULL THEN pt.schemaname || '.' || pt.tablename ELSE NULL END ) FILTER (WHERE pt.schemaname IS NOT NULL) as tables FROM pg_publication p LEFT JOIN pg_publication_tables pt ON p.pubname = pt.pubname WHERE p.pubname LIKE 'supabase_realtime%' OR p.pubname = 'supabase_realtime' GROUP BY p.pubname, p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate ORDER BY p.pubname `; const publications = await executeSqlWithFallback(publicationsSql, context); // Get our managed realtime config const configSql = ` SELECT rc.*, CASE WHEN rc.enabled THEN 'active' ELSE 'disabled' END as status FROM realtime_config rc ORDER BY rc.schema_name, rc.table_name `; const config = await executeSqlWithFallback(configSql, context); const summary = { total_publications: publications.data.length, total_tables: config.data.length, active_tables: config.data.filter((c: any) => c.enabled).length, inactive_tables: config.data.filter((c: any) => !c.enabled).length }; return { publications: publications.data, managed_tables: config.data, summary, recommendations: generateRealtimeRecommendations(config.data) }; } async function getRealtimeAnalytics(input: RealtimeManagementInput, context: ToolContext): Promise<any> { // Simulate realtime analytics (in production, this would query actual metrics) const analyticsSql = ` SELECT rc.table_name, rc.schema_name, rc.enabled, rc.event_types, COALESCE(rm.connection_count, 0) as active_connections, COALESCE(rm.message_count, 0) as total_messages, COALESCE(rm.avg_latency_ms, 0) as avg_latency_ms FROM realtime_config rc LEFT JOIN ( SELECT 'connections' as metric_type, metadata->>'table' as table_name, SUM(metric_value) as connection_count, 0 as message_count, 0 as avg_latency_ms FROM realtime_metrics WHERE metric_type = 'active_connections' AND timestamp > NOW() - INTERVAL '1 hour' GROUP BY metadata->>'table' UNION ALL SELECT 'messages' as metric_type, metadata->>'table' as table_name, 0 as connection_count, SUM(metric_value) as message_count, 0 as avg_latency_ms FROM realtime_metrics WHERE metric_type = 'message_count' AND timestamp > NOW() - INTERVAL '24 hours' GROUP BY metadata->>'table' UNION ALL SELECT 'latency' as metric_type, metadata->>'table' as table_name, 0 as connection_count, 0 as message_count, AVG(metric_value) as avg_latency_ms FROM realtime_metrics WHERE metric_type = 'message_latency' AND timestamp > NOW() - INTERVAL '1 hour' GROUP BY metadata->>'table' ) rm ON rc.table_name = rm.table_name ORDER BY rc.schema_name, rc.table_name `; const analytics = await executeSqlWithFallback(analyticsSql, context); // Generate sample metrics for demonstration const sampleMetrics = analytics.data.map((table: any) => ({ ...table, active_connections: Math.floor(Math.random() * 100), total_messages: Math.floor(Math.random() * 10000), avg_latency_ms: Math.floor(Math.random() * 100) + 10, events_per_minute: Math.floor(Math.random() * 500), error_rate: Math.random() * 5 })); const globalMetrics = { total_active_connections: sampleMetrics.reduce((sum, t) => sum + t.active_connections, 0), total_messages_24h: sampleMetrics.reduce((sum, t) => sum + t.total_messages, 0), average_latency: sampleMetrics.length > 0 ? Math.round(sampleMetrics.reduce((sum, t) => sum + t.avg_latency_ms, 0) / sampleMetrics.length) : 0, overall_error_rate: sampleMetrics.length > 0 ? Math.round((sampleMetrics.reduce((sum, t) => sum + t.error_rate, 0) / sampleMetrics.length) * 100) / 100 : 0 }; return { table_analytics: sampleMetrics, global_metrics: globalMetrics, performance_insights: { high_traffic_tables: sampleMetrics.filter(t => t.active_connections > 50), slow_tables: sampleMetrics.filter(t => t.avg_latency_ms > 100), error_prone_tables: sampleMetrics.filter(t => t.error_rate > 2) }, recommendations: generatePerformanceRecommendations(sampleMetrics) }; } async function auditRealtimeSecurity(input: RealtimeManagementInput, context: ToolContext): Promise<any> { const securityChecksSql = ` WITH rls_check AS ( SELECT rc.table_name, rc.schema_name, pt.rowsecurity as rls_enabled, COUNT(pp.policyname) as policy_count FROM realtime_config rc LEFT JOIN pg_tables pt ON rc.table_name = pt.tablename AND rc.schema_name = pt.schemaname LEFT JOIN pg_policies pp ON rc.table_name = pp.tablename AND rc.schema_name = pp.schemaname WHERE rc.enabled = true GROUP BY rc.table_name, rc.schema_name, pt.rowsecurity ), publication_check AS ( SELECT pt.schemaname, pt.tablename, p.pubname, p.pubinsert, p.pubupdate, p.pubdelete FROM pg_publication_tables pt JOIN pg_publication p ON pt.pubname = p.pubname WHERE p.pubname LIKE '%realtime%' ) SELECT rlsc.*, pc.pubname as publication_name, pc.pubinsert, pc.pubupdate, pc.pubdelete, CASE WHEN rlsc.rls_enabled AND rlsc.policy_count > 0 THEN 'secure' WHEN rlsc.rls_enabled AND rlsc.policy_count = 0 THEN 'rls_no_policies' WHEN NOT rlsc.rls_enabled THEN 'no_rls' ELSE 'unknown' END as security_status FROM rls_check rlsc LEFT JOIN publication_check pc ON rlsc.table_name = pc.tablename AND rlsc.schema_name = pc.schemaname ORDER BY CASE security_status WHEN 'no_rls' THEN 1 WHEN 'rls_no_policies' THEN 2 WHEN 'secure' THEN 3 ELSE 4 END, rlsc.schema_name, rlsc.table_name `; const securityAudit = await executeSqlWithFallback(securityChecksSql, context); const securityIssues = []; const recommendations = []; for (const table of securityAudit.data) { if (table.security_status === 'no_rls') { securityIssues.push({ severity: 'high', table: `${table.schema_name}.${table.table_name}`, issue: 'RLS not enabled', description: 'Table is published to realtime without Row Level Security' }); } if (table.security_status === 'rls_no_policies') { securityIssues.push({ severity: 'critical', table: `${table.schema_name}.${table.table_name}`, issue: 'RLS enabled but no policies', description: 'RLS is enabled but no policies exist (blocks all access)' }); } } // Generate recommendations based on findings if (securityIssues.length > 0) { recommendations.push('Enable RLS on all realtime-enabled tables'); recommendations.push('Create appropriate RLS policies for realtime access'); recommendations.push('Regular security audits of realtime publications'); } recommendations.push('Use authentication-based RLS policies for sensitive data'); recommendations.push('Monitor realtime connections for unusual patterns'); recommendations.push('Implement rate limiting for realtime channels'); const securityScore = Math.max(0, 100 - (securityIssues.length * 15)); return { security_audit: securityAudit.data, security_issues: securityIssues, security_score: securityScore, recommendations: recommendations, compliance_status: { rls_coverage: securityAudit.data.filter((t: any) => t.security_status === 'secure').length, total_tables: securityAudit.data.length, coverage_percentage: securityAudit.data.length > 0 ? Math.round((securityAudit.data.filter((t: any) => t.security_status === 'secure').length / securityAudit.data.length) * 100) : 100 } }; } function generateRealtimeRecommendations(configData: any[]): string[] { const recommendations = []; const disabledTables = configData.filter(c => !c.enabled); if (disabledTables.length > 0) { recommendations.push(`${disabledTables.length} tables have realtime disabled`); } const noFilters = configData.filter(c => c.filters === '[]'); if (noFilters.length > 0) { recommendations.push(`Consider adding filters to ${noFilters.length} tables to reduce bandwidth`); } const allEvents = configData.filter(c => c.event_types.includes('*')); if (allEvents.length > 0) { recommendations.push(`${allEvents.length} tables broadcast all events - consider being more specific`); } recommendations.push('Monitor realtime performance and connection counts'); recommendations.push('Implement proper authentication for realtime channels'); return recommendations; } function generatePerformanceRecommendations(metricsData: any[]): string[] { const recommendations = []; const highTraffic = metricsData.filter(t => t.active_connections > 50); if (highTraffic.length > 0) { recommendations.push(`${highTraffic.length} tables have high connection counts - monitor for performance impact`); } const slowTables = metricsData.filter(t => t.avg_latency_ms > 100); if (slowTables.length > 0) { recommendations.push(`${slowTables.length} tables have high latency - investigate query performance`); } const errorProne = metricsData.filter(t => t.error_rate > 2); if (errorProne.length > 0) { recommendations.push(`${errorProne.length} tables have elevated error rates - check for issues`); } recommendations.push('Implement connection pooling for high-traffic scenarios'); recommendations.push('Use filters to reduce unnecessary message broadcasts'); recommendations.push('Consider batch updates for high-frequency changes'); return recommendations; }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MisterSandFR/Supabase-MCP-SelfHosted'

If you have feedback or need assistance with the MCP directory API, please join our Discord server