import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
CallToolRequestSchema,
ListToolsRequestSchema,
ListResourcesRequestSchema,
ReadResourceRequestSchema,
Tool,
Resource
} from '@modelcontextprotocol/sdk/types.js';
import { z } from 'zod';
import { DatabaseManager } from './database.js';
import { logger } from './logger.js';
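// MCP server that exposes a PostgreSQL database over stdio: read-only inspection and
// query tools are always registered, write/DDL tools only when ALLOW_WRITE_OPERATIONS=true,
// plus resources for browsing schema metadata.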
export class PostgreSQLMCPServer {
private server: Server;
private db: DatabaseManager;
constructor(db: DatabaseManager) {
this.db = db;
this.server = new Server(
{
name: 'mav-postgresql-mcp-server',
version: '1.0.0'
},
{
capabilities: {
tools: {},
resources: {}
}
}
);
this.setupHandlers();
}
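// Wire up the MCP request handlers: tool listing/invocation and resource listing/reads.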
private setupHandlers(): void {
// Tool handlers
this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: this.getTools()
}));
this.server.setRequestHandler(CallToolRequestSchema, async (request) =>
this.handleToolCall(request.params.name, request.params.arguments || {})
);
// Resource handlers
this.server.setRequestHandler(ListResourcesRequestSchema, async () => ({
resources: await this.getResources()
}));
this.server.setRequestHandler(ReadResourceRequestSchema, async (request) =>
this.handleResourceRead(request.params.uri)
);
}
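// Build the tool catalogue. Read-only tools are always present; write/DDL tools are
// appended only when ALLOW_WRITE_OPERATIONS=true (and re-checked again at call time).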
private getTools(): Tool[] {
const tools: Tool[] = [
// ==================== CORE TOOLS (Always Available) ====================
{
name: 'query',
description: 'Execute a read-only SQL query on the PostgreSQL database',
inputSchema: {
type: 'object',
properties: {
sql: {
type: 'string',
description: 'The SQL query to execute (use $1, $2, etc. for parameters)'
},
params: {
type: 'array',
description: 'Optional query parameters for prepared statements',
items: {
type: ['string', 'number', 'boolean', 'null']
}
}
},
required: ['sql']
}
},
{
name: 'list_tables',
description: 'List all tables in a schema',
inputSchema: {
type: 'object',
properties: {
schema: {
type: 'string',
description: 'Schema name (defaults to current schema)'
}
}
}
},
{
name: 'describe_table',
description: 'Get the schema/structure of a specific table',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'The name of the table to describe'
},
schema: {
type: 'string',
description: 'Schema name (defaults to current schema)'
}
},
required: ['table']
}
},
{
name: 'database_info',
description: 'Get general information about the database',
inputSchema: {
type: 'object',
properties: {}
}
},
{
name: 'show_indexes',
description: 'Show all indexes for a table',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'The name of the table to show indexes for'
},
schema: {
type: 'string',
description: 'Schema name (defaults to current schema)'
}
},
required: ['table']
}
},
{
name: 'explain_query',
description: 'Get the execution plan for a query',
inputSchema: {
type: 'object',
properties: {
sql: {
type: 'string',
description: 'The SQL query to explain'
},
params: {
type: 'array',
description: 'Optional query parameters',
items: {
type: ['string', 'number', 'boolean', 'null']
}
},
analyze: {
type: 'boolean',
description: 'Execute the query and show actual times (default: false)',
default: false
},
format: {
type: 'string',
enum: ['text', 'json', 'xml', 'yaml'],
description: 'Output format',
default: 'json'
}
},
required: ['sql']
}
},
{
name: 'show_constraints',
description: 'Show all constraints (foreign keys, unique, check) for a table',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'The name of the table to show constraints for'
},
schema: {
type: 'string',
description: 'Schema name (defaults to current schema)'
}
},
required: ['table']
}
},
// ==================== POSTGRESQL-SPECIFIC READ-ONLY TOOLS ====================
{
name: 'list_schemas',
description: 'List all schemas in the database',
inputSchema: {
type: 'object',
properties: {
include_system: {
type: 'boolean',
description: 'Include system schemas (pg_catalog, information_schema)',
default: false
}
}
}
},
{
name: 'get_current_schema',
description: 'Get the current schema and search path',
inputSchema: {
type: 'object',
properties: {}
}
},
{
name: 'list_extensions',
description: 'List installed PostgreSQL extensions',
inputSchema: {
type: 'object',
properties: {
include_available: {
type: 'boolean',
description: 'Also show available but not installed extensions',
default: false
}
}
}
},
{
name: 'extension_info',
description: 'Get detailed information about a specific extension',
inputSchema: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'Extension name'
}
},
required: ['name']
}
},
{
name: 'list_functions',
description: 'List all user-defined functions and procedures',
inputSchema: {
type: 'object',
properties: {
schema: {
type: 'string',
description: 'Filter by schema'
},
type: {
type: 'string',
enum: ['function', 'procedure', 'aggregate', 'window', 'all'],
description: 'Type of routine to list',
default: 'all'
}
}
}
},
{
name: 'list_triggers',
description: 'List all triggers in the database',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'Filter by table name'
},
schema: {
type: 'string',
description: 'Filter by schema name'
}
}
}
},
{
name: 'list_views',
description: 'List all views (regular and materialized)',
inputSchema: {
type: 'object',
properties: {
schema: {
type: 'string',
description: 'Filter by schema'
},
type: {
type: 'string',
enum: ['regular', 'materialized', 'all'],
description: 'Type of views to list',
default: 'all'
},
include_definition: {
type: 'boolean',
description: 'Include view definition SQL',
default: false
}
}
}
},
{
name: 'list_sequences',
description: 'List all sequences in the database',
inputSchema: {
type: 'object',
properties: {
schema: {
type: 'string',
description: 'Filter by schema'
}
}
}
},
{
name: 'table_stats',
description: 'Get statistics about table usage and performance',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'Table name (optional - shows all tables if not specified)'
},
schema: {
type: 'string',
description: 'Schema name'
}
}
}
},
{
name: 'connection_info',
description: 'Get information about the current database connection',
inputSchema: {
type: 'object',
properties: {}
}
},
{
name: 'database_size',
description: 'Get detailed size information for the database',
inputSchema: {
type: 'object',
properties: {
detail_level: {
type: 'string',
enum: ['summary', 'tables', 'full'],
description: 'Level of detail to return',
default: 'tables'
},
schema: {
type: 'string',
description: 'Filter by schema'
},
top_n: {
type: 'number',
description: 'Return only top N largest objects',
default: 20
}
}
}
},
{
name: 'jsonb_query',
description: 'Query JSONB fields using PostgreSQL JSONB operators',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'Table name containing JSONB column'
},
schema: {
type: 'string',
description: 'Schema name'
},
column: {
type: 'string',
description: 'JSONB column name'
},
operator: {
type: 'string',
enum: ['@>', '<@', '?', '?&', '?|'],
description: 'JSONB operator (@>: contains, <@: contained by, ?: key exists, ?&: all keys exist, ?|: any key exists)'
},
value: {
description: 'Value for the operator (JSON object for containment, string for key existence, array for multiple keys)'
},
select_columns: {
type: 'array',
items: { type: 'string' },
description: 'Columns to return (default: all)'
},
limit: {
type: 'number',
description: 'Maximum rows to return',
default: 100
}
},
required: ['table', 'column', 'operator', 'value']
}
},
{
name: 'jsonb_path_query',
description: 'Query JSONB data using SQL/JSON path expressions',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'Table name'
},
schema: {
type: 'string',
description: 'Schema name'
},
column: {
type: 'string',
description: 'JSONB column name'
},
path_expression: {
type: 'string',
description: 'JSON path expression (e.g., "$.store.book[*].author")'
},
filter_exists: {
type: 'boolean',
description: 'Only return rows where path matches (use @? operator)',
default: false
},
limit: {
type: 'number',
description: 'Maximum rows to return',
default: 100
}
},
required: ['table', 'column', 'path_expression']
}
}
];
// Add write operation tools if enabled
if (process.env.ALLOW_WRITE_OPERATIONS === 'true') {
tools.push(
// ==================== WRITE TOOLS ====================
{
name: 'insert',
description: 'Insert data into a table',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'The name of the table to insert into'
},
schema: {
type: 'string',
description: 'Schema name'
},
data: {
type: 'object',
description: 'Key-value pairs of column names and values to insert',
additionalProperties: true
},
returning: {
type: 'array',
items: { type: 'string' },
description: 'Columns to return (default: all)'
}
},
required: ['table', 'data']
}
},
{
name: 'update',
description: 'Update data in a table',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'The name of the table to update'
},
schema: {
type: 'string',
description: 'Schema name'
},
data: {
type: 'object',
description: 'Key-value pairs of column names and values to update',
additionalProperties: true
},
where: {
type: 'object',
description: 'Key-value pairs for WHERE clause conditions',
additionalProperties: true
},
returning: {
type: 'array',
items: { type: 'string' },
description: 'Columns to return'
}
},
required: ['table', 'data', 'where']
}
},
{
name: 'delete',
description: 'Delete data from a table',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'The name of the table to delete from'
},
schema: {
type: 'string',
description: 'Schema name'
},
where: {
type: 'object',
description: 'Key-value pairs for WHERE clause conditions',
additionalProperties: true
},
returning: {
type: 'array',
items: { type: 'string' },
description: 'Columns to return'
}
},
required: ['table', 'where']
}
},
{
name: 'create_table',
description: 'Create a new table',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'The name of the table to create'
},
schema: {
type: 'string',
description: 'Schema name'
},
columns: {
type: 'array',
description: 'Array of column definitions',
items: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'Column name'
},
type: {
type: 'string',
description: 'Column data type (e.g., VARCHAR(255), INTEGER, TEXT, SERIAL)'
},
nullable: {
type: 'boolean',
description: 'Whether the column can be NULL',
default: true
},
primary: {
type: 'boolean',
description: 'Whether this column is a primary key',
default: false
},
default: {
type: ['string', 'number', 'boolean', 'null'],
description: 'Default value for the column'
}
},
required: ['name', 'type']
}
},
if_not_exists: {
type: 'boolean',
description: 'Do not error if table exists',
default: false
}
},
required: ['table', 'columns']
}
},
{
name: 'alter_table',
description: 'Alter an existing table structure',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'The name of the table to alter'
},
schema: {
type: 'string',
description: 'Schema name'
},
operation: {
type: 'string',
enum: ['add_column', 'drop_column', 'alter_column_type', 'set_not_null', 'drop_not_null', 'set_default', 'drop_default', 'rename_column'],
description: 'The type of alteration to perform'
},
column: {
type: 'object',
description: 'Column definition for operations',
properties: {
name: {
type: 'string',
description: 'Column name'
},
type: {
type: 'string',
description: 'Column data type'
},
nullable: {
type: 'boolean',
description: 'Whether the column can be NULL'
},
default: {
type: ['string', 'number', 'boolean', 'null'],
description: 'Default value for the column'
},
newName: {
type: 'string',
description: 'New name for rename operations'
}
}
}
},
required: ['table', 'operation']
}
},
{
name: 'drop_table',
description: 'Drop a table from the database (requires confirmation)',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'The name of the table to drop'
},
schema: {
type: 'string',
description: 'Schema name'
},
cascade: {
type: 'boolean',
description: 'Drop dependent objects (CASCADE)',
default: false
},
confirm: {
type: 'boolean',
description: 'Must be true to confirm the drop operation'
}
},
required: ['table', 'confirm']
}
},
{
name: 'bulk_insert',
description: 'Insert multiple rows into a table efficiently',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'The name of the table to insert into'
},
schema: {
type: 'string',
description: 'Schema name'
},
columns: {
type: 'array',
description: 'Column names for the insert',
items: {
type: 'string'
}
},
rows: {
type: 'array',
description: 'Array of value arrays to insert',
items: {
type: 'array',
items: {
type: ['string', 'number', 'boolean', 'null']
}
}
}
},
required: ['table', 'columns', 'rows']
}
},
{
name: 'execute_procedure',
description: 'Execute a stored procedure or function',
inputSchema: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'The name of the procedure/function to execute'
},
schema: {
type: 'string',
description: 'Schema name'
},
params: {
type: 'array',
description: 'Parameters to pass',
items: {
type: ['string', 'number', 'boolean', 'null']
}
},
is_procedure: {
type: 'boolean',
description: 'True for CALL (procedures), false for SELECT (functions)',
default: false
}
},
required: ['name']
}
},
{
name: 'add_index',
description: 'Add an index to a table',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'The name of the table'
},
schema: {
type: 'string',
description: 'Schema name'
},
name: {
type: 'string',
description: 'The name of the index'
},
columns: {
type: 'array',
description: 'Column names to include in the index',
items: {
type: 'string'
}
},
unique: {
type: 'boolean',
description: 'Whether this should be a unique index',
default: false
},
method: {
type: 'string',
enum: ['btree', 'hash', 'gist', 'gin', 'brin'],
description: 'Index method',
default: 'btree'
},
concurrent: {
type: 'boolean',
description: 'Create index without locking writes',
default: false
}
},
required: ['table', 'name', 'columns']
}
},
{
name: 'drop_index',
description: 'Drop an index from the database',
inputSchema: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'The name of the index to drop'
},
schema: {
type: 'string',
description: 'Schema name'
},
concurrent: {
type: 'boolean',
description: 'Drop index without locking',
default: false
},
if_exists: {
type: 'boolean',
description: 'Do not error if index does not exist',
default: false
}
},
required: ['name']
}
},
{
name: 'rename_table',
description: 'Rename a table',
inputSchema: {
type: 'object',
properties: {
oldName: {
type: 'string',
description: 'Current name of the table'
},
newName: {
type: 'string',
description: 'New name for the table'
},
schema: {
type: 'string',
description: 'Schema name'
}
},
required: ['oldName', 'newName']
}
},
{
name: 'set_search_path',
description: 'Set the schema search path for the session',
inputSchema: {
type: 'object',
properties: {
schemas: {
type: 'array',
description: 'Array of schema names in order of priority',
items: { type: 'string' },
minItems: 1
}
},
required: ['schemas']
}
},
{
name: 'create_schema',
description: 'Create a new schema in the database',
inputSchema: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'Name of the schema to create'
},
if_not_exists: {
type: 'boolean',
description: 'Do not error if schema exists',
default: false
}
},
required: ['name']
}
},
{
name: 'drop_schema',
description: 'Drop a schema from the database',
inputSchema: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'Name of the schema to drop'
},
cascade: {
type: 'boolean',
description: 'Drop all objects in the schema (CASCADE)',
default: false
},
confirm: {
type: 'boolean',
description: 'Must be true to confirm the drop operation'
}
},
required: ['name', 'confirm']
}
},
{
name: 'jsonb_update',
description: 'Update JSONB fields using jsonb_set or other modification functions',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'Table name'
},
schema: {
type: 'string',
description: 'Schema name'
},
column: {
type: 'string',
description: 'JSONB column name'
},
operation: {
type: 'string',
enum: ['set', 'delete_key', 'concat', 'delete_path'],
description: 'Type of JSONB modification'
},
path: {
type: 'array',
items: { type: 'string' },
description: 'JSON path as array (e.g., ["address", "city"])'
},
value: {
description: 'New value to set (for set/concat operations)'
},
where: {
type: 'object',
description: 'WHERE conditions as key-value pairs'
}
},
required: ['table', 'column', 'operation', 'where']
}
},
{
name: 'vacuum_analyze',
description: 'Run VACUUM and/or ANALYZE on tables for maintenance',
inputSchema: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'Table name (optional - runs on database if not specified)'
},
schema: {
type: 'string',
description: 'Schema name'
},
operation: {
type: 'string',
enum: ['vacuum', 'analyze', 'vacuum_analyze'],
description: 'Maintenance operation to perform',
default: 'vacuum_analyze'
},
verbose: {
type: 'boolean',
description: 'Show detailed progress',
default: false
}
}
}
}
);
}
return tools;
}
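// List the resources exposed by the server: database-wide schema/info documents plus
// one per-table resource in the current schema.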
private async getResources(): Promise<Resource[]> {
try {
const tables = await this.db.getTableList();
const schema = this.db.getSchema();
const resources: Resource[] = [
{
uri: 'pg://database/schema',
name: 'Database Schema',
description: 'Complete database schema information',
mimeType: 'application/json'
},
{
uri: 'pg://database/info',
name: 'Database Info',
description: 'Database metadata and statistics',
mimeType: 'application/json'
}
];
// Add resource for each table
for (const table of tables) {
resources.push({
uri: `pg://table/${schema}.${table}`,
name: `Table: ${schema}.${table}`,
description: `Schema information for table ${table}`,
mimeType: 'application/json'
});
}
return resources;
} catch (error) {
logger.error('Failed to get resources', error);
return [];
}
}
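// Dispatch a tool call by name. Errors are caught and returned as isError content
// rather than thrown, so the client always receives a well-formed response.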
private async handleToolCall(name: string, args: any): Promise<any> {
try {
logger.info('Tool called', { tool: name, args });
switch (name) {
// ==================== CORE TOOLS ====================
case 'query': {
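// Argument shape is validated here with zod; read-only enforcement itself is assumed
// to live in DatabaseManager.executeQuery (e.g. a read-only transaction or SQL check).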
const querySchema = z.object({
sql: z.string(),
params: z.array(z.union([z.string(), z.number(), z.boolean(), z.null()])).optional()
});
const { sql, params } = querySchema.parse(args);
const result = await this.db.executeQuery(sql, params);
return {
content: [{
type: 'text',
text: JSON.stringify({
rows: result.rows,
rowCount: result.rowCount,
fields: result.fields ? result.fields.map(f => ({ name: f.name, dataTypeID: f.dataTypeID })) : [],
command: result.command
}, null, 2)
}]
};
}
case 'list_tables': {
const schema = args.schema || this.db.getSchema();
const tables = await this.db.getTableList(schema);
return {
content: [{
type: 'text',
text: `Found ${tables.length} tables in schema "${schema}":\n${tables.join('\n')}`
}]
};
}
case 'describe_table': {
const tableSchema = z.object({
table: z.string(),
schema: z.string().optional()
});
const { table, schema } = tableSchema.parse(args);
const tableInfo = await this.db.getTableSchema(table, schema);
return {
content: [{
type: 'text',
text: `Schema for table ${schema || this.db.getSchema()}.${table}:\n${JSON.stringify(tableInfo, null, 2)}`
}]
};
}
case 'database_info': {
const info = await this.db.getDatabaseInfo();
return {
content: [{
type: 'text',
text: JSON.stringify(info, null, 2)
}]
};
}
case 'show_indexes': {
const { table, schema } = args;
const targetSchema = schema || this.db.getSchema();
this.db.validateIdentifier(table, 'table');
const result = await this.db.executeQuery(`
SELECT
i.relname AS index_name,
am.amname AS index_type,
ix.indisunique AS is_unique,
ix.indisprimary AS is_primary,
pg_get_indexdef(ix.indexrelid) AS index_definition,
array_agg(a.attname ORDER BY array_position(ix.indkey::int2[], a.attnum)) AS columns
FROM pg_catalog.pg_index ix
JOIN pg_catalog.pg_class t ON t.oid = ix.indrelid
JOIN pg_catalog.pg_class i ON i.oid = ix.indexrelid
JOIN pg_catalog.pg_am am ON am.oid = i.relam
JOIN pg_catalog.pg_namespace n ON n.oid = t.relnamespace
JOIN pg_catalog.pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
WHERE t.relname = $1
AND n.nspname = $2
GROUP BY i.relname, am.amname, ix.indisunique, ix.indisprimary, ix.indexrelid
ORDER BY i.relname
`, [table, targetSchema]);
return {
content: [{
type: 'text',
text: `Indexes for table ${targetSchema}.${table}:\n${JSON.stringify(result.rows, null, 2)}`
}]
};
}
case 'explain_query': {
const { sql, params, analyze, format } = args;
const fmt = (format || 'json').toLowerCase();
if (!['text', 'json', 'xml', 'yaml'].includes(fmt)) {
throw new Error(`Invalid EXPLAIN format: ${format}`);
}
const analyzeOpt = analyze ? 'ANALYZE,' : '';
const explainSql = `EXPLAIN (${analyzeOpt} FORMAT ${fmt.toUpperCase()}) ${sql}`;
const result = await this.db.executeQuery(explainSql, params);
return {
content: [{
type: 'text',
text: `Query execution plan:\n${JSON.stringify(result.rows, null, 2)}`
}]
};
}
case 'show_constraints': {
const { table, schema } = args;
const targetSchema = schema || this.db.getSchema();
this.db.validateIdentifier(table, 'table');
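// Note: the join to pg_constraint is on constraint name only, so identically named
// constraints in other schemas may produce duplicate or mismatched rows.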
const result = await this.db.executeQuery(`
SELECT
tc.constraint_name,
tc.constraint_type,
kcu.column_name,
ccu.table_name AS referenced_table,
ccu.column_name AS referenced_column,
pg_get_constraintdef(pgc.oid) AS constraint_definition
FROM information_schema.table_constraints tc
JOIN pg_constraint pgc ON tc.constraint_name = pgc.conname
LEFT JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
LEFT JOIN information_schema.constraint_column_usage ccu
ON tc.constraint_name = ccu.constraint_name
AND tc.table_schema = ccu.table_schema
WHERE tc.table_schema = $1
AND tc.table_name = $2
ORDER BY tc.constraint_type, tc.constraint_name
`, [targetSchema, table]);
return {
content: [{
type: 'text',
text: JSON.stringify({
table: `${targetSchema}.${table}`,
constraints: result.rows
}, null, 2)
}]
};
}
// ==================== POSTGRESQL-SPECIFIC READ TOOLS ====================
case 'list_schemas': {
const includeSystem = args.include_system || false;
const result = await this.db.executeQuery(`
SELECT
n.nspname AS schema_name,
r.rolname AS owner,
pg_catalog.obj_description(n.oid, 'pg_namespace') AS description
FROM pg_catalog.pg_namespace n
LEFT JOIN pg_catalog.pg_roles r ON n.nspowner = r.oid
WHERE (n.nspname !~ '^pg_' AND n.nspname <> 'information_schema')
OR $1 = true
ORDER BY schema_name
`, [includeSystem]);
return {
content: [{
type: 'text',
text: JSON.stringify({
schemas: result.rows,
count: result.rows.length
}, null, 2)
}]
};
}
case 'get_current_schema': {
const result = await this.db.executeQuery(`
SELECT
current_schema() AS current_schema,
current_schemas(false) AS search_path_schemas,
current_setting('search_path') AS search_path
`);
return {
content: [{
type: 'text',
text: JSON.stringify(result.rows[0], null, 2)
}]
};
}
case 'list_extensions': {
const includeAvailable = args.include_available || false;
let sql = `
SELECT
e.extname AS name,
e.extversion AS version,
n.nspname AS schema,
c.description
FROM pg_extension e
JOIN pg_namespace n ON e.extnamespace = n.oid
LEFT JOIN pg_description c ON c.objoid = e.oid
ORDER BY e.extname
`;
if (includeAvailable) {
sql = `
SELECT
name,
default_version,
installed_version,
comment AS description
FROM pg_available_extensions
ORDER BY name
`;
}
const result = await this.db.executeQuery(sql);
return {
content: [{
type: 'text',
text: JSON.stringify({
extensions: result.rows,
count: result.rows.length
}, null, 2)
}]
};
}
case 'extension_info': {
const { name } = args;
const result = await this.db.executeQuery(`
SELECT
e.extname AS name,
e.extversion AS version,
n.nspname AS schema,
e.extrelocatable AS relocatable,
pg_catalog.obj_description(e.oid, 'pg_extension') AS description
FROM pg_extension e
JOIN pg_namespace n ON e.extnamespace = n.oid
WHERE e.extname = $1
`, [name]);
if (result.rows.length === 0) {
return {
content: [{
type: 'text',
text: `Extension "${name}" not found`
}],
isError: true
};
}
return {
content: [{
type: 'text',
text: JSON.stringify(result.rows[0], null, 2)
}]
};
}
case 'list_functions': {
const { schema, type } = args;
const targetSchema = schema || null;
const funcType = type || 'all';
let typeFilter = '';
if (funcType !== 'all') {
const typeMap: Record<string, string> = {
'function': 'f',
'procedure': 'p',
'aggregate': 'a',
'window': 'w'
};
typeFilter = `AND p.prokind = '${typeMap[funcType] || 'f'}'`;
}
const result = await this.db.executeQuery(`
SELECT
n.nspname AS schema,
p.proname AS name,
pg_catalog.pg_get_function_result(p.oid) AS return_type,
pg_catalog.pg_get_function_arguments(p.oid) AS arguments,
CASE p.prokind
WHEN 'f' THEN 'function'
WHEN 'p' THEN 'procedure'
WHEN 'a' THEN 'aggregate'
WHEN 'w' THEN 'window'
END AS type,
l.lanname AS language
FROM pg_catalog.pg_proc p
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace
LEFT JOIN pg_catalog.pg_language l ON l.oid = p.prolang
WHERE n.nspname NOT IN ('pg_catalog', 'information_schema')
${targetSchema ? 'AND n.nspname = $1' : ''}
${typeFilter}
ORDER BY n.nspname, p.proname
`, targetSchema ? [targetSchema] : []);
return {
content: [{
type: 'text',
text: JSON.stringify({
functions: result.rows,
count: result.rows.length
}, null, 2)
}]
};
}
case 'list_triggers': {
const { table, schema } = args;
let whereClause = `WHERE NOT t.tgisinternal`;
const params: string[] = [];
if (table) {
params.push(table);
whereClause += ` AND c.relname = $${params.length}`;
}
if (schema) {
params.push(schema);
whereClause += ` AND n.nspname = $${params.length}`;
}
const result = await this.db.executeQuery(`
SELECT
n.nspname AS schema,
c.relname AS table_name,
t.tgname AS trigger_name,
CASE
WHEN t.tgtype & 2 = 2 THEN 'BEFORE'
WHEN t.tgtype & 64 = 64 THEN 'INSTEAD OF'
ELSE 'AFTER'
END AS timing,
p.proname AS function_name,
pg_catalog.pg_get_triggerdef(t.oid) AS definition
FROM pg_catalog.pg_trigger t
JOIN pg_catalog.pg_class c ON t.tgrelid = c.oid
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
LEFT JOIN pg_catalog.pg_proc p ON t.tgfoid = p.oid
${whereClause}
ORDER BY n.nspname, c.relname, t.tgname
`, params);
return {
content: [{
type: 'text',
text: JSON.stringify({
triggers: result.rows,
count: result.rows.length
}, null, 2)
}]
};
}
case 'list_views': {
const { schema, type, include_definition } = args;
const viewType = type || 'all';
const includeDef = include_definition || false;
let queries: string[] = [];
if (viewType === 'all' || viewType === 'regular') {
queries.push(`
SELECT
schemaname AS schema,
viewname AS name,
viewowner AS owner,
'regular' AS type
${includeDef ? ', definition' : ''}
FROM pg_views
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
${schema ? `AND schemaname = '${this.db.escapeStringValue(schema)}'` : ''}
`);
}
if (viewType === 'all' || viewType === 'materialized') {
queries.push(`
SELECT
schemaname AS schema,
matviewname AS name,
matviewowner AS owner,
'materialized' AS type
${includeDef ? ', definition' : ''}
FROM pg_matviews
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
${schema ? `AND schemaname = '${this.db.escapeStringValue(schema)}'` : ''}
`);
}
const sql = queries.join(' UNION ALL ') + ' ORDER BY schema, name';
const result = await this.db.executeQuery(sql);
return {
content: [{
type: 'text',
text: JSON.stringify({
views: result.rows,
count: result.rows.length
}, null, 2)
}]
};
}
case 'list_sequences': {
const { schema } = args;
const result = await this.db.executeQuery(`
SELECT
schemaname AS schema,
sequencename AS name,
sequenceowner AS owner,
data_type,
start_value,
min_value,
max_value,
increment_by,
cycle AS is_cyclic,
cache_size,
last_value
FROM pg_sequences
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
${schema ? 'AND schemaname = $1' : ''}
ORDER BY schemaname, sequencename
`, schema ? [schema] : []);
return {
content: [{
type: 'text',
text: JSON.stringify({
sequences: result.rows,
count: result.rows.length
}, null, 2)
}]
};
}
case 'table_stats': {
const { table, schema } = args;
const targetSchema = schema || this.db.getSchema();
let whereClause = `WHERE schemaname = $1`;
const params: any[] = [targetSchema];
if (table) {
this.db.validateIdentifier(table, 'table');
params.push(table);
whereClause += ` AND relname = $${params.length}`;
}
const result = await this.db.executeQuery(`
SELECT
schemaname AS schema,
relname AS table_name,
n_live_tup AS live_rows,
n_dead_tup AS dead_rows,
n_mod_since_analyze AS modifications_since_analyze,
seq_scan AS sequential_scans,
seq_tup_read AS seq_tuples_read,
idx_scan AS index_scans,
idx_tup_fetch AS index_tuples_fetched,
n_tup_ins AS inserts,
n_tup_upd AS updates,
n_tup_del AS deletes,
n_tup_hot_upd AS hot_updates,
last_vacuum,
last_autovacuum,
last_analyze,
last_autoanalyze
FROM pg_stat_user_tables
${whereClause}
ORDER BY schemaname, relname
`, params);
return {
content: [{
type: 'text',
text: JSON.stringify({
statistics: result.rows,
count: result.rows.length
}, null, 2)
}]
};
}
case 'connection_info': {
const result = await this.db.executeQuery(`
SELECT
current_database() AS database,
current_user AS user,
session_user AS session_user,
inet_client_addr() AS client_address,
inet_client_port() AS client_port,
pg_backend_pid() AS backend_pid,
current_setting('server_version') AS server_version,
current_setting('TimeZone') AS timezone,
current_setting('client_encoding') AS client_encoding,
current_setting('search_path') AS search_path,
now() AS current_time
`);
return {
content: [{
type: 'text',
text: JSON.stringify(result.rows[0], null, 2)
}]
};
}
case 'database_size': {
const { detail_level, schema, top_n } = args;
const level = detail_level || 'tables';
const limit = top_n || 20;
if (level === 'summary') {
const result = await this.db.executeQuery(`
SELECT
current_database() AS database,
pg_size_pretty(pg_database_size(current_database())) AS total_size,
pg_database_size(current_database()) AS total_bytes
`);
return {
content: [{
type: 'text',
text: JSON.stringify(result.rows[0], null, 2)
}]
};
}
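// 'tables' and 'full' currently share the same per-table breakdown; only 'summary' differs.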
const result = await this.db.executeQuery(`
SELECT
n.nspname AS schema,
c.relname AS table_name,
pg_size_pretty(pg_total_relation_size(c.oid)) AS total_size,
pg_size_pretty(pg_table_size(c.oid)) AS table_size,
pg_size_pretty(pg_indexes_size(c.oid)) AS indexes_size,
pg_total_relation_size(c.oid) AS total_bytes,
c.reltuples::bigint AS estimated_rows
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
${schema ? `AND n.nspname = '${this.db.escapeStringValue(schema)}'` : ''}
ORDER BY pg_total_relation_size(c.oid) DESC
LIMIT $1
`, [limit]);
return {
content: [{
type: 'text',
text: JSON.stringify({
tables: result.rows,
count: result.rows.length
}, null, 2)
}]
};
}
case 'jsonb_query': {
const { table, schema, column, operator, value, select_columns, limit } = args;
const targetSchema = schema || this.db.getSchema();
const cols = select_columns?.map((c: string) => this.db.safeIdentifier(c, 'column')).join(', ') || '*';
const maxRows = limit || 100;
this.db.validateIdentifier(table, 'table');
this.db.validateIdentifier(column, 'column');
const tableRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(table, 'table')}`;
const colRef = this.db.safeIdentifier(column, 'column');
let sql: string;
let params: any[];
switch (operator) {
case '@>':
case '<@':
sql = `SELECT ${cols} FROM ${tableRef} WHERE ${colRef} ${operator} $1::jsonb LIMIT $2`;
params = [JSON.stringify(value), maxRows];
break;
case '?':
sql = `SELECT ${cols} FROM ${tableRef} WHERE ${colRef} ? $1 LIMIT $2`;
params = [value, maxRows];
break;
case '?&':
case '?|':
sql = `SELECT ${cols} FROM ${tableRef} WHERE ${colRef} ${operator} $1::text[] LIMIT $2`;
params = [value, maxRows];
break;
default:
throw new Error(`Invalid JSONB operator: ${operator}`);
}
const result = await this.db.executeQuery(sql, params);
return {
content: [{
type: 'text',
text: JSON.stringify({
rows: result.rows,
rowCount: result.rowCount
}, null, 2)
}]
};
}
case 'jsonb_path_query': {
const { table, schema, column, path_expression, filter_exists, limit } = args;
const targetSchema = schema || this.db.getSchema();
const maxRows = limit || 100;
this.db.validateIdentifier(table, 'table');
this.db.validateIdentifier(column, 'column');
const tableRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(table, 'table')}`;
const colRef = this.db.safeIdentifier(column, 'column');
let sql: string;
if (filter_exists) {
sql = `SELECT * FROM ${tableRef} WHERE ${colRef} @? $1::jsonpath LIMIT $2`;
} else {
sql = `SELECT *, jsonb_path_query(${colRef}, $1::jsonpath) AS path_result FROM ${tableRef} LIMIT $2`;
}
const result = await this.db.executeQuery(sql, [path_expression, maxRows]);
return {
content: [{
type: 'text',
text: JSON.stringify({
rows: result.rows,
rowCount: result.rowCount
}, null, 2)
}]
};
}
// ==================== WRITE TOOLS ====================
case 'insert': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { table, schema, data, returning } = args;
const targetSchema = schema || this.db.getSchema();
const columns = Object.keys(data);
const values = Object.values(data);
const placeholders = columns.map((_, i) => `$${i + 1}`).join(', ');
const tableRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(table, 'table')}`;
const safeColumns = columns.map(c => this.db.safeIdentifier(c, 'column')).join(', ');
const returningClause = returning ? `RETURNING ${returning.map((c: string) => this.db.safeIdentifier(c, 'column')).join(', ')}` : 'RETURNING *';
const sql = `INSERT INTO ${tableRef} (${safeColumns}) VALUES (${placeholders}) ${returningClause}`;
const result = await this.db.executeQuery(sql, values);
return {
content: [{
type: 'text',
text: `Inserted ${result.rowCount} row(s) into ${targetSchema}.${table}\n${JSON.stringify(result.rows, null, 2)}`
}]
};
}
case 'update': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { table, schema, data, where, returning } = args;
const targetSchema = schema || this.db.getSchema();
const setColumns = Object.keys(data);
const setValues = Object.values(data);
const whereColumns = Object.keys(where);
const whereValues = Object.values(where);
let paramIndex = 1;
const setSql = setColumns.map(c => `${this.db.safeIdentifier(c, 'column')} = $${paramIndex++}`).join(', ');
const whereSql = whereColumns.map(c => `${this.db.safeIdentifier(c, 'column')} = $${paramIndex++}`).join(' AND ');
const tableRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(table, 'table')}`;
const returningClause = returning ? `RETURNING ${returning.map((c: string) => this.db.safeIdentifier(c, 'column')).join(', ')}` : 'RETURNING *';
const sql = `UPDATE ${tableRef} SET ${setSql} WHERE ${whereSql} ${returningClause}`;
const result = await this.db.executeQuery(sql, [...setValues, ...whereValues]);
return {
content: [{
type: 'text',
text: `Updated ${result.rowCount} row(s) in ${targetSchema}.${table}\n${JSON.stringify(result.rows, null, 2)}`
}]
};
}
case 'delete': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { table, schema, where, returning } = args;
const targetSchema = schema || this.db.getSchema();
const whereColumns = Object.keys(where);
const whereValues = Object.values(where);
const whereSql = whereColumns.map((c, i) => `${this.db.safeIdentifier(c, 'column')} = $${i + 1}`).join(' AND ');
const tableRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(table, 'table')}`;
const returningClause = returning ? `RETURNING ${returning.map((c: string) => this.db.safeIdentifier(c, 'column')).join(', ')}` : 'RETURNING *';
const sql = `DELETE FROM ${tableRef} WHERE ${whereSql} ${returningClause}`;
const result = await this.db.executeQuery(sql, whereValues);
return {
content: [{
type: 'text',
text: `Deleted ${result.rowCount} row(s) from ${targetSchema}.${table}\n${JSON.stringify(result.rows, null, 2)}`
}]
};
}
case 'create_table': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { table, schema, columns, if_not_exists } = args;
const targetSchema = schema || this.db.getSchema();
const columnDefs = columns.map((col: any) => {
let def = `${this.db.safeIdentifier(col.name, 'column')} ${col.type}`;
if (col.nullable === false) def += ' NOT NULL';
if (col.default !== undefined) {
if (col.default === null) {
def += ' DEFAULT NULL';
} else if (typeof col.default === 'string') {
def += ` DEFAULT '${this.db.escapeStringValue(col.default)}'`;
} else if (typeof col.default === 'boolean') {
def += ` DEFAULT ${col.default ? 'TRUE' : 'FALSE'}`;
} else {
def += ` DEFAULT ${col.default}`;
}
}
return def;
});
const primaryKeys = columns.filter((col: any) => col.primary).map((col: any) => this.db.safeIdentifier(col.name, 'column'));
if (primaryKeys.length > 0) {
columnDefs.push(`PRIMARY KEY (${primaryKeys.join(', ')})`);
}
const tableRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(table, 'table')}`;
const ifNotExistsClause = if_not_exists ? 'IF NOT EXISTS ' : '';
const sql = `CREATE TABLE ${ifNotExistsClause}${tableRef} (${columnDefs.join(', ')})`;
await this.db.executeQuery(sql);
return {
content: [{
type: 'text',
text: `Created table ${targetSchema}.${table} successfully`
}]
};
}
case 'alter_table': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { table, schema, operation, column } = args;
const targetSchema = schema || this.db.getSchema();
const tableRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(table, 'table')}`;
let sql = '';
switch (operation) {
case 'add_column': {
if (!column || !column.name || !column.type) throw new Error('Column name and type required for add_column');
let colDef = `${this.db.safeIdentifier(column.name, 'column')} ${column.type}`;
if (column.nullable === false) colDef += ' NOT NULL';
if (column.default !== undefined) {
if (column.default === null) {
colDef += ' DEFAULT NULL';
} else if (typeof column.default === 'string') {
colDef += ` DEFAULT '${this.db.escapeStringValue(column.default)}'`;
} else {
colDef += ` DEFAULT ${column.default}`;
}
}
sql = `ALTER TABLE ${tableRef} ADD COLUMN ${colDef}`;
break;
}
case 'drop_column':
if (!column || !column.name) throw new Error('Column name required for drop_column');
sql = `ALTER TABLE ${tableRef} DROP COLUMN ${this.db.safeIdentifier(column.name, 'column')}`;
break;
case 'alter_column_type':
if (!column || !column.name || !column.type) throw new Error('Column name and type required');
sql = `ALTER TABLE ${tableRef} ALTER COLUMN ${this.db.safeIdentifier(column.name, 'column')} TYPE ${column.type}`;
break;
case 'set_not_null':
if (!column || !column.name) throw new Error('Column name required');
sql = `ALTER TABLE ${tableRef} ALTER COLUMN ${this.db.safeIdentifier(column.name, 'column')} SET NOT NULL`;
break;
case 'drop_not_null':
if (!column || !column.name) throw new Error('Column name required');
sql = `ALTER TABLE ${tableRef} ALTER COLUMN ${this.db.safeIdentifier(column.name, 'column')} DROP NOT NULL`;
break;
case 'set_default': {
if (!column || !column.name || column.default === undefined) throw new Error('Column name and default required');
let defaultVal = column.default;
if (typeof defaultVal === 'string') defaultVal = `'${this.db.escapeStringValue(defaultVal)}'`;
else if (defaultVal === null) defaultVal = 'NULL';
sql = `ALTER TABLE ${tableRef} ALTER COLUMN ${this.db.safeIdentifier(column.name, 'column')} SET DEFAULT ${defaultVal}`;
break;
}
case 'drop_default':
if (!column || !column.name) throw new Error('Column name required');
sql = `ALTER TABLE ${tableRef} ALTER COLUMN ${this.db.safeIdentifier(column.name, 'column')} DROP DEFAULT`;
break;
case 'rename_column':
if (!column || !column.name || !column.newName) throw new Error('Column names required for rename_column');
sql = `ALTER TABLE ${tableRef} RENAME COLUMN ${this.db.safeIdentifier(column.name, 'column')} TO ${this.db.safeIdentifier(column.newName, 'column')}`;
break;
default:
throw new Error(`Unknown operation: ${operation}`);
}
await this.db.executeQuery(sql);
return {
content: [{
type: 'text',
text: `Altered table ${targetSchema}.${table}: ${operation} completed successfully`
}]
};
}
case 'drop_table': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { table, schema, cascade, confirm } = args;
if (!confirm) {
throw new Error('Drop table operation requires confirm: true to proceed');
}
const targetSchema = schema || this.db.getSchema();
const tableRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(table, 'table')}`;
const cascadeClause = cascade ? ' CASCADE' : '';
const sql = `DROP TABLE ${tableRef}${cascadeClause}`;
await this.db.executeQuery(sql);
return {
content: [{
type: 'text',
text: `Dropped table ${targetSchema}.${table} successfully`
}]
};
}
case 'bulk_insert': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { table, schema, columns, rows } = args;
const targetSchema = schema || this.db.getSchema();
if (rows.length === 0) {
throw new Error('No rows provided for bulk insert');
}
const tableRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(table, 'table')}`;
const columnList = columns.map((c: string) => this.db.safeIdentifier(c, 'column')).join(', ');
let paramIndex = 1;
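// Every cell becomes a positional parameter, so very large batches can exceed
// PostgreSQL's 65535 bind-parameter limit; split huge inserts into smaller chunks.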
const valuePlaceholders = rows.map((row: any[]) =>
`(${row.map(() => `$${paramIndex++}`).join(', ')})`
).join(', ');
const sql = `INSERT INTO ${tableRef} (${columnList}) VALUES ${valuePlaceholders} RETURNING *`;
const flatValues = rows.flat();
const result = await this.db.executeQuery(sql, flatValues);
return {
content: [{
type: 'text',
text: `Bulk inserted ${result.rowCount} row(s) into ${targetSchema}.${table}`
}]
};
}
case 'execute_procedure': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { name, schema, params, is_procedure } = args;
const targetSchema = schema || this.db.getSchema();
const procRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(name, 'table')}`;
const placeholders = params ? params.map((_: any, i: number) => `$${i + 1}`).join(', ') : '';
let sql: string;
if (is_procedure) {
sql = `CALL ${procRef}(${placeholders})`;
} else {
sql = `SELECT * FROM ${procRef}(${placeholders})`;
}
const result = await this.db.executeQuery(sql, params || []);
return {
content: [{
type: 'text',
text: JSON.stringify({
name,
result: result.rows,
rowCount: result.rowCount
}, null, 2)
}]
};
}
case 'add_index': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { table, schema, name, columns, unique, method, concurrent } = args;
const targetSchema = schema || this.db.getSchema();
const tableRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(table, 'table')}`;
const indexName = this.db.safeIdentifier(name, 'column');
const columnList = columns.map((c: string) => this.db.safeIdentifier(c, 'column')).join(', ');
const uniqueClause = unique ? 'UNIQUE ' : '';
const concurrentClause = concurrent ? 'CONCURRENTLY ' : '';
const usingClause = method && method !== 'btree' ? ` USING ${method.toUpperCase()}` : '';
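// CREATE INDEX CONCURRENTLY cannot run inside a transaction block; this assumes
// executeQuery issues the statement outside an explicit transaction.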
const sql = `CREATE ${uniqueClause}INDEX ${concurrentClause}${indexName} ON ${tableRef}${usingClause} (${columnList})`;
await this.db.executeQuery(sql);
return {
content: [{
type: 'text',
text: `Created ${unique ? 'unique ' : ''}index ${name} on ${targetSchema}.${table}`
}]
};
}
case 'drop_index': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { name, schema, concurrent, if_exists } = args;
const targetSchema = schema || this.db.getSchema();
const indexRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(name, 'column')}`;
const concurrentClause = concurrent ? 'CONCURRENTLY ' : '';
const ifExistsClause = if_exists ? 'IF EXISTS ' : '';
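// As with CREATE INDEX CONCURRENTLY, DROP INDEX CONCURRENTLY cannot run inside a transaction block.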
const sql = `DROP INDEX ${concurrentClause}${ifExistsClause}${indexRef}`;
await this.db.executeQuery(sql);
return {
content: [{
type: 'text',
text: `Dropped index ${name}`
}]
};
}
case 'rename_table': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { oldName, newName, schema } = args;
const targetSchema = schema || this.db.getSchema();
const tableRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(oldName, 'table')}`;
const newTableName = this.db.safeIdentifier(newName, 'table');
const sql = `ALTER TABLE ${tableRef} RENAME TO ${newTableName}`;
await this.db.executeQuery(sql);
return {
content: [{
type: 'text',
text: `Renamed table from ${oldName} to ${newName}`
}]
};
}
case 'set_search_path': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { schemas } = args;
const schemaList = schemas.map((s: string) => this.db.safeIdentifier(s, 'schema')).join(', ');
const sql = `SET search_path TO ${schemaList}`;
await this.db.executeQuery(sql);
return {
content: [{
type: 'text',
text: `Set search_path to: ${schemas.join(', ')}`
}]
};
}
case 'create_schema': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { name, if_not_exists } = args;
const ifNotExistsClause = if_not_exists ? 'IF NOT EXISTS ' : '';
const sql = `CREATE SCHEMA ${ifNotExistsClause}${this.db.safeIdentifier(name, 'schema')}`;
await this.db.executeQuery(sql);
return {
content: [{
type: 'text',
text: `Created schema ${name} successfully`
}]
};
}
case 'drop_schema': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { name, cascade, confirm } = args;
if (!confirm) {
throw new Error('Drop schema operation requires confirm: true to proceed');
}
const cascadeClause = cascade ? ' CASCADE' : ' RESTRICT';
const sql = `DROP SCHEMA ${this.db.safeIdentifier(name, 'schema')}${cascadeClause}`;
await this.db.executeQuery(sql);
return {
content: [{
type: 'text',
text: `Dropped schema ${name} successfully`
}]
};
}
case 'jsonb_update': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { table, schema, column, operation, path, value, where } = args;
const targetSchema = schema || this.db.getSchema();
const tableRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(table, 'table')}`;
const colRef = this.db.safeIdentifier(column, 'column');
const whereColumns = Object.keys(where);
const whereValues = Object.values(where);
let setExpression: string;
let params: any[] = [];
let paramIndex = 1;
switch (operation) {
case 'set': {
if (!path || path.length === 0) throw new Error('path is required for the set operation');
// Pass the path as a text[] parameter instead of interpolating it into the SQL
setExpression = `${colRef} = jsonb_set(${colRef}, $${paramIndex++}::text[], $${paramIndex++}::jsonb, true)`;
params.push(path, JSON.stringify(value));
break;
}
case 'delete_key':
if (!path || path.length === 0) throw new Error('path is required for the delete_key operation');
setExpression = `${colRef} = ${colRef} - $${paramIndex++}`;
params.push(path[0]);
break;
case 'delete_path':
if (!path || path.length === 0) throw new Error('path is required for the delete_path operation');
setExpression = `${colRef} = ${colRef} #- $${paramIndex++}::text[]`;
params.push(path);
break;
case 'concat':
setExpression = `${colRef} = ${colRef} || $${paramIndex++}::jsonb`;
params.push(JSON.stringify(value));
break;
default:
throw new Error(`Unknown JSONB operation: ${operation}`);
}
const whereSql = whereColumns.map(c => `${this.db.safeIdentifier(c, 'column')} = $${paramIndex++}`).join(' AND ');
params.push(...whereValues);
const sql = `UPDATE ${tableRef} SET ${setExpression} WHERE ${whereSql} RETURNING *`;
const result = await this.db.executeQuery(sql, params);
return {
content: [{
type: 'text',
text: `Updated ${result.rowCount} row(s)\n${JSON.stringify(result.rows, null, 2)}`
}]
};
}
case 'vacuum_analyze': {
if (process.env.ALLOW_WRITE_OPERATIONS !== 'true') {
throw new Error('Write operations are not enabled. Set ALLOW_WRITE_OPERATIONS=true');
}
const { table, schema, operation, verbose } = args;
const op = operation || 'vacuum_analyze';
let sql = '';
const verboseClause = verbose ? 'VERBOSE' : '';
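// VACUUM cannot run inside a transaction block; this assumes executeQuery sends it as a standalone statement.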
if (table) {
const targetSchema = schema || this.db.getSchema();
const tableRef = `${this.db.safeIdentifier(targetSchema, 'schema')}.${this.db.safeIdentifier(table, 'table')}`;
switch (op) {
case 'vacuum':
sql = `VACUUM ${verboseClause} ${tableRef}`;
break;
case 'analyze':
sql = `ANALYZE ${verboseClause} ${tableRef}`;
break;
case 'vacuum_analyze':
sql = `VACUUM (ANALYZE${verbose ? ', VERBOSE' : ''}) ${tableRef}`;
break;
}
} else {
switch (op) {
case 'vacuum':
sql = `VACUUM ${verboseClause}`;
break;
case 'analyze':
sql = `ANALYZE ${verboseClause}`;
break;
case 'vacuum_analyze':
sql = `VACUUM (ANALYZE${verbose ? ', VERBOSE' : ''})`;
break;
}
}
await this.db.executeQuery(sql);
return {
content: [{
type: 'text',
text: `${op.replace('_', ' ').toUpperCase()} completed successfully${table ? ` on ${table}` : ''}`
}]
};
}
default:
throw new Error(`Unknown tool: ${name}`);
}
} catch (error) {
logger.error('Tool execution failed', error);
return {
content: [{
type: 'text',
text: `Error: ${error instanceof Error ? error.message : 'Unknown error'}`
}],
isError: true
};
}
}
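// Resolve a resource URI (pg://database/schema, pg://database/info, pg://table/<schema>.<table>)
// and return its JSON representation.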
private async handleResourceRead(uri: string): Promise<any> {
try {
logger.info('Resource read', { uri });
if (uri === 'pg://database/schema') {
const tables = await this.db.getTableList();
const schemas = await Promise.all(
tables.map(async (table) => ({
table,
schema: await this.db.getTableSchema(table)
}))
);
return {
contents: [{
uri,
mimeType: 'application/json',
text: JSON.stringify(schemas, null, 2)
}]
};
}
if (uri === 'pg://database/info') {
const info = await this.db.getDatabaseInfo();
return {
contents: [{
uri,
mimeType: 'application/json',
text: JSON.stringify(info, null, 2)
}]
};
}
if (uri.startsWith('pg://table/')) {
const fullName = uri.replace('pg://table/', '');
const [schemaName, tableName] = fullName.includes('.') ? fullName.split('.') : [this.db.getSchema(), fullName];
const schema = await this.db.getTableSchema(tableName, schemaName);
return {
contents: [{
uri,
mimeType: 'application/json',
text: JSON.stringify({ table: tableName, schema: schemaName, columns: schema }, null, 2)
}]
};
}
throw new Error(`Unknown resource: ${uri}`);
} catch (error) {
logger.error('Resource read failed', error);
throw error;
}
}
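// Connect to PostgreSQL, attach the stdio transport, and register shutdown handlers.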
async start(): Promise<void> {
logger.info('Starting MCP server...');
// Connect to database
await this.db.connect();
// Start MCP server with stdio transport
const transport = new StdioServerTransport();
await this.server.connect(transport);
logger.info('MCP server started successfully');
// Handle shutdown
process.on('SIGINT', async () => {
logger.info('Shutting down MCP server...');
await this.db.disconnect();
process.exit(0);
});
process.on('SIGTERM', async () => {
logger.info('Shutting down MCP server...');
await this.db.disconnect();
process.exit(0);
});
}
}