Skip to main content
Glama
index.ts13.9 kB
import pg from "pg"; const { Pool } = pg; import { Connector, ConnectorType, ConnectorRegistry, DSNParser, SQLResult, TableColumn, TableIndex, StoredProcedure, ExecuteOptions, ConnectorConfig, } from "../interface.js"; import { SafeURL } from "../../utils/safe-url.js"; import { obfuscateDSNPassword } from "../../utils/dsn-obfuscate.js"; import { SQLRowLimiter } from "../../utils/sql-row-limiter.js"; /** * PostgreSQL DSN Parser * Handles DSN strings like: postgres://user:password@localhost:5432/dbname?sslmode=disable * Supported SSL modes: * - sslmode=disable: No SSL * - sslmode=require: SSL connection without certificate verification * - Any other value: SSL with certificate verification */ class PostgresDSNParser implements DSNParser { async parse(dsn: string, config?: ConnectorConfig): Promise<pg.PoolConfig> { const connectionTimeoutSeconds = config?.connectionTimeoutSeconds; // Basic validation if (!this.isValidDSN(dsn)) { const obfuscatedDSN = obfuscateDSNPassword(dsn); const expectedFormat = this.getSampleDSN(); throw new Error( `Invalid PostgreSQL DSN format.\nProvided: ${obfuscatedDSN}\nExpected: ${expectedFormat}` ); } try { // Use the SafeURL helper instead of the built-in URL // This will handle special characters in passwords, etc. const url = new SafeURL(dsn); const config: pg.PoolConfig = { host: url.hostname, port: url.port ? parseInt(url.port) : 5432, database: url.pathname ? url.pathname.substring(1) : '', // Remove leading '/' if exists user: url.username, password: url.password, }; // Handle query parameters (like sslmode, etc.) url.forEachSearchParam((value, key) => { if (key === "sslmode") { if (value === "disable") { config.ssl = false; } else if (value === "require") { config.ssl = { rejectUnauthorized: false }; } else { config.ssl = true; } } // Add other parameters as needed }); // Apply connection timeout if specified if (connectionTimeoutSeconds !== undefined) { // pg library expects timeout in milliseconds config.connectionTimeoutMillis = connectionTimeoutSeconds * 1000; } return config; } catch (error) { throw new Error( `Failed to parse PostgreSQL DSN: ${error instanceof Error ? error.message : String(error)}` ); } } getSampleDSN(): string { return "postgres://postgres:password@localhost:5432/postgres?sslmode=require"; } isValidDSN(dsn: string): boolean { try { return dsn.startsWith('postgres://') || dsn.startsWith('postgresql://'); } catch (error) { return false; } } } /** * PostgreSQL Connector Implementation */ export class PostgresConnector implements Connector { id: ConnectorType = "postgres"; name = "PostgreSQL"; dsnParser = new PostgresDSNParser(); private pool: pg.Pool | null = null; clone(): Connector { return new PostgresConnector(); } async connect(dsn: string, initScript?: string, config?: ConnectorConfig): Promise<void> { try { const poolConfig = await this.dsnParser.parse(dsn, config); // SDK-level readonly enforcement: Set default_transaction_read_only for the entire connection if (config?.readonly) { poolConfig.options = (poolConfig.options || '') + ' -c default_transaction_read_only=on'; } this.pool = new Pool(poolConfig); // Test the connection const client = await this.pool.connect(); console.error("Successfully connected to PostgreSQL database"); client.release(); } catch (err) { console.error("Failed to connect to PostgreSQL database:", err); throw err; } } async disconnect(): Promise<void> { if (this.pool) { await this.pool.end(); this.pool = null; } } async getSchemas(): Promise<string[]> { if (!this.pool) { throw new Error("Not connected to database"); } const client = await this.pool.connect(); try { const result = await client.query(` SELECT schema_name FROM information_schema.schemata WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_toast') ORDER BY schema_name `); return result.rows.map((row) => row.schema_name); } finally { client.release(); } } async getTables(schema?: string): Promise<string[]> { if (!this.pool) { throw new Error("Not connected to database"); } const client = await this.pool.connect(); try { // In PostgreSQL, use 'public' as the default schema if none specified // 'public' is the standard default schema in PostgreSQL databases const schemaToUse = schema || "public"; const result = await client.query( ` SELECT table_name FROM information_schema.tables WHERE table_schema = $1 ORDER BY table_name `, [schemaToUse] ); return result.rows.map((row) => row.table_name); } finally { client.release(); } } async tableExists(tableName: string, schema?: string): Promise<boolean> { if (!this.pool) { throw new Error("Not connected to database"); } const client = await this.pool.connect(); try { // In PostgreSQL, use 'public' as the default schema if none specified const schemaToUse = schema || "public"; const result = await client.query( ` SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2 ) `, [schemaToUse, tableName] ); return result.rows[0].exists; } finally { client.release(); } } async getTableIndexes(tableName: string, schema?: string): Promise<TableIndex[]> { if (!this.pool) { throw new Error("Not connected to database"); } const client = await this.pool.connect(); try { // In PostgreSQL, use 'public' as the default schema if none specified const schemaToUse = schema || "public"; // Query to get all indexes for the table const result = await client.query( ` SELECT i.relname as index_name, array_agg(a.attname) as column_names, ix.indisunique as is_unique, ix.indisprimary as is_primary FROM pg_class t, pg_class i, pg_index ix, pg_attribute a, pg_namespace ns WHERE t.oid = ix.indrelid AND i.oid = ix.indexrelid AND a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) AND t.relkind = 'r' AND t.relname = $1 AND ns.oid = t.relnamespace AND ns.nspname = $2 GROUP BY i.relname, ix.indisunique, ix.indisprimary ORDER BY i.relname `, [tableName, schemaToUse] ); return result.rows.map((row) => ({ index_name: row.index_name, column_names: row.column_names, is_unique: row.is_unique, is_primary: row.is_primary, })); } finally { client.release(); } } async getTableSchema(tableName: string, schema?: string): Promise<TableColumn[]> { if (!this.pool) { throw new Error("Not connected to database"); } const client = await this.pool.connect(); try { // In PostgreSQL, use 'public' as the default schema if none specified // Tables are created in the 'public' schema by default unless otherwise specified const schemaToUse = schema || "public"; // Get table columns const result = await client.query( ` SELECT column_name, data_type, is_nullable, column_default FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 ORDER BY ordinal_position `, [schemaToUse, tableName] ); return result.rows; } finally { client.release(); } } async getStoredProcedures(schema?: string): Promise<string[]> { if (!this.pool) { throw new Error("Not connected to database"); } const client = await this.pool.connect(); try { // In PostgreSQL, use 'public' as the default schema if none specified const schemaToUse = schema || "public"; // Get stored procedures and functions from PostgreSQL const result = await client.query( ` SELECT routine_name FROM information_schema.routines WHERE routine_schema = $1 ORDER BY routine_name `, [schemaToUse] ); return result.rows.map((row) => row.routine_name); } finally { client.release(); } } async getStoredProcedureDetail(procedureName: string, schema?: string): Promise<StoredProcedure> { if (!this.pool) { throw new Error("Not connected to database"); } const client = await this.pool.connect(); try { // In PostgreSQL, use 'public' as the default schema if none specified const schemaToUse = schema || "public"; // Get stored procedure details from PostgreSQL const result = await client.query( ` SELECT routine_name as procedure_name, routine_type, CASE WHEN routine_type = 'PROCEDURE' THEN 'procedure' ELSE 'function' END as procedure_type, external_language as language, data_type as return_type, routine_definition as definition, ( SELECT string_agg( parameter_name || ' ' || parameter_mode || ' ' || data_type, ', ' ) FROM information_schema.parameters WHERE specific_schema = $1 AND specific_name = $2 AND parameter_name IS NOT NULL ) as parameter_list FROM information_schema.routines WHERE routine_schema = $1 AND routine_name = $2 `, [schemaToUse, procedureName] ); if (result.rows.length === 0) { throw new Error(`Stored procedure '${procedureName}' not found in schema '${schemaToUse}'`); } const procedure = result.rows[0]; // If routine_definition is NULL, try to get the procedure body with pg_get_functiondef let definition = procedure.definition; try { // Get the OID for the procedure/function const oidResult = await client.query( ` SELECT p.oid, p.prosrc FROM pg_proc p JOIN pg_namespace n ON p.pronamespace = n.oid WHERE p.proname = $1 AND n.nspname = $2 `, [procedureName, schemaToUse] ); if (oidResult.rows.length > 0) { // If definition is still null, get the full definition if (!definition) { const oid = oidResult.rows[0].oid; const defResult = await client.query(`SELECT pg_get_functiondef($1)`, [oid]); if (defResult.rows.length > 0) { definition = defResult.rows[0].pg_get_functiondef; } else { // Fall back to prosrc if pg_get_functiondef fails definition = oidResult.rows[0].prosrc; } } } } catch (err) { // Ignore errors trying to get definition - it's optional console.error(`Error getting procedure definition: ${err}`); } return { procedure_name: procedure.procedure_name, procedure_type: procedure.procedure_type, language: procedure.language || "sql", parameter_list: procedure.parameter_list || "", return_type: procedure.return_type !== "void" ? procedure.return_type : undefined, definition: definition || undefined, }; } finally { client.release(); } } async executeSQL(sql: string, options: ExecuteOptions): Promise<SQLResult> { if (!this.pool) { throw new Error("Not connected to database"); } const client = await this.pool.connect(); try { // Check if this is a multi-statement query const statements = sql.split(';') .map(statement => statement.trim()) .filter(statement => statement.length > 0); if (statements.length === 1) { // Single statement - apply maxRows if applicable const processedStatement = SQLRowLimiter.applyMaxRows(statements[0], options.maxRows); return await client.query(processedStatement); } else { // Multiple statements - execute all in same session for transaction consistency let allRows: any[] = []; // Execute within a transaction to ensure session consistency await client.query('BEGIN'); try { for (let statement of statements) { // Apply maxRows limit to SELECT queries if specified const processedStatement = SQLRowLimiter.applyMaxRows(statement, options.maxRows); const result = await client.query(processedStatement); // Collect rows from SELECT/WITH/EXPLAIN statements if (result.rows && result.rows.length > 0) { allRows.push(...result.rows); } } await client.query('COMMIT'); } catch (error) { await client.query('ROLLBACK'); throw error; } return { rows: allRows }; } } finally { client.release(); } } } // Create and register the connector const postgresConnector = new PostgresConnector(); ConnectorRegistry.register(postgresConnector);

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bytebase/dbhub'

If you have feedback or need assistance with the MCP directory API, please join our Discord server