Skip to main content
Glama

Postgres MCP Server

index.ts56.8 kB
#!/usr/bin/env node // Set MCP server mode FIRST, before any imports that use logger process.env.MCP_SERVER = 'true'; import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; import { CallToolRequestSchema, ErrorCode, ListResourcesRequestSchema, ListToolsRequestSchema, ReadResourceRequestSchema, McpError, } from '@modelcontextprotocol/sdk/types.js'; import dotenv from 'dotenv'; import { ConfigManager } from './config.js'; import { DatabaseConnectionManager } from './database/connection-manager.js'; import { QueryAPIClient } from './api/domains/query-api.js'; import { TablesAPIClient } from './api/domains/tables-api.js'; import { SchemaAPIClient } from './api/domains/schema-api.js'; import { logger } from './logger.js'; import { ParameterValidator, ValidationError, TOOL_NAME_MAPPINGS, suggestToolName } from './validation.js'; import { QueryResultCache, PerformanceMonitor } from './common/cache.js'; import { DatabaseError, ErrorHandler, withRetry } from './common/errors.js'; import { RateLimiter, SecurityValidator } from './common/security.js'; // Load environment variables dotenv.config(); // Tool definitions with comprehensive database management capabilities const toolDefinitions = [ // QUERY EXECUTION TOOL { name: 'query', description: 'Execute SQL queries with transaction support, query analysis, and performance monitoring', inputSchema: { type: 'object', properties: { action: { type: 'string', enum: ['execute', 'transaction', 'explain', 'analyze', 'validate', 'cancel', 'active'], description: 'Action: execute (single query), transaction (multiple queries), explain (execution plan), analyze (performance), validate (syntax), cancel (query by PID), active (list active queries)' }, sql: { type: 'string', description: 'SQL query to execute (required for execute, explain, analyze, validate actions)' }, parameters: { type: 'array', items: { type: 'string' }, description: 'Query parameters for parameterized queries' }, queries: { type: 'array', items: { type: 'object', properties: { sql: { type: 'string' }, parameters: { type: 'array', items: { type: 'string' } } }, required: ['sql'] }, description: 'Array of queries for transaction action' }, options: { type: 'object', properties: { timeout: { type: 'integer', description: 'Query timeout in milliseconds' }, limit: { type: 'integer', description: 'Maximum number of rows to return' }, offset: { type: 'integer', description: 'Number of rows to skip' }, readOnly: { type: 'boolean', description: 'Execute as read-only transaction' } }, description: 'Query execution options' }, pid: { type: 'integer', description: 'Process ID of query to cancel (required for cancel action)' } }, required: ['action'] } }, // TABLE MANAGEMENT TOOL { name: 'tables', description: 'Table management: list, create, alter, drop tables and get detailed table information', inputSchema: { type: 'object', properties: { action: { type: 'string', enum: ['list', 'info', 'create', 'drop', 'add_column', 'drop_column', 'rename'], description: 'Action: list (all tables), info (table details), create (new table), drop (remove table), add_column (add column), drop_column (remove column), rename (rename table)' }, schemaName: { type: 'string', description: 'Schema name (default: public)', default: 'public' }, tableName: { type: 'string', description: 'Table name (required for info, create, drop, add_column, drop_column, rename)' }, columns: { type: 'array', items: { type: 'object', properties: { name: { type: 'string' }, type: { type: 'string' }, nullable: { type: 'boolean', default: true }, defaultValue: { type: 'string' }, primaryKey: { type: 'boolean', default: false } }, required: ['name', 'type'] }, description: 'Column definitions for create action' }, columnName: { type: 'string', description: 'Column name (required for add_column, drop_column)' }, dataType: { type: 'string', description: 'Data type (required for add_column)' }, newName: { type: 'string', description: 'New name (required for rename action)' }, options: { type: 'object', properties: { includeViews: { type: 'boolean', default: false }, includeSystemTables: { type: 'boolean', default: false }, ifNotExists: { type: 'boolean', default: false }, ifExists: { type: 'boolean', default: true }, cascade: { type: 'boolean', default: false }, temporary: { type: 'boolean', default: false } }, description: 'Action-specific options' } }, required: ['action'] } }, // SCHEMA MANAGEMENT TOOL { name: 'schemas', description: 'Schema management: list, create, drop schemas and manage schema permissions', inputSchema: { type: 'object', properties: { action: { type: 'string', enum: ['list', 'create', 'drop', 'permissions'], description: 'Action: list (all schemas), create (new schema), drop (remove schema), permissions (schema permissions)' }, schemaName: { type: 'string', description: 'Schema name (required for create, drop, permissions)' }, owner: { type: 'string', description: 'Schema owner (for create action)' }, options: { type: 'object', properties: { ifNotExists: { type: 'boolean', default: false }, ifExists: { type: 'boolean', default: true }, cascade: { type: 'boolean', default: false } }, description: 'Action-specific options' } }, required: ['action'] } }, // INDEX MANAGEMENT TOOL { name: 'indexes', description: 'Index management: list, create, drop indexes and analyze index usage', inputSchema: { type: 'object', properties: { action: { type: 'string', enum: ['list', 'create', 'drop', 'analyze', 'reindex', 'unused'], description: 'Action: list (all indexes), create (new index), drop (remove index), analyze (index statistics), reindex (rebuild index), unused (find unused indexes)' }, schemaName: { type: 'string', description: 'Schema name (default: public)', default: 'public' }, tableName: { type: 'string', description: 'Table name (required for create, list by table)' }, indexName: { type: 'string', description: 'Index name (required for drop, reindex)' }, columns: { type: 'array', items: { type: 'string' }, description: 'Column names for index (required for create)' }, options: { type: 'object', properties: { unique: { type: 'boolean', default: false }, concurrent: { type: 'boolean', default: false }, ifNotExists: { type: 'boolean', default: false }, ifExists: { type: 'boolean', default: true }, method: { type: 'string', enum: ['btree', 'hash', 'gist', 'spgist', 'gin', 'brin'] } }, description: 'Index creation options' } }, required: ['action'] } }, // DATA MANAGEMENT TOOL { name: 'data', description: 'Data operations: insert, update, delete, bulk operations with validation', inputSchema: { type: 'object', properties: { action: { type: 'string', enum: ['insert', 'update', 'delete', 'bulk_insert', 'bulk_update', 'truncate'], description: 'Action: insert (single row), update (modify rows), delete (remove rows), bulk_insert (multiple rows), bulk_update (batch update), truncate (empty table)' }, tableName: { type: 'string', description: 'Table name (required for all actions)' }, schemaName: { type: 'string', description: 'Schema name (default: public)', default: 'public' }, data: { type: 'object', description: 'Data object for insert/update (key-value pairs)' }, rows: { type: 'array', items: { type: 'object' }, description: 'Array of data objects for bulk operations' }, where: { type: 'object', description: 'WHERE conditions for update/delete operations' }, options: { type: 'object', properties: { onConflict: { type: 'string', description: 'ON CONFLICT action (DO NOTHING, DO UPDATE)' }, returning: { type: 'array', items: { type: 'string' }, description: 'Columns to return' }, validate: { type: 'boolean', default: true, description: 'Validate data before operation' } }, description: 'Operation options' } }, required: ['action', 'tableName'] } }, // TRANSACTION MANAGEMENT TOOL { name: 'transactions', description: 'Transaction management: begin, commit, rollback, savepoints', inputSchema: { type: 'object', properties: { action: { type: 'string', enum: ['begin', 'commit', 'rollback', 'savepoint', 'rollback_to', 'release', 'status'], description: 'Action: begin (start transaction), commit (commit transaction), rollback (rollback transaction), savepoint (create savepoint), rollback_to (rollback to savepoint), release (release savepoint), status (transaction status)' }, transactionId: { type: 'string', description: 'Transaction ID (required for commit, rollback, and operations within transaction)' }, savepointName: { type: 'string', description: 'Savepoint name (required for savepoint, rollback_to, release)' }, readOnly: { type: 'boolean', description: 'Start read-only transaction (for begin action)', default: false }, isolationLevel: { type: 'string', enum: ['READ UNCOMMITTED', 'READ COMMITTED', 'REPEATABLE READ', 'SERIALIZABLE'], description: 'Transaction isolation level (for begin action)' } }, required: ['action'] } }, // DATABASE ADMINISTRATION TOOL { name: 'admin', description: 'Database administration: users, permissions, database info, maintenance operations', inputSchema: { type: 'object', properties: { operation: { type: 'string', enum: ['database_info', 'list_users', 'create_user', 'drop_user', 'grant_permissions', 'revoke_permissions', 'vacuum', 'analyze', 'reindex_database'], description: 'Admin operation to perform' }, username: { type: 'string', description: 'Username (required for user operations)' }, password: { type: 'string', description: 'Password (required for create_user)' }, permissions: { type: 'array', items: { type: 'string' }, description: 'Permissions to grant/revoke' }, tableName: { type: 'string', description: 'Table name (for permission operations)' }, options: { type: 'object', properties: { full: { type: 'boolean', default: false }, verbose: { type: 'boolean', default: false }, analyze: { type: 'boolean', default: false } }, description: 'Operation options' } }, required: ['operation'] } }, // MONITORING TOOL { name: 'monitoring', description: 'Database monitoring: performance metrics, statistics, health checks', inputSchema: { type: 'object', properties: { metric: { type: 'string', enum: ['connections', 'performance', 'locks', 'replication', 'disk_usage', 'query_stats', 'index_usage'], description: 'Metric type to retrieve' }, timeRange: { type: 'string', enum: ['1h', '24h', '7d', '30d'], description: 'Time range for metrics', default: '1h' }, limit: { type: 'integer', description: 'Maximum number of results', default: 50 } }, required: ['metric'] } }, // CONNECTION MANAGEMENT TOOL { name: 'connections', description: 'Connection pool management: status, statistics, configuration', inputSchema: { type: 'object', properties: { action: { type: 'string', enum: ['status', 'stats', 'test', 'reset'], description: 'Action: status (pool status), stats (detailed statistics), test (test connection), reset (reset pool)' } }, required: ['action'] } }, // PERMISSIONS TOOL { name: 'permissions', description: 'Database permissions management: users, roles, grants, privileges', inputSchema: { type: 'object', properties: { operation: { type: 'string', enum: [ 'list_users', 'list_roles', 'list_grants', 'list_privileges', 'create_user', 'create_role', 'drop_user', 'drop_role', 'grant_role', 'revoke_role', 'grant_privilege', 'revoke_privilege', 'alter_user', 'alter_role', 'check_permissions', 'grant_all_privileges' ], description: 'Permission operation to perform' }, username: { type: 'string', description: 'Username for user operations' }, rolename: { type: 'string', description: 'Role name for role operations' }, password: { type: 'string', description: 'Password for user creation/modification' }, database: { type: 'string', description: 'Database name for grants' }, schema: { type: 'string', description: 'Schema name for grants' }, table: { type: 'string', description: 'Table name for grants' }, privileges: { type: 'array', items: { type: 'string', enum: ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'TRUNCATE', 'REFERENCES', 'TRIGGER', 'CREATE', 'CONNECT', 'TEMPORARY', 'EXECUTE', 'USAGE', 'ALL'] }, description: 'Privileges to grant/revoke' }, attributes: { type: 'object', properties: { superuser: { type: 'boolean', description: 'Superuser privilege' }, createdb: { type: 'boolean', description: 'Create database privilege' }, createrole: { type: 'boolean', description: 'Create role privilege' }, replication: { type: 'boolean', description: 'Replication privilege' }, login: { type: 'boolean', description: 'Login privilege' }, inherit: { type: 'boolean', description: 'Inherit privileges' }, bypassrls: { type: 'boolean', description: 'Bypass row level security' } }, description: 'User/role attributes' }, grantOption: { type: 'boolean', description: 'Grant with GRANT OPTION', default: false } }, required: ['operation'] } }, // SECURITY TOOL { name: 'security', description: 'Database security management: SSL, authentication, encryption, auditing', inputSchema: { type: 'object', properties: { operation: { type: 'string', enum: [ 'check_ssl', 'list_auth_methods', 'check_encryption', 'audit_log', 'password_policy', 'connection_limits', 'session_security', 'row_level_security', 'column_encryption', 'security_labels' ], description: 'Security operation to perform' }, table: { type: 'string', description: 'Table name for RLS operations' }, policy_name: { type: 'string', description: 'RLS policy name' }, policy_expression: { type: 'string', description: 'RLS policy expression' }, audit_type: { type: 'string', enum: ['connections', 'queries', 'ddl', 'dml', 'errors'], description: 'Type of audit information' } }, required: ['operation'] } } ]; class PostgresMCPServer { private server: Server; private config: ConfigManager; private dbManager: DatabaseConnectionManager; private queryClient: QueryAPIClient; private tablesClient: TablesAPIClient; private schemaClient: SchemaAPIClient; private cache: QueryResultCache; private performanceMonitor: PerformanceMonitor; private rateLimiter: RateLimiter; private securityValidator: SecurityValidator; constructor() { this.config = new ConfigManager(); this.config.validate(); logger.info('Initializing PostgreSQL MCP Server', { host: this.config.getDatabaseConfig().host, database: this.config.getDatabaseConfig().database, toolCount: toolDefinitions.length }); // Initialize database connection manager this.dbManager = new DatabaseConnectionManager(this.config); // Initialize common services this.cache = new QueryResultCache(this.config); this.performanceMonitor = new PerformanceMonitor(); this.rateLimiter = new RateLimiter(this.config); this.securityValidator = new SecurityValidator(this.config); // Initialize API clients this.queryClient = new QueryAPIClient(this.dbManager); this.tablesClient = new TablesAPIClient(this.dbManager); this.schemaClient = new SchemaAPIClient(this.dbManager, this.cache); // Add performance monitoring to API clients (this.queryClient as any).performanceMonitor = this.performanceMonitor; (this.tablesClient as any).performanceMonitor = this.performanceMonitor; (this.schemaClient as any).performanceMonitor = this.performanceMonitor; // Add security validation to API clients (this.queryClient as any).securityValidator = this.securityValidator; (this.tablesClient as any).securityValidator = this.securityValidator; (this.schemaClient as any).securityValidator = this.securityValidator; this.server = new Server( { name: 'postgres-mcp', version: '1.0.0', }, { capabilities: { resources: {}, tools: {}, }, } ); this.setupToolHandlers(); this.setupResourceHandlers(); } private setupResourceHandlers(): void { // List database resources (schemas, tables) this.server.setRequestHandler(ListResourcesRequestSchema, async () => { try { const tables = await this.tablesClient.listTables(); const resources = tables.map(table => ({ uri: `postgres://${table.schemaName}/${table.tableName}/schema`, mimeType: 'application/json', name: `${table.schemaName}.${table.tableName} schema`, description: `Schema information for table ${table.schemaName}.${table.tableName}` })); return { resources }; } catch (error) { logger.error('Failed to list resources', { error: error instanceof Error ? error.message : error }); return { resources: [] }; } }); // Read specific resource (table schema) this.server.setRequestHandler(ReadResourceRequestSchema, async (request) => { try { const uri = new URL(request.params.uri); const pathParts = uri.pathname.split('/'); const schemaName = pathParts[1]; const tableName = pathParts[2]; if (!schemaName || !tableName) { throw new Error('Invalid resource URI format'); } const tableInfo = await this.tablesClient.getTableInfo(tableName, schemaName); return { contents: [{ uri: request.params.uri, mimeType: 'application/json', text: JSON.stringify(tableInfo, null, 2) }] }; } catch (error) { logger.error('Failed to read resource', { uri: request.params.uri, error: error instanceof Error ? error.message : error }); throw new McpError(ErrorCode.InternalError, `Failed to read resource: ${error instanceof Error ? error.message : error}`); } }); } private setupToolHandlers(): void { // Register tool definitions this.server.setRequestHandler(ListToolsRequestSchema, async () => ({ tools: toolDefinitions, })); this.server.setRequestHandler(CallToolRequestSchema, async (request) => { const { name, arguments: args = {} } = request.params; try { switch (name) { case 'query': return await this.handleQuery(args); case 'tables': return await this.handleTables(args); case 'schemas': return await this.handleSchemas(args); case 'indexes': return await this.handleIndexes(args); case 'data': return await this.handleData(args); case 'transactions': return await this.handleTransactions(args); case 'admin': return await this.handleAdmin(args); case 'monitoring': return await this.handleMonitoring(args); case 'connections': return await this.handleConnections(args); case 'permissions': return await this.handlePermissions(args); case 'security': return await this.handleSecurity(args); default: { const suggestion = suggestToolName(name); logger.warn('Unknown tool requested', { tool: name, suggestion: TOOL_NAME_MAPPINGS[name] || 'none', availableTools: toolDefinitions.map(t => t.name) }); throw new McpError( ErrorCode.MethodNotFound, `Unknown tool: ${name}. ${suggestion}` ); } } } catch (error) { logger.error('Tool execution error', { tool: name, error: error instanceof Error ? error.message : error }); if (error instanceof McpError) { throw error; } if (error instanceof ValidationError) { throw ParameterValidator.toMcpError(error); } // Handle our custom database errors if (error instanceof DatabaseError) { throw new McpError( ErrorCode.InternalError, error.message ); } // Handle generic errors with better context const sanitizedError = ErrorHandler.sanitizeError(error); throw new McpError( ErrorCode.InternalError, `Tool execution failed: ${sanitizedError.message}` ); } }); } private async handleQuery(args: any) { const { action, sql, parameters, queries, options, pid } = args; switch (action) { case 'execute': ParameterValidator.validateRequired(sql, 'sql'); return { content: [{ type: 'text', text: JSON.stringify(await this.queryClient.executeQuery(sql, parameters, options), null, 2) }] }; case 'transaction': ParameterValidator.validateRequired(queries, 'queries'); return { content: [{ type: 'text', text: JSON.stringify(await this.queryClient.executeTransaction(queries, options?.readOnly), null, 2) }] }; case 'explain': ParameterValidator.validateRequired(sql, 'sql'); return { content: [{ type: 'text', text: JSON.stringify(await this.queryClient.getExecutionPlan(sql, parameters), null, 2) }] }; case 'analyze': ParameterValidator.validateRequired(sql, 'sql'); return { content: [{ type: 'text', text: JSON.stringify(await this.queryClient.analyzeQuery(sql, parameters), null, 2) }] }; case 'validate': ParameterValidator.validateRequired(sql, 'sql'); return { content: [{ type: 'text', text: JSON.stringify(await this.queryClient.validateSyntax(sql), null, 2) }] }; case 'active': return { content: [{ type: 'text', text: JSON.stringify(await this.queryClient.getActiveQueries(), null, 2) }] }; case 'cancel': ParameterValidator.validateRequired(pid, 'pid'); return { content: [{ type: 'text', text: JSON.stringify({ cancelled: await this.queryClient.cancelQuery(pid) }, null, 2) }] }; default: throw new Error(`Unknown query action: ${action}`); } } private async handleTables(args: any) { const { action, tableName, schemaName = 'public', columns, columnName, dataType, newName, options = {} } = args; switch (action) { case 'list': return { content: [{ type: 'text', text: JSON.stringify(await this.tablesClient.listTables(schemaName, options.includeViews, options.includeSystemTables), null, 2) }] }; case 'info': ParameterValidator.validateRequired(tableName, 'tableName'); return { content: [{ type: 'text', text: JSON.stringify(await this.tablesClient.getTableInfo(tableName, schemaName), null, 2) }] }; case 'create': ParameterValidator.validateRequired(tableName, 'tableName'); ParameterValidator.validateRequired(columns, 'columns'); return { content: [{ type: 'text', text: JSON.stringify(await this.tablesClient.createTable(tableName, columns, { schema: schemaName, ...options }), null, 2) }] }; case 'drop': ParameterValidator.validateRequired(tableName, 'tableName'); return { content: [{ type: 'text', text: JSON.stringify(await this.tablesClient.dropTable(tableName, schemaName, options.cascade, options.ifExists), null, 2) }] }; case 'add_column': ParameterValidator.validateRequired(tableName, 'tableName'); ParameterValidator.validateRequired(columnName, 'columnName'); ParameterValidator.validateRequired(dataType, 'dataType'); return { content: [{ type: 'text', text: JSON.stringify(await this.tablesClient.addColumn(tableName, columnName, dataType, schemaName, options), null, 2) }] }; default: throw new Error(`Unknown tables action: ${action}`); } } private async handleSchemas(args: any) { const { action, schemaName, owner, options = {} } = args; switch (action) { case 'list': const schemas = await this.schemaClient.listSchemas(options.includeSystem); return { content: [{ type: 'text', text: JSON.stringify(schemas, null, 2) }] }; case 'info': ParameterValidator.validateRequired(schemaName, 'schemaName'); const schemaInfo = await this.schemaClient.getSchemaInfo(schemaName); return { content: [{ type: 'text', text: JSON.stringify(schemaInfo, null, 2) }] }; case 'create': ParameterValidator.validateRequired(schemaName, 'schemaName'); const createResult = await this.schemaClient.createSchema(schemaName, { ifNotExists: options.ifNotExists, owner, authorization: options.authorization }); return { content: [{ type: 'text', text: JSON.stringify(createResult, null, 2) }] }; case 'drop': ParameterValidator.validateRequired(schemaName, 'schemaName'); const dropResult = await this.schemaClient.dropSchema( schemaName, options.cascade, options.ifExists ); return { content: [{ type: 'text', text: JSON.stringify(dropResult, null, 2) }] }; case 'rename': ParameterValidator.validateRequired(schemaName, 'schemaName'); ParameterValidator.validateRequired(options.newName, 'newName'); const renameResult = await this.schemaClient.renameSchema(schemaName, options.newName); return { content: [{ type: 'text', text: JSON.stringify(renameResult, null, 2) }] }; default: throw new Error(`Unknown schema action: ${action}`); } } private async handleIndexes(args: any) { const { action, schemaName = 'public', tableName, indexName, columns, options = {} } = args; switch (action) { case 'list': let listQuery; let params: any[] = []; if (tableName) { listQuery = ` SELECT i.indexname as index_name, i.tablename as table_name, i.schemaname as schema_name, pg_get_indexdef(pgc.oid) as definition, CASE WHEN i.indexname ~ '^.*_pkey$' THEN 'PRIMARY KEY' WHEN idx.indisunique THEN 'UNIQUE' ELSE 'INDEX' END as index_type, pg_size_pretty(pg_relation_size(pgc.oid)) as size, idx.indisvalid as is_valid FROM pg_indexes i JOIN pg_class pgc ON pgc.relname = i.indexname JOIN pg_index idx ON idx.indexrelid = pgc.oid WHERE i.tablename = $1 AND i.schemaname = $2 ORDER BY i.indexname `; params = [tableName, schemaName]; } else { listQuery = ` SELECT i.indexname as index_name, i.tablename as table_name, i.schemaname as schema_name, pg_get_indexdef(pgc.oid) as definition, CASE WHEN i.indexname ~ '^.*_pkey$' THEN 'PRIMARY KEY' WHEN idx.indisunique THEN 'UNIQUE' ELSE 'INDEX' END as index_type, pg_size_pretty(pg_relation_size(pgc.oid)) as size, idx.indisvalid as is_valid FROM pg_indexes i JOIN pg_class pgc ON pgc.relname = i.indexname JOIN pg_index idx ON idx.indexrelid = pgc.oid WHERE i.schemaname NOT IN ('information_schema', 'pg_catalog', 'pg_toast') ORDER BY i.schemaname, i.tablename, i.indexname `; } const indexes = await this.queryClient.executeQuery(listQuery, params); return { content: [{ type: 'text', text: JSON.stringify(indexes.rows, null, 2) }] }; case 'create': if (!tableName || !columns || columns.length === 0) { throw new Error('Table name and columns are required for index creation'); } const indexNameToUse = indexName || `idx_${tableName}_${columns.join('_')}`; let createIndexSQL = `CREATE${options.unique ? ' UNIQUE' : ''} INDEX${options.concurrent ? ' CONCURRENTLY' : ''}${options.ifNotExists ? ' IF NOT EXISTS' : ''} ${indexNameToUse}`; createIndexSQL += ` ON ${schemaName}.${tableName}`; if (options.method) { createIndexSQL += ` USING ${options.method}`; } createIndexSQL += ` (${columns.join(', ')})`; await this.queryClient.executeQuery(createIndexSQL); return { content: [{ type: 'text', text: `Index '${indexNameToUse}' created successfully on ${schemaName}.${tableName}` }] }; case 'drop': if (!indexName) { throw new Error('Index name is required for drop action'); } const dropSQL = `DROP INDEX${options.concurrent ? ' CONCURRENTLY' : ''}${options.ifExists ? ' IF EXISTS' : ''} ${schemaName}.${indexName}`; await this.queryClient.executeQuery(dropSQL); return { content: [{ type: 'text', text: `Index '${indexName}' dropped successfully` }] }; case 'analyze': const analyzeQuery = ` SELECT schemaname, tablename, indexname, idx_tup_read, idx_tup_fetch, idx_scan, CASE WHEN idx_scan = 0 THEN 'UNUSED' WHEN idx_scan < 10 THEN 'LOW_USAGE' ELSE 'ACTIVE' END as usage_status FROM pg_stat_user_indexes WHERE schemaname = $1 ORDER BY idx_scan DESC `; const stats = await this.queryClient.executeQuery(analyzeQuery, [schemaName]); return { content: [{ type: 'text', text: JSON.stringify(stats.rows, null, 2) }] }; case 'reindex': if (!indexName && !tableName) { throw new Error('Either index name or table name is required for reindex'); } let reindexSQL; if (indexName) { reindexSQL = `REINDEX INDEX${options.concurrent ? ' CONCURRENTLY' : ''} ${schemaName}.${indexName}`; } else { reindexSQL = `REINDEX TABLE${options.concurrent ? ' CONCURRENTLY' : ''} ${schemaName}.${tableName}`; } await this.queryClient.executeQuery(reindexSQL); return { content: [{ type: 'text', text: `Reindex completed for ${indexName || tableName}` }] }; case 'unused': const unusedQuery = ` SELECT schemaname, tablename, indexname, pg_size_pretty(pg_relation_size(indexrelid)) as size, idx_scan as scans FROM pg_stat_user_indexes WHERE idx_scan = 0 AND schemaname = $1 AND indexname NOT LIKE '%_pkey' ORDER BY pg_relation_size(indexrelid) DESC `; const unused = await this.queryClient.executeQuery(unusedQuery, [schemaName]); return { content: [{ type: 'text', text: JSON.stringify(unused.rows, null, 2) }] }; default: throw new Error(`Unknown index action: ${action}`); } } private async handleData(args: any) { // Placeholder for data operations return { content: [{ type: 'text', text: JSON.stringify({ message: 'Data operations not yet implemented' }, null, 2) }] }; } private async handleTransactions(args: any) { const { action, transactionId, readOnly, isolationLevel } = args; switch (action) { case 'begin': const txId = await this.dbManager.beginTransaction(readOnly); return { content: [{ type: 'text', text: JSON.stringify({ transactionId: txId, status: 'started' }, null, 2) }] }; case 'commit': ParameterValidator.validateRequired(transactionId, 'transactionId'); await this.dbManager.commitTransaction(transactionId); return { content: [{ type: 'text', text: JSON.stringify({ transactionId, status: 'committed' }, null, 2) }] }; case 'rollback': ParameterValidator.validateRequired(transactionId, 'transactionId'); await this.dbManager.rollbackTransaction(transactionId); return { content: [{ type: 'text', text: JSON.stringify({ transactionId, status: 'rolled_back' }, null, 2) }] }; case 'status': return { content: [{ type: 'text', text: JSON.stringify(this.dbManager.getOperationalStats(), null, 2) }] }; default: throw new Error(`Unknown transaction action: ${action}`); } } private async handleAdmin(args: any) { const { operation, username, password, permissions, tableName, options = {} } = args; switch (operation) { case 'database_info': const dbInfo = await this.queryClient.executeQuery(` SELECT current_database() as database_name, current_user as current_user, session_user as session_user, current_setting('server_version') as postgres_version, current_setting('server_encoding') as encoding, current_setting('timezone') as timezone, pg_database_size(current_database()) as database_size_bytes, pg_size_pretty(pg_database_size(current_database())) as database_size, (SELECT count(*) FROM pg_stat_activity WHERE datname = current_database()) as active_connections, current_setting('max_connections') as max_connections, current_setting('shared_buffers') as shared_buffers, current_setting('effective_cache_size') as effective_cache_size `); const tableCount = await this.queryClient.executeQuery(` SELECT count(*) as table_count FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'pg_catalog') `); const result = { ...dbInfo.rows[0], table_count: parseInt(tableCount.rows[0].table_count), uptime: await this.getDatabaseUptime() }; return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }; case 'list_users': const users = await this.queryClient.executeQuery(` SELECT usename as username, usesysid as user_id, usecreatedb as can_create_db, usesuper as is_superuser, userepl as can_replicate, usebypassrls as bypass_rls, valuntil as password_expires, (SELECT string_agg(datname, ', ') FROM pg_database WHERE has_database_privilege(usename, datname, 'CONNECT')) as accessible_databases FROM pg_user ORDER BY usename `); return { content: [{ type: 'text', text: JSON.stringify(users.rows, null, 2) }] }; case 'create_user': if (!username || !password) { throw new Error('Username and password are required for user creation'); } await this.queryClient.executeQuery(`CREATE USER ${username} WITH PASSWORD '${password}'`); return { content: [{ type: 'text', text: `User '${username}' created successfully` }] }; case 'drop_user': if (!username) { throw new Error('Username is required for user deletion'); } await this.queryClient.executeQuery(`DROP USER ${username}`); return { content: [{ type: 'text', text: `User '${username}' dropped successfully` }] }; case 'grant_permissions': if (!username || !permissions || permissions.length === 0) { throw new Error('Username and permissions are required'); } const target = tableName ? `TABLE ${tableName}` : 'ALL TABLES IN SCHEMA public'; const grantSQL = `GRANT ${permissions.join(', ')} ON ${target} TO ${username}`; await this.queryClient.executeQuery(grantSQL); return { content: [{ type: 'text', text: `Permissions ${permissions.join(', ')} granted to '${username}' on ${target}` }] }; case 'revoke_permissions': if (!username || !permissions || permissions.length === 0) { throw new Error('Username and permissions are required'); } const revokeTarget = tableName ? `TABLE ${tableName}` : 'ALL TABLES IN SCHEMA public'; const revokeSQL = `REVOKE ${permissions.join(', ')} ON ${revokeTarget} FROM ${username}`; await this.queryClient.executeQuery(revokeSQL); return { content: [{ type: 'text', text: `Permissions ${permissions.join(', ')} revoked from '${username}' on ${revokeTarget}` }] }; case 'vacuum': if (tableName) { const vacuumSQL = `VACUUM${options.full ? ' FULL' : ''} ${tableName}`; await this.queryClient.executeQuery(vacuumSQL); return { content: [{ type: 'text', text: `Vacuum completed for table '${tableName}'` }] }; } else { await this.queryClient.executeQuery('VACUUM'); return { content: [{ type: 'text', text: 'Database vacuum completed' }] }; } case 'analyze': if (tableName) { await this.queryClient.executeQuery(`ANALYZE ${tableName}`); return { content: [{ type: 'text', text: `Analyze completed for table '${tableName}'` }] }; } else { await this.queryClient.executeQuery('ANALYZE'); return { content: [{ type: 'text', text: 'Database analyze completed' }] }; } case 'reindex_database': await this.queryClient.executeQuery('REINDEX DATABASE CONCURRENTLY'); return { content: [{ type: 'text', text: 'Database reindex completed' }] }; default: throw new Error(`Unknown admin operation: ${operation}`); } } private async getDatabaseUptime(): Promise<string> { try { const uptime = await this.queryClient.executeQuery(` SELECT date_trunc('second', now() - pg_postmaster_start_time()) as uptime `); return uptime.rows[0].uptime; } catch (error) { return 'Unable to determine uptime'; } } private async handleMonitoring(args: any) { const { metric } = args; switch (metric) { case 'connections': return { content: [{ type: 'text', text: JSON.stringify(this.dbManager.getPoolStats(), null, 2) }] }; case 'performance': const operationalStats = this.dbManager.getOperationalStats(); const performanceMetrics = this.performanceMonitor.getMetrics(); const slowOperations = this.performanceMonitor.getSlowOperations(); return { content: [{ type: 'text', text: JSON.stringify({ operational: operationalStats, performance: performanceMetrics, slowOperations, cache: this.cache.getStats() }, null, 2) }] }; case 'cache': return { content: [{ type: 'text', text: JSON.stringify({ stats: this.cache.getStats(), recent: this.cache.getRecentEntries(5), popular: this.cache.getPopularEntries(5) }, null, 2) }] }; case 'security': const rateLimitEntries = Array.from(this.rateLimiter.getAllEntries().entries()).slice(0, 10); return { content: [{ type: 'text', text: JSON.stringify({ rateLimits: rateLimitEntries, securityEvents: 'Security event logging would be implemented here' }, null, 2) }] }; default: return { content: [{ type: 'text', text: JSON.stringify({ message: `Monitoring metric '${metric}' not yet implemented` }, null, 2) }] }; } } private async handleConnections(args: any) { const { action } = args; switch (action) { case 'status': return { content: [{ type: 'text', text: JSON.stringify(this.dbManager.getPoolStats(), null, 2) }] }; case 'stats': return { content: [{ type: 'text', text: JSON.stringify(this.dbManager.getOperationalStats(), null, 2) }] }; case 'test': const isHealthy = await this.dbManager.testConnection(); return { content: [{ type: 'text', text: JSON.stringify({ connected: isHealthy, timestamp: new Date().toISOString() }, null, 2) }] }; default: throw new Error(`Unknown connections action: ${action}`); } } private async handlePermissions(args: any) { const { operation, username, rolename, password, database, schema, table, privileges, attributes, grantOption } = args; switch (operation) { case 'list_users': const users = await this.queryClient.executeQuery(` SELECT r.rolname as username, r.oid as user_id, r.rolcreatedb as can_create_db, r.rolsuper as is_superuser, r.rolreplication as can_replicate, r.rolbypassrls as bypass_rls, r.rolvaliduntil as password_expires, ARRAY( SELECT m.rolname FROM pg_roles m JOIN pg_auth_members am ON m.oid = am.roleid WHERE am.member = r.oid ) as member_of FROM pg_roles r WHERE r.rolcanlogin = true ORDER BY r.rolname `); return { content: [{ type: 'text', text: JSON.stringify(users.rows, null, 2) }] }; case 'list_roles': const roles = await this.queryClient.executeQuery(` SELECT rolname as role_name, rolsuper as is_superuser, rolinherit as inherits, rolcreaterole as can_create_role, rolcreatedb as can_create_db, rolcanlogin as can_login, rolreplication as can_replicate, rolconnlimit as connection_limit, rolvaliduntil as valid_until, rolbypassrls as bypass_rls FROM pg_roles ORDER BY rolname `); return { content: [{ type: 'text', text: JSON.stringify(roles.rows, null, 2) }] }; case 'create_user': if (!username || !password) { throw new Error('Username and password are required for user creation'); } const createUserSQL = `CREATE USER ${username} WITH PASSWORD '${password}'`; if (attributes) { const attrSQL = Object.entries(attributes) .filter(([, value]) => value === true) .map(([key]) => key.toUpperCase()) .join(' '); if (attrSQL) { await this.queryClient.executeQuery(`${createUserSQL} ${attrSQL}`); } else { await this.queryClient.executeQuery(createUserSQL); } } else { await this.queryClient.executeQuery(createUserSQL); } return { content: [{ type: 'text', text: `User '${username}' created successfully` }] }; case 'grant_all_privileges': if (!username || !database) { throw new Error('Username and database are required for granting all privileges'); } const grantAllSQL = [ `GRANT ALL PRIVILEGES ON DATABASE ${database} TO ${username}`, `GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ${username}`, `GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO ${username}`, `GRANT ALL PRIVILEGES ON ALL FUNCTIONS IN SCHEMA public TO ${username}` ]; for (const sql of grantAllSQL) { await this.queryClient.executeQuery(sql); } return { content: [{ type: 'text', text: `All privileges granted to '${username}' on database '${database}'` }] }; case 'grant_privilege': if (!username || !privileges || privileges.length === 0) { throw new Error('Username and privileges are required'); } const target = table ? `TABLE ${schema ? schema + '.' : ''}${table}` : schema ? `SCHEMA ${schema}` : database ? `DATABASE ${database}` : 'ALL TABLES IN SCHEMA public'; const grantSQL = `GRANT ${privileges.join(', ')} ON ${target} TO ${username}${grantOption ? ' WITH GRANT OPTION' : ''}`; await this.queryClient.executeQuery(grantSQL); return { content: [{ type: 'text', text: `Privileges ${privileges.join(', ')} granted to '${username}' on ${target}` }] }; case 'check_permissions': if (!username) { throw new Error('Username is required for permission check'); } const permissionsQuery = ` SELECT t.schemaname, t.tablename, p.privilege_type FROM information_schema.table_privileges p JOIN information_schema.tables t ON p.table_name = t.table_name AND p.table_schema = t.table_schema WHERE p.grantee = $1 ORDER BY t.schemaname, t.tablename, p.privilege_type `; const permissions = await this.queryClient.executeQuery(permissionsQuery, [username]); return { content: [{ type: 'text', text: JSON.stringify(permissions.rows, null, 2) }] }; default: throw new Error(`Unknown permissions operation: ${operation}`); } } private async handleSecurity(args: any) { const { operation, table, policy_name, policy_expression, audit_type } = args; switch (operation) { case 'check_ssl': const sslInfo = await this.queryClient.executeQuery(` SELECT name, setting, context, short_desc FROM pg_settings WHERE name LIKE '%ssl%' OR name LIKE '%tls%' ORDER BY name `); return { content: [{ type: 'text', text: JSON.stringify(sslInfo.rows, null, 2) }] }; case 'list_auth_methods': const authMethods = await this.queryClient.executeQuery(` SELECT type, database, user_name, address, netmask, auth_method, options, error FROM pg_hba_file_rules ORDER BY line_number `); return { content: [{ type: 'text', text: JSON.stringify(authMethods.rows, null, 2) }] }; case 'session_security': const sessionInfo = await this.queryClient.executeQuery(` SELECT inet_client_addr() as client_ip, inet_server_addr() as server_ip, current_user, session_user, current_database(), pg_backend_pid() as backend_pid, pg_is_in_recovery() as in_recovery, current_setting('ssl') as ssl_enabled `); return { content: [{ type: 'text', text: JSON.stringify(sessionInfo.rows[0], null, 2) }] }; case 'row_level_security': if (!table) { // List all RLS policies const rlsPolicies = await this.queryClient.executeQuery(` SELECT schemaname, tablename, policyname, permissive, roles, cmd, qual, with_check FROM pg_policies ORDER BY schemaname, tablename, policyname `); return { content: [{ type: 'text', text: JSON.stringify(rlsPolicies.rows, null, 2) }] }; } else { // Show RLS status for specific table const rlsStatus = await this.queryClient.executeQuery(` SELECT schemaname, tablename, rowsecurity, forcerowsecurity FROM pg_tables WHERE tablename = $1 `, [table]); return { content: [{ type: 'text', text: JSON.stringify(rlsStatus.rows, null, 2) }] }; } case 'audit_log': const auditQuery = ` SELECT datname as database, usename as username, application_name, client_addr, backend_start, query_start, state, query FROM pg_stat_activity WHERE state = 'active' ORDER BY query_start DESC LIMIT 50 `; const auditInfo = await this.queryClient.executeQuery(auditQuery); return { content: [{ type: 'text', text: JSON.stringify(auditInfo.rows, null, 2) }] }; default: throw new Error(`Unknown security operation: ${operation}`); } } async run(): Promise<void> { const transport = new StdioServerTransport(); await this.server.connect(transport); logger.info(`PostgreSQL MCP Server running with ${toolDefinitions.length} tools`); } /** * Cleanup resources on shutdown */ async cleanup(): Promise<void> { try { // Cleanup cache this.cache.cleanup(); // Cleanup rate limiter this.rateLimiter.destroy(); // Cleanup database manager await this.dbManager.cleanup(); logger.info('Server resources cleaned up successfully'); } catch (error) { logger.error('Error during cleanup', { error: error instanceof Error ? error.message : error }); } } } // Export the class for external use export { PostgresMCPServer }; export default PostgresMCPServer; const skipRuntime = process.env.SKIP_CONFIG_VALIDATION === 'true' || process.env.CI === 'true'; let server: PostgresMCPServer | null = null; try { server = new PostgresMCPServer(); if (skipRuntime) { logger.info('Configuration missing or CI mode detected - skipping runtime server startup'); } else { // Graceful shutdown handling process.on('SIGINT', async () => { logger.info('Received SIGINT, shutting down'); if (server) await server.cleanup(); process.exit(0); }); process.on('SIGTERM', async () => { logger.info('Received SIGTERM, shutting down'); if (server) await server.cleanup(); process.exit(0); }); server.run().catch((error) => { logger.error('Server failed to start', error); process.exit(1); }); } } catch (e) { logger.error('Failed to initialize server', e); if (!skipRuntime) { process.exit(1); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/itsalfredakku/postgres-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server