MCP PostgreSQL Server

by antonorlov
Verified
#!/usr/bin/env node import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; import { CallToolRequestSchema, ErrorCode, ListToolsRequestSchema, McpError, } from '@modelcontextprotocol/sdk/types.js'; import pg from 'pg'; const { Client } = pg; import { config } from 'dotenv'; // Load environment variables config(); interface DatabaseConfig { host: string; port: number; user: string; password: string; database: string; } // Type guard for error objects function isErrorWithMessage(error: unknown): error is { message: string } { return ( typeof error === 'object' && error !== null && 'message' in error && typeof (error as Record<string, unknown>).message === 'string' ); } // Helper to get error message function getErrorMessage(error: unknown): string { if (isErrorWithMessage(error)) { return error.message; } return String(error); } // Helper to convert ? parameters to $1, $2, etc. function convertToNamedParams(query: string): string { let paramIndex = 0; return query.replace(/\?/g, () => `$${++paramIndex}`); } class PostgresServer { private server: Server; private client: pg.Client | null = null; private config: DatabaseConfig | null = null; constructor() { this.server = new Server( { name: 'postgres-server', version: '1.0.0', }, { capabilities: { tools: {}, }, } ); this.setupToolHandlers(); // Error handling this.server.onerror = (error) => console.error('[MCP Error]', error); const handleTermination = async () => { try { await this.cleanup(); } catch (error) { console.error('Error during cleanup:', error); process.exit(1); } process.exit(0); }; process.on('SIGINT', handleTermination); process.stdin.on('close', handleTermination); } private async cleanup() { if (this.client) { await this.client.end(); } await this.server.close(); } private async ensureConnection() { if (!this.config) { // Try to use environment variables if no explicit config was provided const envConfig = this.getEnvConfig(); if (envConfig) { this.config = envConfig; console.error('[MCP Info] Using database config from environment variables'); } else { throw new McpError( ErrorCode.InvalidRequest, 'Database configuration not set. Use connect_db tool first or set environment variables.' ); } } if (!this.client) { try { this.client = new Client(this.config); await this.client.connect(); } catch (error) { throw new McpError( ErrorCode.InternalError, `Failed to connect to database: ${getErrorMessage(error)}` ); } } } private getEnvConfig(): DatabaseConfig | null { const { PG_HOST, PG_USER, PG_PASSWORD, PG_DATABASE, PG_PORT } = process.env; if (PG_HOST && PG_USER && PG_PASSWORD && PG_DATABASE) { return { host: PG_HOST, port: PG_PORT ? parseInt(PG_PORT, 10) : 5432, user: PG_USER, password: PG_PASSWORD, database: PG_DATABASE }; } return null; } private setupToolHandlers() { this.server.setRequestHandler(ListToolsRequestSchema, async () => ({ tools: [ { name: 'connect_db', description: 'Connect to PostgreSQL database. NOTE: Default connection exists - only use when requested or if other commands fail', inputSchema: { type: 'object', properties: { host: { type: 'string', description: 'Database host', }, port: { type: 'number', description: 'Database port (default: 5432)', }, user: { type: 'string', description: 'Database user', }, password: { type: 'string', description: 'Database password', }, database: { type: 'string', description: 'Database name', }, }, required: ['host', 'user', 'password', 'database'], }, }, { name: 'query', description: 'Execute a SELECT query', inputSchema: { type: 'object', properties: { sql: { type: 'string', description: 'SQL SELECT query (use $1, $2, etc. for parameters)', }, params: { type: 'array', items: { type: ['string', 'number', 'boolean', 'null'], }, description: 'Query parameters (optional)', }, }, required: ['sql'], }, }, { name: 'execute', description: 'Execute an INSERT, UPDATE, or DELETE query', inputSchema: { type: 'object', properties: { sql: { type: 'string', description: 'SQL query (INSERT, UPDATE, DELETE) (use $1, $2, etc. for parameters)', }, params: { type: 'array', items: { type: ['string', 'number', 'boolean', 'null'], }, description: 'Query parameters (optional)', }, }, required: ['sql'], }, }, { name: 'list_tables', description: 'List all tables in the database', inputSchema: { type: 'object', properties: {}, required: [], }, }, { name: 'describe_table', description: 'Get table structure', inputSchema: { type: 'object', properties: { table: { type: 'string', description: 'Table name', }, }, required: ['table'], }, }, ], })); this.server.setRequestHandler(CallToolRequestSchema, async (request) => { switch (request.params.name) { case 'connect_db': return await this.handleConnectDb(request.params.arguments); case 'query': return await this.handleQuery(request.params.arguments); case 'execute': return await this.handleExecute(request.params.arguments); case 'list_tables': return await this.handleListTables(); case 'describe_table': return await this.handleDescribeTable(request.params.arguments); default: throw new McpError( ErrorCode.MethodNotFound, `Unknown tool: ${request.params.name}` ); } }); } private async handleConnectDb(args: any) { if (!args.host || !args.user || !args.password || !args.database) { throw new McpError( ErrorCode.InvalidParams, 'Missing required database configuration parameters' ); } // Close existing connection if any if (this.client) { await this.client.end(); this.client = null; } this.config = { host: args.host, port: args.port || 5432, user: args.user, password: args.password, database: args.database, }; try { await this.ensureConnection(); return { content: [ { type: 'text', text: 'Successfully connected to PostgreSQL database', }, ], }; } catch (error) { throw new McpError( ErrorCode.InternalError, `Failed to connect to database: ${getErrorMessage(error)}` ); } } private async handleQuery(args: any) { await this.ensureConnection(); if (!args.sql) { throw new McpError(ErrorCode.InvalidParams, 'SQL query is required'); } if (!args.sql.trim().toUpperCase().startsWith('SELECT')) { throw new McpError( ErrorCode.InvalidParams, 'Only SELECT queries are allowed with query tool' ); } try { // Convert ? parameters to $1, $2, etc. if needed const sql = args.sql.includes('?') ? convertToNamedParams(args.sql) : args.sql; const result = await this.client!.query(sql, args.params || []); return { content: [ { type: 'text', text: JSON.stringify(result.rows, null, 2), }, ], }; } catch (error) { throw new McpError( ErrorCode.InternalError, `Query execution failed: ${getErrorMessage(error)}` ); } } private async handleExecute(args: any) { await this.ensureConnection(); if (!args.sql) { throw new McpError(ErrorCode.InvalidParams, 'SQL query is required'); } const sql = args.sql.trim().toUpperCase(); if (sql.startsWith('SELECT')) { throw new McpError( ErrorCode.InvalidParams, 'Use query tool for SELECT statements' ); } try { // Convert ? parameters to $1, $2, etc. if needed const preparedSql = args.sql.includes('?') ? convertToNamedParams(args.sql) : args.sql; const result = await this.client!.query(preparedSql, args.params || []); return { content: [ { type: 'text', text: JSON.stringify({ rowCount: result.rowCount, command: result.command, }, null, 2), }, ], }; } catch (error) { throw new McpError( ErrorCode.InternalError, `Query execution failed: ${getErrorMessage(error)}` ); } } private async handleListTables() { await this.ensureConnection(); try { const result = await this.client!.query(` SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name `); return { content: [ { type: 'text', text: JSON.stringify(result.rows, null, 2), }, ], }; } catch (error) { throw new McpError( ErrorCode.InternalError, `Failed to list tables: ${getErrorMessage(error)}` ); } } private async handleDescribeTable(args: any) { await this.ensureConnection(); if (!args.table) { throw new McpError(ErrorCode.InvalidParams, 'Table name is required'); } try { const result = await this.client!.query(` SELECT c.column_name, c.data_type, c.is_nullable, c.column_default, CASE WHEN pk.constraint_type = 'PRIMARY KEY' THEN true ELSE false END AS is_primary_key, c.character_maximum_length FROM information_schema.columns c LEFT JOIN ( SELECT tc.constraint_type, kcu.column_name, kcu.table_name FROM information_schema.table_constraints tc JOIN information_schema.key_column_usage kcu ON tc.constraint_name = kcu.constraint_name WHERE tc.constraint_type = 'PRIMARY KEY' ) pk ON c.column_name = pk.column_name AND c.table_name = pk.table_name WHERE c.table_schema = 'public' AND c.table_name = $1 ORDER BY c.ordinal_position `, [args.table]); return { content: [ { type: 'text', text: JSON.stringify(result.rows, null, 2), }, ], }; } catch (error) { throw new McpError( ErrorCode.InternalError, `Failed to describe table: ${getErrorMessage(error)}` ); } } async run() { const transport = new StdioServerTransport(); await this.server.connect(transport); console.error('PostgreSQL MCP server running on stdio'); } } const server = new PostgresServer(); server.run().catch(console.error);