/**
* MCP Server implementation with PostgreSQL tools
*/
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
CallToolRequestSchema,
ListToolsRequestSchema,
Tool,
} from '@modelcontextprotocol/sdk/types.js';
import pg from 'pg';
import { Actor } from 'apify';
import {
DatabaseConfig,
QueryToolParams,
ListTablesToolParams,
DescribeTableToolParams,
GetTableSampleToolParams,
QueryLogEntry,
} from './types.js';
import {
executeQuery,
validateSchema,
escapeIdentifier,
buildSchemaFilter,
} from './database.js';
/**
* Create and configure the MCP server
*/
/**
 * Create and configure the MCP server.
 *
 * Registers four PostgreSQL tools (query, list_tables, describe_table,
 * get_table_sample), wires up the list_tools/call_tool request handlers,
 * and logs every tool invocation — success or failure — to the Apify
 * dataset via logToDataset.
 *
 * @param pool - pg connection pool shared by all tool handlers
 * @param config - database configuration (allowed schemas, result limits)
 * @returns A configured Server; the caller is responsible for connecting a transport
 */
export function createMCPServer(pool: pg.Pool, config: DatabaseConfig): Server {
  const server = new Server(
    {
      name: 'postgresql-mcp-server',
      version: '1.0.0',
    },
    {
      // Advertise tool support only; no resources/prompts capabilities.
      capabilities: {
        tools: {},
      },
    }
  );
  // Define available tools. Each inputSchema is JSON Schema describing the
  // tool's arguments; the MCP client uses these for validation/UI.
  const tools: Tool[] = [
    {
      name: 'query',
      description: 'Execute a SQL query on the PostgreSQL database. Supports SELECT statements and returns results as JSON. In read-only mode, only SELECT and EXPLAIN queries are allowed.',
      inputSchema: {
        type: 'object',
        properties: {
          query: {
            type: 'string',
            description: 'The SQL query to execute',
          },
        },
        required: ['query'],
      },
    },
    {
      name: 'list_tables',
      description: 'List all tables in the database. You can optionally filter by schema. Returns table names with metadata including estimated row counts.',
      inputSchema: {
        type: 'object',
        properties: {
          schema: {
            type: 'string',
            description: 'Optional: filter tables by schema name. If not provided, lists tables from all allowed schemas.',
          },
        },
      },
    },
    {
      name: 'describe_table',
      description: 'Get detailed schema information for a specific table including columns, data types, constraints, indexes, and foreign key relationships.',
      inputSchema: {
        type: 'object',
        properties: {
          schema: {
            type: 'string',
            description: 'The schema name containing the table',
          },
          table: {
            type: 'string',
            description: 'The table name to describe',
          },
        },
        required: ['schema', 'table'],
      },
    },
    {
      name: 'get_table_sample',
      description: 'Retrieve sample rows from a table. Returns a limited number of rows to preview the table contents.',
      inputSchema: {
        type: 'object',
        properties: {
          schema: {
            type: 'string',
            description: 'The schema name containing the table',
          },
          table: {
            type: 'string',
            description: 'The table name to sample',
          },
          limit: {
            type: 'number',
            description: 'Number of rows to return (default: 10, max: maxQueryResults from config)',
            default: 10,
          },
        },
        required: ['schema', 'table'],
      },
    },
  ];
  // Handle list_tools request — advertise the static tool catalogue above.
  server.setRequestHandler(ListToolsRequestSchema, async () => {
    return { tools };
  });
  // Handle call_tool request — dispatch to the matching handler, time the
  // call, log the outcome, and convert the result/error into MCP content.
  server.setRequestHandler(CallToolRequestSchema, async (request) => {
    const startTime = Date.now();
    const toolName = request.params.name;
    const args = request.params.arguments || {};
    try {
      let result: any;
      // Set only for the 'query' tool; used to log SQL instead of raw args.
      let queryExecuted: string | undefined;
      let rowsReturned: number | undefined;
      switch (toolName) {
        case 'query':
          result = await handleQuery(pool, config, args as unknown as QueryToolParams);
          queryExecuted = (args as unknown as QueryToolParams).query;
          rowsReturned = result.rowCount;
          break;
        case 'list_tables':
          result = await handleListTables(pool, config, args as unknown as ListTablesToolParams);
          rowsReturned = result.length;
          break;
        case 'describe_table':
          // For describe_table, "rowsReturned" is the number of columns found.
          result = await handleDescribeTable(pool, config, args as unknown as DescribeTableToolParams);
          rowsReturned = result.columns.length;
          break;
        case 'get_table_sample':
          result = await handleGetTableSample(pool, config, args as unknown as GetTableSampleToolParams);
          rowsReturned = result.length;
          break;
        default:
          throw new Error(`Unknown tool: ${toolName}`);
      }
      const executionTime = Date.now() - startTime;
      // Log to Apify dataset. For 'query' we log the SQL text; for other
      // tools we log the raw arguments instead.
      await logToDataset({
        tool: toolName,
        query: queryExecuted,
        parameters: queryExecuted ? undefined : args,
        timestamp: new Date().toISOString(),
        executionTime,
        rowsReturned,
        success: true,
      });
      // Return result in MCP format (pretty-printed JSON as a text part).
      return {
        content: [
          {
            type: 'text',
            text: JSON.stringify(result, null, 2),
          },
        ],
      };
    } catch (error) {
      const executionTime = Date.now() - startTime;
      const errorMessage = error instanceof Error ? error.message : 'Unknown error';
      // Log error to Apify dataset (same query-vs-parameters split as above).
      await logToDataset({
        tool: toolName,
        query: toolName === 'query' ? (args as unknown as QueryToolParams).query : undefined,
        parameters: toolName !== 'query' ? args : undefined,
        timestamp: new Date().toISOString(),
        executionTime,
        success: false,
        error: errorMessage,
      });
      // Return error in MCP format rather than throwing, so the client gets
      // a structured isError response.
      return {
        content: [
          {
            type: 'text',
            text: `Error: ${errorMessage}`,
          },
        ],
        isError: true,
      };
    }
  });
  return server;
}
/**
* Handle the query tool
*/
/**
 * Handle the query tool: run a caller-supplied SQL statement through
 * executeQuery and return its rows, row count, and field metadata.
 *
 * @param pool - Connection pool to execute against
 * @param config - Database configuration forwarded to executeQuery
 * @param params - Tool arguments; `query` must be a non-empty string
 * @throws Error when the query parameter is missing or not a string
 */
async function handleQuery(
  pool: pg.Pool,
  config: DatabaseConfig,
  params: QueryToolParams
): Promise<any> {
  const sql = params.query;
  // Reject missing, empty, or non-string input before touching the database.
  if (typeof sql !== 'string' || sql.length === 0) {
    throw new Error('Query parameter is required and must be a string');
  }
  const { rowCount, rows, fields } = await executeQuery(pool, sql, config);
  return { rowCount, rows, fields };
}
/**
* Handle the list_tables tool
*/
/**
 * Handle the list_tables tool.
 *
 * Lists base tables from the requested schema (or all allowed schemas) and
 * augments each entry with an exact row count and a human-readable size.
 *
 * @param pool - Connection pool to run queries against
 * @param config - Database configuration (allowed schemas, limits)
 * @param params - Tool arguments; `schema` optionally restricts the listing
 * @returns Array of { schema, tableName, rowCount, tableSize } entries
 * @throws If `schema` is provided but rejected by validateSchema
 */
async function handleListTables(
  pool: pg.Pool,
  config: DatabaseConfig,
  params: ListTablesToolParams
): Promise<any[]> {
  const { schema } = params;
  // Validate a caller-supplied schema against the allow-list before it is
  // interpolated into SQL below.
  if (schema) {
    validateSchema(schema, config.allowedSchemas);
  }
  // Build query to list tables: either a single (quote-escaped) schema or
  // the configured allow-list filter.
  const schemaFilter = schema
    ? `table_schema = '${schema.replace(/'/g, "''")}'`
    : buildSchemaFilter(config.allowedSchemas);
  const query = `
    SELECT
      table_schema as schema,
      table_name as "tableName",
      pg_relation_size(quote_ident(table_schema) || '.' || quote_ident(table_name)) as "tableSize"
    FROM information_schema.tables
    WHERE ${schemaFilter}
      AND table_type = 'BASE TABLE'
    ORDER BY table_schema, table_name
  `;
  const result = await executeQuery(pool, query, config);
  // Get row counts for each table in parallel. COUNT(*) may fail (e.g. the
  // table was dropped concurrently) — in that case degrade to rowCount: null.
  const tablesWithCounts = await Promise.all(
    result.rows.map(async (table) => {
      // The pg driver returns bigint columns (pg_relation_size) as strings;
      // coerce to a number so formatBytes computes a real size.
      const sizeBytes = table.tableSize == null ? null : Number(table.tableSize);
      try {
        const countQuery = `
          SELECT COUNT(*) as count
          FROM ${escapeIdentifier(table.schema)}.${escapeIdentifier(table.tableName)}
        `;
        const countResult = await executeQuery(pool, countQuery, config);
        return {
          schema: table.schema,
          tableName: table.tableName,
          rowCount: parseInt(countResult.rows[0].count, 10),
          tableSize: formatBytes(sizeBytes),
        };
      } catch (error) {
        // If count fails, return table without count
        return {
          schema: table.schema,
          tableName: table.tableName,
          rowCount: null,
          tableSize: formatBytes(sizeBytes),
        };
      }
    })
  );
  return tablesWithCounts;
}
/**
* Handle the describe_table tool
*/
/**
 * Handle the describe_table tool.
 *
 * Collects column definitions, constraints, indexes, and foreign keys for a
 * single table via four parameterized information_schema / pg_catalog
 * queries, then assembles them into one result object.
 *
 * @param pool - Connection pool used for the metadata queries
 * @param config - Database configuration (allowed schemas)
 * @param params - Tool arguments; both `schema` and `table` are required
 * @throws Error when schema/table are missing or the schema is not allowed
 */
async function handleDescribeTable(
  pool: pg.Pool,
  config: DatabaseConfig,
  params: DescribeTableToolParams
): Promise<any> {
  const { schema, table } = params;
  if (!schema || !table) {
    throw new Error('Both schema and table parameters are required');
  }
  // Enforce the schema allow-list before querying.
  validateSchema(schema, config.allowedSchemas);
  // All four metadata queries bind the same (schema, table) pair.
  const bindArgs = [schema, table];
  // Column definitions, in declaration order.
  const colSql = `
    SELECT
      column_name as "columnName",
      data_type as "dataType",
      is_nullable as "isNullable",
      column_default as "columnDefault",
      character_maximum_length as "characterMaximumLength",
      numeric_precision as "numericPrecision",
      numeric_scale as "numericScale"
    FROM information_schema.columns
    WHERE table_schema = $1 AND table_name = $2
    ORDER BY ordinal_position
  `;
  const cols = await pool.query(colSql, bindArgs);
  // Constraints (PK/UNIQUE/FK/CHECK) and the columns they cover.
  const conSql = `
    SELECT
      tc.constraint_name as "constraintName",
      tc.constraint_type as "constraintType",
      kcu.column_name as "columnName"
    FROM information_schema.table_constraints tc
    JOIN information_schema.key_column_usage kcu
      ON tc.constraint_name = kcu.constraint_name
      AND tc.table_schema = kcu.table_schema
    WHERE tc.table_schema = $1 AND tc.table_name = $2
    ORDER BY tc.constraint_type, kcu.ordinal_position
  `;
  const cons = await pool.query(conSql, bindArgs);
  // Index membership plus uniqueness/primary flags from pg_catalog.
  const idxSql = `
    SELECT
      i.indexname as "indexName",
      a.attname as "columnName",
      ix.indisunique as "isUnique",
      ix.indisprimary as "isPrimary"
    FROM pg_indexes i
    JOIN pg_class c ON c.relname = i.indexname
    JOIN pg_index ix ON ix.indexrelid = c.oid
    JOIN pg_attribute a ON a.attrelid = ix.indrelid AND a.attnum = ANY(ix.indkey)
    WHERE i.schemaname = $1 AND i.tablename = $2
    ORDER BY i.indexname
  `;
  const idx = await pool.query(idxSql, bindArgs);
  // Outgoing foreign keys with the referenced schema/table/column.
  const fkSql = `
    SELECT
      tc.constraint_name as "constraintName",
      kcu.column_name as "columnName",
      ccu.table_schema as "foreignTableSchema",
      ccu.table_name as "foreignTableName",
      ccu.column_name as "foreignColumnName"
    FROM information_schema.table_constraints tc
    JOIN information_schema.key_column_usage kcu
      ON tc.constraint_name = kcu.constraint_name
      AND tc.table_schema = kcu.table_schema
    JOIN information_schema.constraint_column_usage ccu
      ON ccu.constraint_name = tc.constraint_name
      AND ccu.table_schema = tc.table_schema
    WHERE tc.constraint_type = 'FOREIGN KEY'
      AND tc.table_schema = $1
      AND tc.table_name = $2
  `;
  const fks = await pool.query(fkSql, bindArgs);
  // Normalize columns (isNullable 'YES'/'NO' -> boolean); pass the rest through.
  const columns = cols.rows.map((c) => ({
    columnName: c.columnName,
    dataType: c.dataType,
    isNullable: c.isNullable === 'YES',
    columnDefault: c.columnDefault,
    characterMaximumLength: c.characterMaximumLength,
    numericPrecision: c.numericPrecision,
    numericScale: c.numericScale,
  }));
  return {
    schema,
    tableName: table,
    columns,
    constraints: cons.rows,
    indexes: idx.rows,
    foreignKeys: fks.rows,
  };
}
/**
* Handle the get_table_sample tool
*/
/**
 * Handle the get_table_sample tool.
 *
 * Returns up to `limit` rows from the given table (default 10, capped by
 * config.maxQueryResults).
 *
 * @param pool - Connection pool to execute against
 * @param config - Database configuration (allowed schemas, maxQueryResults)
 * @param params - Tool arguments; `schema` and `table` are required
 * @returns The sampled rows
 * @throws Error when schema/table are missing, the schema is not allowed,
 *         or `limit` is not a finite number
 */
async function handleGetTableSample(
  pool: pg.Pool,
  config: DatabaseConfig,
  params: GetTableSampleToolParams
): Promise<any[]> {
  const { schema, table, limit = 10 } = params;
  if (!schema || !table) {
    throw new Error('Both schema and table parameters are required');
  }
  // Validate schema against the allow-list.
  validateSchema(schema, config.allowedSchemas);
  // `limit` arrives from an untrusted MCP client and is interpolated into
  // SQL below, so require a real number and clamp it to a non-negative
  // integer no larger than the configured maximum (negative LIMIT is a
  // Postgres error; a non-number would be SQL injection).
  if (typeof limit !== 'number' || !Number.isFinite(limit)) {
    throw new Error('limit must be a finite number');
  }
  const actualLimit = Math.min(Math.max(0, Math.floor(limit)), config.maxQueryResults);
  // Query for sample data (identifiers are escaped; limit is a safe integer).
  const query = `
    SELECT *
    FROM ${escapeIdentifier(schema)}.${escapeIdentifier(table)}
    LIMIT ${actualLimit}
  `;
  const result = await executeQuery(pool, query, config);
  return result.rows;
}
/**
* Log query execution to Apify dataset
*/
/**
 * Log query execution to the Apify dataset.
 *
 * Best-effort: a push failure is reported to stderr but never propagated,
 * so logging can never break a tool call.
 *
 * @param entry - The log record to append to the default dataset
 */
async function logToDataset(entry: QueryLogEntry): Promise<void> {
  await Actor.pushData(entry).catch((err: unknown) => {
    // Swallow the failure deliberately; logging is non-critical.
    console.error('Failed to log to dataset:', err);
  });
}
/**
* Format bytes to human-readable string
*/
/**
 * Format a byte count as a human-readable string.
 *
 * Also accepts numeric strings because the pg driver returns bigint columns
 * (e.g. pg_relation_size) as strings; the previous number-only version
 * produced "NaN undefined" for such input.
 *
 * @param bytes - Byte count as a number, a numeric string, or null
 * @returns e.g. '0 Bytes', '1.5 KB'; 'Unknown' when the input is missing
 *          or not a valid non-negative number
 */
function formatBytes(bytes: number | string | null): string {
  if (bytes === null || bytes === undefined) {
    return 'Unknown';
  }
  // Coerce pg's stringified bigints before doing math.
  const value = typeof bytes === 'string' ? Number(bytes) : bytes;
  if (!Number.isFinite(value) || value < 0) {
    return 'Unknown';
  }
  if (value === 0) return '0 Bytes';
  const k = 1024;
  const sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB'];
  // Cap the unit index so sizes[i] stays defined for very large values.
  const i = Math.min(Math.floor(Math.log(value) / Math.log(k)), sizes.length - 1);
  return Math.round((value / Math.pow(k, i)) * 100) / 100 + ' ' + sizes[i];
}
/**
* Start the MCP server with stdio transport
*/
/**
 * Start the MCP server with stdio transport.
 *
 * Builds the server via createMCPServer, connects it to stdin/stdout, and
 * announces readiness on stderr (stdout is reserved for the MCP protocol).
 *
 * @param pool - pg connection pool handed to the tool handlers
 * @param config - database configuration for the tools
 */
export async function startMCPServer(pool: pg.Pool, config: DatabaseConfig): Promise<void> {
  const transport = new StdioServerTransport();
  await createMCPServer(pool, config).connect(transport);
  console.error('PostgreSQL MCP Server running on stdio');
}