index.ts•38.3 kB
#!/usr/bin/env node
import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { Pool } from "pg";
import { z } from "zod";
const server = new McpServer({
name: "mcp-server-postgres-multitenant",
version: "1.0.0",
});
const args = process.argv.slice(2);
if (args.length === 0) {
process.stderr.write(`
MCP PostgreSQL Server - Missing Database URL
Error: No PostgreSQL connection URL provided.
Usage: npx @ahmetkca/mcp-server-postgres "postgresql://user:password@host:port/database"
Examples:
npx @ahmetkca/mcp-server-postgres "postgresql://postgres:password@localhost:5432/mydb"
npx @ahmetkca/mcp-server-postgres "postgresql://user:pass@example.com:5432/production_db"
Environment variables:
You can also use: npx @ahmetkca/mcp-server-postgres "$DATABASE_URL"
For AI agents: This MCP server requires a valid PostgreSQL connection URL as the first argument.
Without it, the server cannot access database schemas, tables, or functions.
`);
process.exit(1);
}
const databaseUrl = args[0];
if (!databaseUrl.startsWith('postgresql://') && !databaseUrl.startsWith('postgres://')) {
process.stderr.write(`
MCP PostgreSQL Server - Invalid Database URL Format
Error: Database URL must start with 'postgresql://' or 'postgres://'
Provided: ${databaseUrl.substring(0, 50)}...
Correct format: postgresql://[user[:password]@][host][:port][/database][?param1=value1&...]
Examples:
postgresql://postgres:password@localhost:5432/mydb
postgresql://user@localhost/database
postgres://user:pass@example.com:5432/db?sslmode=require
For AI agents: The connection URL format is critical for PostgreSQL connectivity.
Ensure it follows the standard PostgreSQL connection string format.
`);
process.exit(1);
}
const pool = new Pool({
connectionString: databaseUrl,
});
async function testDatabaseConnection(): Promise<void> {
let client;
try {
client = await pool.connect();
await client.query("SELECT version(), current_database(), current_user");
await client.query("SELECT COUNT(*) FROM information_schema.schemata");
await client.query("BEGIN TRANSACTION READ ONLY");
await client.query("ROLLBACK");
} catch (error: any) {
let errorTitle = "Database Connection Failed";
let errorMessage = "Failed to connect to PostgreSQL database.";
let suggestion = "";
if (error.code === 'ECONNREFUSED') {
errorTitle = "Connection Refused";
errorMessage = "PostgreSQL server is not running or not accepting connections.";
suggestion = "Verify PostgreSQL server is running and check host/port configuration.";
} else if (error.code === 'ENOTFOUND') {
errorTitle = "Hostname Not Found";
errorMessage = "DNS resolution failed for the database hostname.";
suggestion = "Verify hostname in connection URL is correct and accessible.";
} else if (error.code === '28P01') {
errorTitle = "Authentication Failed";
errorMessage = "Invalid username or password for database connection.";
suggestion = "Verify username and password are correct and user has connection privileges.";
} else if (error.code === '3D000') {
errorTitle = "Database Does Not Exist";
errorMessage = "The specified database does not exist on the server.";
suggestion = "Create the database or verify the database name in your connection URL.";
} else if (error.code === '28000') {
errorTitle = "Authorization Failed";
errorMessage = "Invalid authorization specification or connection parameters.";
suggestion = "Check connection parameters and PostgreSQL pg_hba.conf configuration.";
} else if (error.code === '42501') {
errorTitle = "Permission Denied";
errorMessage = "Insufficient database privileges for the connecting user.";
suggestion = "Grant necessary permissions and ensure user can access information_schema tables.";
} else if (error.message?.includes('timeout')) {
errorTitle = "Connection Timeout";
errorMessage = "Database server took too long to respond.";
suggestion = "Check network connectivity and server performance.";
} else {
errorTitle = "Database Error";
errorMessage = `Database connection error: ${error.message}`;
suggestion = "Check PostgreSQL server logs and connection URL format.";
}
const sanitizedUrl = databaseUrl.replace(/:\/\/[^@]*@/, '://***:***@');
const errorDetails = `
MCP PostgreSQL Server - ${errorTitle}
Error: ${errorMessage}
Code: ${error.code || 'Unknown'}
Connection: ${sanitizedUrl}
Suggestion: ${suggestion}
For AI agents: This MCP server requires a valid PostgreSQL connection to function.
The server cannot start without database access as all tools and resources depend on it.
`;
process.stderr.write(errorDetails);
process.exit(1);
} finally {
if (client) {
client.release();
}
}
}
await testDatabaseConnection();
server.registerResource(
"table-structure",
new ResourceTemplate("pg-table://{schemaName}/{tableName}/structure", {
list: async () => {
const client = await pool.connect();
try {
const result = await client.query(`
SELECT DISTINCT ON (t.table_schema, t.table_name)
t.table_schema AS schema_name,
t.table_name AS table_name,
COALESCE(
obj_description((quote_ident(t.table_schema)||'.'||quote_ident(t.table_name))::regclass::oid, 'pg_class'),
''
) AS table_comment
FROM information_schema.tables t
WHERE t.table_schema NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
AND t.table_type = 'BASE TABLE'
ORDER BY t.table_schema, t.table_name
`);
const seen = new Set<string>();
const resources = result.rows
.map((row) => {
const schemaEnc = encodeURIComponent(row.schema_name);
const tableEnc = encodeURIComponent(row.table_name);
const uri = `pg-table://${schemaEnc}/${tableEnc}/structure`;
return {
name: `${row.schema_name}.${row.table_name}`,
uri,
title: `Structure for ${row.schema_name}.${row.table_name}`,
description: row.table_comment || `Database table structure for ${row.schema_name}.${row.table_name}`,
mimeType: "application/json",
};
})
// Final guard against any duplicates
.filter((res) => (seen.has(res.uri) ? false : (seen.add(res.uri), true)));
return {
resources: resources
};
} finally {
client.release();
}
},
complete: {
schemaName: async (value?: string) => {
const client = await pool.connect();
try {
const result = await client.query(
`
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name NOT IN ('information_schema','pg_catalog','pg_toast')
AND schema_name ILIKE $1
ORDER BY schema_name
LIMIT 50
`,
[((value ?? "").trim() || "") + "%"]
);
return result.rows.map((r) => r.schema_name as string);
} finally {
client.release();
}
},
tableName: async (value: string | undefined, context?: { arguments?: Record<string, unknown> }) => {
const schema = (context?.arguments?.["schemaName"] as string | undefined)?.trim();
if (!schema) return [];
const client = await pool.connect();
try {
const result = await client.query(
`
SELECT table_name
FROM information_schema.tables
WHERE table_schema = $1
AND table_type = 'BASE TABLE'
AND table_name ILIKE $2
ORDER BY table_name
LIMIT 100
`,
[schema, ((value ?? "").trim() || "") + "%"]
);
return result.rows.map((r) => r.table_name as string);
} finally {
client.release();
}
},
},
}),
{
title: "Multi-Tenant Table Structure",
description: "Get the comprehensive table structure for a database table including columns, constraints, indexes, and relationships across different tenant schemas",
},
async (uri, { schemaName, tableName }) => {
const client = await pool.connect();
try {
const schemaQuery = `
WITH column_info AS (
SELECT
c.column_name,
c.data_type,
c.character_maximum_length,
c.numeric_precision,
c.numeric_scale,
c.is_nullable,
c.column_default,
c.ordinal_position,
COALESCE(col_description(pgc.oid, c.ordinal_position), '') as column_comment
FROM information_schema.columns c
LEFT JOIN pg_class pgc ON pgc.relname = c.table_name
LEFT JOIN pg_namespace pgn ON pgn.oid = pgc.relnamespace
WHERE c.table_name = $1
AND c.table_schema = $2
AND pgn.nspname = $2
ORDER BY c.ordinal_position
),
constraints_info AS (
SELECT
tc.constraint_name,
tc.constraint_type,
string_agg(kcu.column_name, ', ' ORDER BY kcu.ordinal_position) as column_names,
ccu.table_schema as foreign_table_schema,
ccu.table_name as foreign_table_name,
ccu.column_name as foreign_column_name
FROM information_schema.table_constraints tc
LEFT JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
LEFT JOIN information_schema.constraint_column_usage ccu
ON tc.constraint_name = ccu.constraint_name
AND tc.table_schema = ccu.table_schema
WHERE tc.table_name = $1
AND tc.table_schema = $2
GROUP BY tc.constraint_name, tc.constraint_type, ccu.table_schema, ccu.table_name, ccu.column_name
),
indexes_info AS (
SELECT
i.relname as index_name,
ix.indisunique as is_unique,
ix.indisprimary as is_primary,
array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as column_names,
pg_get_indexdef(i.oid) as index_definition
FROM pg_class t
JOIN pg_namespace n ON n.oid = t.relnamespace
JOIN pg_index ix ON t.oid = ix.indrelid
JOIN pg_class i ON i.oid = ix.indexrelid
JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
WHERE t.relname = $1
AND n.nspname = $2
GROUP BY i.relname, ix.indisunique, ix.indisprimary, i.oid
),
table_ddl AS (
SELECT
'CREATE TABLE ' || $2 || '.' || $1 || ' (' || E'\n' ||
string_agg(
' ' || column_name || ' ' ||
CASE
WHEN data_type = 'character varying' THEN 'VARCHAR(' || character_maximum_length || ')'
WHEN data_type = 'character' THEN 'CHAR(' || character_maximum_length || ')'
WHEN data_type = 'numeric' THEN 'NUMERIC(' || numeric_precision || ',' || numeric_scale || ')'
WHEN data_type = 'timestamp without time zone' THEN 'TIMESTAMP'
WHEN data_type = 'timestamp with time zone' THEN 'TIMESTAMP WITH TIME ZONE'
ELSE UPPER(data_type)
END ||
CASE WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END ||
CASE WHEN column_default IS NOT NULL THEN ' DEFAULT ' || column_default ELSE '' END,
',' || E'\n' ORDER BY ordinal_position
) || E'\n' || ');' as create_table_ddl
FROM column_info
)
SELECT
json_build_object(
'schema_name', $2,
'table_name', $1,
'columns', (
SELECT json_agg(
json_build_object(
'name', column_name,
'data_type', data_type,
'max_length', character_maximum_length,
'precision', numeric_precision,
'scale', numeric_scale,
'nullable', is_nullable = 'YES',
'default_value', column_default,
'position', ordinal_position,
'comment', column_comment
) ORDER BY ordinal_position
)
FROM column_info
),
'constraints', (
SELECT json_agg(
json_build_object(
'name', constraint_name,
'type', constraint_type,
'columns', string_to_array(column_names, ', '),
'foreign_table_schema', foreign_table_schema,
'foreign_table_name', foreign_table_name,
'foreign_column_name', foreign_column_name
)
)
FROM constraints_info
),
'indexes', (
SELECT json_agg(
json_build_object(
'name', index_name,
'unique', is_unique,
'primary', is_primary,
'columns', column_names,
'definition', index_definition
)
)
FROM indexes_info
),
'ddl', (
SELECT json_build_object(
'create_table', create_table_ddl,
'indexes', (
SELECT json_agg(index_definition ORDER BY index_name)
FROM indexes_info
WHERE NOT is_primary
),
'table_comment', obj_description(
(SELECT oid FROM pg_class WHERE relname = $1 AND relnamespace = (
SELECT oid FROM pg_namespace WHERE nspname = $2
)), 'pg_class'
)
)
FROM table_ddl
)
) as table_schema;
`;
const result = await client.query(schemaQuery, [tableName, schemaName]);
if (result.rows.length === 0) {
throw new Error(`Table '${schemaName}.${tableName}' not found`);
}
const tableSchema = result.rows[0].table_schema;
return {
contents: [
{
uri: uri.href,
mimeType: "application/json",
text: JSON.stringify(tableSchema, null, 2),
},
],
};
} finally {
client.release();
}
}
);
server.registerResource(
"function-definition",
new ResourceTemplate("pg-func://{schemaName}/{functionName}/{identityArgs}/definition", {
list: async () => {
const client = await pool.connect();
try {
const result = await client.query(`
SELECT DISTINCT ON (n.nspname, p.proname, pg_get_function_identity_arguments(p.oid))
n.nspname AS schema_name,
p.proname AS function_name,
pg_get_function_identity_arguments(p.oid) AS identity_args,
obj_description(p.oid) AS description,
l.lanname AS language,
CASE p.provolatile
WHEN 'i' THEN 'IMMUTABLE'
WHEN 's' THEN 'STABLE'
WHEN 'v' THEN 'VOLATILE'
END AS volatility,
pg_get_function_result(p.oid) AS return_type
FROM pg_proc p
JOIN pg_namespace n ON p.pronamespace = n.oid
JOIN pg_language l ON p.prolang = l.oid
WHERE n.nspname NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
AND n.nspname NOT LIKE 'pg_%'
ORDER BY n.nspname, p.proname, pg_get_function_identity_arguments(p.oid)
`);
const seen = new Set<string>();
const resources = result.rows
.map((row) => {
const schemaEnc = encodeURIComponent(row.schema_name);
const funcEnc = encodeURIComponent(row.function_name);
const argsDisplay = `(${row.identity_args})`;
const argsEnc = encodeURIComponent(argsDisplay);
const uri = `pg-func://${schemaEnc}/${funcEnc}/${argsEnc}/definition`;
return {
name: `${row.schema_name}.${row.function_name}${argsDisplay}`,
uri,
title: `Definition for ${row.schema_name}.${row.function_name}${argsDisplay}`,
description: row.description || `${row.language} function returning ${row.return_type} (${row.volatility})`,
mimeType: "application/json",
};
})
// Guard against any accidental duplicates
.filter((res) => (seen.has(res.uri) ? false : (seen.add(res.uri), true)));
return {
resources: resources
};
} finally {
client.release();
}
},
complete: {
schemaName: async (value?: string) => {
const client = await pool.connect();
try {
const result = await client.query(
`
SELECT DISTINCT nspname AS schema_name
FROM pg_namespace
WHERE nspname NOT IN ('information_schema','pg_catalog','pg_toast')
AND nspname NOT LIKE 'pg_%'
AND nspname ILIKE $1
ORDER BY nspname
LIMIT 50
`,
[((value ?? "").trim() || "") + "%"]
);
return result.rows.map((r) => r.schema_name as string);
} finally {
client.release();
}
},
functionName: async (value: string | undefined, context?: { arguments?: Record<string, unknown> }) => {
const schema = (context?.arguments?.["schemaName"] as string | undefined)?.trim();
if (!schema) return [];
const client = await pool.connect();
try {
const result = await client.query(
`
SELECT DISTINCT p.proname AS function_name
FROM pg_proc p
JOIN pg_namespace n ON p.pronamespace = n.oid
WHERE n.nspname = $1
AND p.proname ILIKE $2
ORDER BY p.proname
LIMIT 100
`,
[schema, ((value ?? "").trim() || "") + "%"]
);
return result.rows.map((r) => r.function_name as string);
} finally {
client.release();
}
},
identityArgs: async (value: string | undefined, context?: { arguments?: Record<string, unknown> }) => {
const schema = (context?.arguments?.["schemaName"] as string | undefined)?.trim();
const fn = (context?.arguments?.["functionName"] as string | undefined)?.trim();
if (!schema || !fn) return [];
const partial = (value ?? "").trim().replace(/^[\(\s]*/, "").replace(/[\s\)]*$/, "");
const client = await pool.connect();
try {
const result = await client.query(
`
SELECT '(' || pg_get_function_identity_arguments(p.oid) || ')' AS identity_args
FROM pg_proc p
JOIN pg_namespace n ON p.pronamespace = n.oid
WHERE n.nspname = $1
AND p.proname = $2
AND pg_get_function_identity_arguments(p.oid) ILIKE $3
ORDER BY pg_get_function_identity_arguments(p.oid)
LIMIT 100
`,
[schema, fn, (partial || "") + "%"]
);
return result.rows.map((r) => r.identity_args as string);
} finally {
client.release();
}
},
},
}),
{
title: "Multi-Tenant Function Definition",
description: "Get the comprehensive function definition including parameters, return type, source code, and metadata across different tenant schemas",
},
async (uri, { schemaName, functionName, identityArgs }) => {
const client = await pool.connect();
try {
// Resolve exact function OID from identity args for disambiguation of overloads
const oidQuery = `
SELECT (
'"' || $1 || '".' || quote_ident($2) || $3
)::regprocedure::oid AS fn_oid
`;
// identityArgs arrives decoded, including surrounding parentheses e.g. "(integer, integer)"
const oidResult = await client.query(oidQuery, [schemaName, functionName, identityArgs]);
if (oidResult.rows.length === 0 || !oidResult.rows[0].fn_oid) {
throw new Error(`Function '${schemaName}.${functionName}${identityArgs}' not found`);
}
const fnOid = oidResult.rows[0].fn_oid;
// Get comprehensive function definition information for the exact overload
const functionQuery = `
WITH function_info AS (
SELECT
n.nspname as schema_name,
p.proname as function_name,
p.proargnames as parameter_names,
pg_get_function_arguments(p.oid) as full_signature,
pg_get_function_result(p.oid) as return_type,
pg_get_functiondef(p.oid) as source_code,
l.lanname as language,
CASE p.provolatile
WHEN 'i' THEN 'IMMUTABLE'
WHEN 's' THEN 'STABLE'
WHEN 'v' THEN 'VOLATILE'
END as volatility,
CASE WHEN p.prosecdef THEN 'DEFINER' ELSE 'INVOKER' END as security,
obj_description(p.oid) as description,
p.pronargs as num_args,
p.proretset as returns_set,
t.typname as return_type_name,
t.typtype as return_type_category
FROM pg_proc p
JOIN pg_namespace n ON p.pronamespace = n.oid
JOIN pg_language l ON p.prolang = l.oid
LEFT JOIN pg_type t ON p.prorettype = t.oid
WHERE p.oid = $1
),
parameter_details AS (
SELECT
p.proname,
unnest(p.proargnames) as param_name,
unnest(p.proargtypes::oid[]) as param_type_oid,
generate_series(1, p.pronargs) as param_position
FROM pg_proc p
WHERE p.oid = $1
),
param_info AS (
SELECT
pd.proname,
json_agg(
json_build_object(
'name', pd.param_name,
'type', pt.typname,
'position', pd.param_position
) ORDER BY pd.param_position
) as parameters
FROM parameter_details pd
LEFT JOIN pg_type pt ON pd.param_type_oid = pt.oid
GROUP BY pd.proname
)
SELECT
json_build_object(
'schema_name', fi.schema_name,
'function_name', fi.function_name,
'parameters', COALESCE(pi.parameters, '[]'::json),
'full_signature', fi.full_signature,
'return_type', fi.return_type,
'is_table_function', fi.returns_set,
'language', fi.language,
'volatility', fi.volatility,
'security', fi.security,
'description', fi.description,
'num_parameters', fi.num_args,
'source_code', fi.source_code,
'ddl', json_build_object(
'create_function', fi.source_code
)
) as function_definition
FROM function_info fi
LEFT JOIN param_info pi ON fi.function_name = pi.proname;
`;
const result = await client.query(functionQuery, [fnOid]);
if (result.rows.length === 0) {
throw new Error(`Function '${schemaName}.${functionName}${identityArgs}' not found`);
}
const functionDefinition = result.rows[0].function_definition;
// If it's a table function, get the table column definitions
if (functionDefinition.is_table_function) {
const tableColumnsQuery = `
SELECT
json_agg(
json_build_object(
'name', column_name,
'type', data_type,
'position', ordinal_position
) ORDER BY ordinal_position
) as table_columns
FROM information_schema.routine_columns
WHERE routine_schema = $1 AND routine_name = $2
`;
const columnsResult = await client.query(tableColumnsQuery, [schemaName, functionName]);
if (columnsResult.rows.length > 0 && columnsResult.rows[0].table_columns) {
functionDefinition.table_columns = columnsResult.rows[0].table_columns;
}
}
return {
contents: [
{
uri: uri.href,
mimeType: "application/json",
text: JSON.stringify(functionDefinition, null, 2),
},
],
};
} finally {
client.release();
}
}
);
server.registerResource(
"schema-overview",
new ResourceTemplate("pg-schema://{schemaName}/overview", {
list: async () => {
const client = await pool.connect();
try {
const result = await client.query(
`
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name NOT IN ('information_schema','pg_catalog','pg_toast')
ORDER BY schema_name
`
);
const resources = result.rows.map((row) => {
const schemaEnc = encodeURIComponent(row.schema_name);
const uri = `pg-schema://${schemaEnc}/overview`;
return {
name: row.schema_name,
uri,
title: `Schema Overview: ${row.schema_name}`,
description: `High-level overview and quick links for schema '${row.schema_name}'`,
mimeType: "application/json",
};
});
return { resources };
} finally {
client.release();
}
},
complete: {
schemaName: async (value?: string) => {
const client = await pool.connect();
try {
const result = await client.query(
`
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name NOT IN ('information_schema','pg_catalog','pg_toast')
AND schema_name ILIKE $1
ORDER BY schema_name
LIMIT 50
`,
[((value ?? "").trim() || "") + "%"]
);
return result.rows.map((r) => r.schema_name as string);
} finally {
client.release();
}
},
},
}),
{
title: "Schema Overview",
description: "High-level overview for a specific schema including counts and quick links to tables and functions",
},
async (uri, { schemaName }) => {
const client = await pool.connect();
try {
// Basic counts
const countsQuery = `
WITH
t AS (
SELECT COUNT(*)::int AS c
FROM information_schema.tables
WHERE table_schema = $1 AND table_type = 'BASE TABLE'
),
v AS (
SELECT COUNT(*)::int AS c
FROM information_schema.views
WHERE table_schema = $1
),
r AS (
SELECT COUNT(*)::int AS c
FROM information_schema.routines
WHERE routine_schema = $1
)
SELECT
(SELECT c FROM t) AS table_count,
(SELECT c FROM v) AS view_count,
(SELECT c FROM r) AS function_count
`;
const counts = await client.query(countsQuery, [schemaName]);
// Sample tables with resource URIs
const tablesRes = await client.query(
`
SELECT table_name
FROM information_schema.tables
WHERE table_schema = $1 AND table_type = 'BASE TABLE'
ORDER BY table_name
LIMIT 100
`,
[schemaName]
);
const tableItems = tablesRes.rows.map((r) => {
const schemaEnc = encodeURIComponent(String(schemaName));
const tableEnc = encodeURIComponent(r.table_name);
return {
name: r.table_name,
uri: `pg-table://${schemaEnc}/${tableEnc}/structure`,
};
});
// Sample function overloads with resource URIs
const funcsRes = await client.query(
`
SELECT p.proname AS function_name,
'(' || pg_get_function_identity_arguments(p.oid) || ')' AS identity_args
FROM pg_proc p
JOIN pg_namespace n ON p.pronamespace = n.oid
WHERE n.nspname = $1
ORDER BY p.proname, pg_get_function_identity_arguments(p.oid)
LIMIT 100
`,
[schemaName]
);
const funcItems = funcsRes.rows.map((r) => {
const schemaEnc = encodeURIComponent(String(schemaName));
const funcEnc = encodeURIComponent(r.function_name);
const argsEnc = encodeURIComponent(r.identity_args);
return {
name: `${r.function_name}${r.identity_args}`,
uri: `pg-func://${schemaEnc}/${funcEnc}/${argsEnc}/definition`,
};
});
const overview = {
schema_name: schemaName,
uri: uri.href,
statistics: {
tables: counts.rows[0]?.table_count ?? 0,
views: counts.rows[0]?.view_count ?? 0,
functions: counts.rows[0]?.function_count ?? 0,
},
tables: {
first_100: tableItems,
link_hint: "Use pg-table://{schemaName}/{tableName}/structure for full details",
},
functions: {
first_100: funcItems,
link_hint: "Use pg-func://{schemaName}/{functionName}/{identityArgs}/definition for full details",
},
};
return {
contents: [
{
uri: uri.href,
mimeType: "application/json",
text: JSON.stringify(overview, null, 2),
},
],
};
} finally {
client.release();
}
}
);
// Register a tool for running read-only SQL queries with multi-tenant support
server.registerTool(
"query",
{
title: "Multi-Tenant Database Query Tool",
description: "Execute read-only SQL queries to retrieve, analyze, and explore PostgreSQL data across multiple tenant schemas. Supports complex SQL including JOINs, CTEs, window functions, and aggregations. All queries run in read-only transactions for safety. Use 'schema' parameter to set tenant context, or 'explain' to analyze query performance. Ideal for data analysis, reporting, debugging, and cross-tenant comparisons.",
inputSchema: {
sql: z.string().describe("The SQL query to execute. Can be any valid PostgreSQL SELECT statement including complex queries with JOINs, CTEs, window functions, aggregations, etc. Will be executed in a read-only transaction for safety."),
schema: z.string().optional().describe("Optional schema name (tenant) to set as search_path before executing the query. When specified, unqualified table names will resolve to tables in this schema. Use this for tenant-specific queries."),
explain: z.boolean().optional().default(false).describe("Set to true to return the query execution plan instead of query results. Useful for performance analysis and optimization. Returns PostgreSQL EXPLAIN output in JSON format."),
},
},
async ({ sql, schema, explain }) => {
const client = await pool.connect();
try {
// Start a read-only transaction for safety
await client.query("BEGIN TRANSACTION READ ONLY");
// Set search_path if schema is specified for multi-tenant support
if (schema) {
await client.query(`SET search_path TO ${client.escapeIdentifier(schema)}, public`);
}
// Prepare the query - add EXPLAIN if requested
const finalQuery = explain ? `EXPLAIN (FORMAT JSON, ANALYZE false) ${sql}` : sql;
const result = await client.query(finalQuery);
// Format the response based on whether it's an explain or regular query
const responseContent = explain && result.rows[0] && result.rows[0]['QUERY PLAN']
? {
query_plan: result.rows[0]['QUERY PLAN'],
original_query: sql,
schema_context: schema || 'default'
}
: {
data: result.rows,
row_count: result.rowCount,
schema_context: schema || 'default'
};
return {
content: [
{
type: "text",
text: JSON.stringify(responseContent, null, 2),
},
],
};
} catch (error: any) {
// Ensure errors are properly returned as MCP tool call errors
return {
isError: true,
content: [
{
type: "text",
text: `Error in ${schema ? `schema '${schema}'` : 'default context'}: ${error.message}`,
},
],
};
} finally {
// Always end the transaction and release the client
await client.query("ROLLBACK");
client.release();
}
}
);
// Register a tool for listing available schemas in the database
server.registerTool(
"list-schemas",
{
title: "List Database Schemas",
description: "Discover all available database schemas (tenants) with statistics including table and function counts. Use this to explore the multi-tenant structure, identify available tenants, or get an overview of database organization. Optionally include system schemas for administrative purposes.",
inputSchema: {
include_system: z.boolean().optional().default(false).describe("Set to true to include PostgreSQL system schemas (information_schema, pg_catalog, pg_toast) in the results. Default false shows only user/tenant schemas. Use true for administrative or debugging purposes."),
},
},
async ({ include_system }) => {
const client = await pool.connect();
try {
const systemSchemaFilter = include_system
? ""
: "WHERE schema_name NOT IN ('information_schema', 'pg_catalog', 'pg_toast')";
const query = `
SELECT
schema_name,
schema_owner,
(SELECT COUNT(*) FROM information_schema.tables t WHERE t.table_schema = s.schema_name) as table_count,
(SELECT COUNT(*) FROM information_schema.routines r WHERE r.routine_schema = s.schema_name) as function_count
FROM information_schema.schemata s
${systemSchemaFilter}
ORDER BY schema_name
`;
const result = await client.query(query);
return {
content: [
{
type: "text",
text: JSON.stringify({
schemas: result.rows,
total_count: result.rowCount
}, null, 2),
},
],
};
} catch (error: any) {
return {
isError: true,
content: [
{
type: "text",
text: error.message,
},
],
};
} finally {
client.release();
}
}
);
// Register a tool for getting detailed schema information
server.registerTool(
"describe-schema",
{
title: "Describe Schema",
description: "Get comprehensive information about a specific database schema (tenant) including detailed statistics, all tables, views, functions, and custom types. Use this to understand a tenant's database structure, analyze schema composition, or prepare for schema-specific operations. Returns organized metadata perfect for schema analysis and documentation.",
inputSchema: {
schema_name: z.string().describe("Name of the database schema (tenant) to analyze. Must be an exact schema name from the database. Use list-schemas tool first to discover available schema names."),
},
},
async ({ schema_name }) => {
const client = await pool.connect();
try {
const query = `
WITH schema_stats AS (
SELECT
$1::text as schema_name,
(SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = $1::text) as table_count,
(SELECT COUNT(*) FROM information_schema.views WHERE table_schema = $1::text) as view_count,
(SELECT COUNT(*) FROM information_schema.routines WHERE routine_schema = $1::text) as function_count,
(SELECT COUNT(*) FROM information_schema.user_defined_types WHERE user_defined_type_schema = $1::text) as type_count
),
tables_info AS (
SELECT
table_name,
table_type,
(SELECT COUNT(*) FROM information_schema.columns c WHERE c.table_schema = $1::text AND c.table_name = t.table_name) as column_count
FROM information_schema.tables t
WHERE table_schema = $1::text
ORDER BY table_name
),
functions_info AS (
SELECT
routine_name,
routine_type,
data_type as return_type
FROM information_schema.routines
WHERE routine_schema = $1::text
ORDER BY routine_name
)
SELECT
json_build_object(
'schema_name', schema_name,
'statistics', json_build_object(
'tables', table_count,
'views', view_count,
'functions', function_count,
'types', type_count
),
'tables', (SELECT json_agg(tables_info) FROM tables_info),
'functions', (SELECT json_agg(functions_info) FROM functions_info)
) as schema_info
FROM schema_stats;
`;
const result = await client.query(query, [schema_name]);
if (result.rows.length === 0) {
throw new Error(`Schema '${schema_name}' not found`);
}
return {
content: [
{
type: "text",
text: JSON.stringify(result.rows[0].schema_info, null, 2),
},
],
};
} catch (error: any) {
return {
isError: true,
content: [
{
type: "text",
text: error.message,
},
],
};
} finally {
client.release();
}
}
);
async function runServer() {
const transport = new StdioServerTransport();
await server.connect(transport);
}
runServer().catch(console.error);