import { Pool, PoolClient, QueryResult, FieldDef } from 'pg';
import * as fs from 'fs';
import { logger } from './logger.js';
/** Connection settings and safety policy consumed by DatabaseManager. */
export interface DatabaseConfig {
  /** PostgreSQL server hostname or IP address. */
  host: string;
  /** PostgreSQL server port. */
  port: number;
  /** Role used to authenticate. */
  user: string;
  /** Password for `user`. */
  password: string;
  /** Database name to connect to. */
  database: string;
  /** Schema placed first on search_path (defaults to 'public'). */
  schema?: string;
  /** Maximum pooled connections (default 10). */
  connectionLimit?: number;
  /** Per-query statement_timeout in milliseconds (default 30000). */
  queryTimeout?: number;
  /** Maximum rows returned per query; excess rows are truncated (default 1000). */
  maxResults?: number;
  /** When false (the default), write statements are rejected. */
  allowWrite?: boolean;
  /** TLS settings; omit or set mode 'disable' for a plaintext connection. */
  ssl?: {
    mode?: 'disable' | 'require' | 'verify-ca' | 'verify-full';
    /** Defaults to true unless explicitly set to false. */
    rejectUnauthorized?: boolean;
    /** Path to a CA certificate file (read at connect time). */
    caPath?: string;
    /** Path to a client certificate file. */
    certPath?: string;
    /** Path to a client private key file. */
    keyPath?: string;
    /** Minimum TLS version string, e.g. 'TLSv1.2'. */
    minVersion?: string;
  };
  /** Query throttling thresholds (defaults: 60/min, 1000/hour, 10 concurrent). */
  rateLimit?: {
    maxQueriesPerMinute?: number;
    maxQueriesPerHour?: number;
    maxConcurrentQueries?: number;
  };
}
/**
 * Sliding-window rate limiter. Keeps raw query timestamps for the trailing
 * hour plus an in-flight counter, and enforces per-minute, per-hour, and
 * concurrency caps.
 */
class RateLimiter {
  private stamps: number[] = [];
  private inFlight: number = 0;
  private readonly perMinuteCap: number;
  private readonly perHourCap: number;
  private readonly concurrentCap: number;

  constructor(config: { maxPerMinute: number; maxPerHour: number; maxConcurrent: number }) {
    this.perMinuteCap = config.maxPerMinute;
    this.perHourCap = config.maxPerHour;
    this.concurrentCap = config.maxConcurrent;
  }

  /**
   * Verify that one more query is allowed. On success, records the query's
   * timestamp and reserves a concurrency slot (undo with releaseQuery()).
   * @throws Error when any cap would be exceeded; nothing is recorded then.
   */
  async checkLimit(): Promise<void> {
    const now = Date.now();
    const minuteFloor = now - 60 * 1000;
    const hourFloor = now - 60 * 60 * 1000;

    // Evict entries older than an hour; what survives IS the hourly window.
    this.stamps = this.stamps.filter((t) => t > hourFloor);
    const inLastHour = this.stamps.length;
    let inLastMinute = 0;
    for (const t of this.stamps) {
      if (t > minuteFloor) {
        inLastMinute++;
      }
    }

    if (inLastMinute >= this.perMinuteCap) {
      throw new Error(`Rate limit exceeded: ${this.perMinuteCap} queries per minute`);
    }
    if (inLastHour >= this.perHourCap) {
      throw new Error(`Rate limit exceeded: ${this.perHourCap} queries per hour`);
    }
    if (this.inFlight >= this.concurrentCap) {
      throw new Error(`Concurrent query limit exceeded: ${this.concurrentCap} concurrent queries`);
    }

    // All caps clear: record this query and take a concurrency slot.
    this.stamps.push(now);
    this.inFlight++;
  }

  /** Return the concurrency slot taken by a successful checkLimit(); never goes negative. */
  releaseQuery(): void {
    if (this.inFlight > 0) {
      this.inFlight--;
    }
  }

  /** Snapshot of current usage, for diagnostics and log payloads. */
  getStats() {
    const minuteFloor = Date.now() - 60 * 1000;
    return {
      queriesInLastMinute: this.stamps.filter((t) => t > minuteFloor).length,
      queriesInLastHour: this.stamps.length,
      currentConcurrent: this.inFlight
    };
  }
}
/** Normalized result of a query run through DatabaseManager.executeQuery. */
export interface PostgresQueryResult {
  /** Result rows, possibly truncated to config.maxResults. */
  rows: any[];
  /** Column metadata as reported by the pg driver. */
  fields: FieldDef[];
  /** Row count reported by the driver; may be null for some commands. */
  rowCount: number | null;
  /** SQL command tag, e.g. 'SELECT'. */
  command: string;
}
/**
 * PostgreSQL access layer with defense-in-depth safety controls:
 * identifier validation/escaping, blocked-table and dangerous-statement
 * pattern checks, rate limiting, database-enforced READ ONLY transactions
 * for reads, per-query statement timeouts, and result-row capping.
 */
export class DatabaseManager {
  private pool: Pool | null = null;
  private config: DatabaseConfig;
  private isConnected: boolean = false;
  private rateLimiter: RateLimiter | null = null;
  private dbVersion: string = '';

  // Security: Tables that should never be accessible (password hashes, auth
  // membership, server file/config views, replication state, large objects).
  private readonly BLOCKED_TABLES = [
    'pg_catalog.pg_authid',
    'pg_catalog.pg_shadow',
    'pg_catalog.pg_auth_members',
    'pg_catalog.pg_file_settings',
    'pg_catalog.pg_hba_file_rules',
    'pg_catalog.pg_ident_file_mappings',
    'pg_catalog.pg_subscription',
    'pg_catalog.pg_replication_origin',
    'pg_catalog.pg_replication_slots',
    'pg_catalog.pg_largeobject',
    'pg_catalog.pg_largeobject_metadata',
    'pg_catalog.pg_shdepend',
    'pg_catalog.pg_shseclabel',
  ];

  // Security: Patterns that might indicate sensitive tables/columns.
  // Matches are logged as warnings but NOT blocked (see validateQuery).
  private readonly SENSITIVE_PATTERNS = [
    /password/i,
    /passwd/i,
    /token/i,
    /secret/i,
    /api_key/i,
    /apikey/i,
    /private_key/i,
    /credit_card/i,
    /card_number/i,
    /cvv/i,
    /ssn/i,
    /social_security/i,
    /oauth/i,
    /refresh_token/i,
    /access_token/i,
    /auth_token/i,
    /encryption_key/i,
    /rolpassword/i,
  ];

  // Security: Valid identifier pattern (PostgreSQL: 63 char limit).
  private readonly VALID_IDENTIFIER = /^[a-zA-Z_][a-zA-Z0-9_$]{0,62}$/;

  /**
   * Validate a PostgreSQL identifier (table/column/schema name).
   * @returns the identifier unchanged when valid.
   * @throws Error when empty, longer than 63 chars, starting with a digit,
   *         or containing characters outside [a-zA-Z0-9_$].
   */
  validateIdentifier(identifier: string, type: 'table' | 'column' | 'schema' = 'table'): string {
    if (!identifier || identifier.length === 0) {
      throw new Error(`Invalid ${type} name: cannot be empty`);
    }
    if (identifier.length > 63) {
      throw new Error(`Invalid ${type} name: exceeds 63 character limit`);
    }
    if (!this.VALID_IDENTIFIER.test(identifier)) {
      throw new Error(`Invalid ${type} name: contains invalid characters or starts with a number`);
    }
    return identifier;
  }

  /**
   * Quote an identifier for inline SQL use. PostgreSQL quotes identifiers
   * with double quotes; embedded double quotes are doubled.
   */
  escapeIdentifier(identifier: string): string {
    return '"' + identifier.replace(/"/g, '""') + '"';
  }

  /** Validate and quote an identifier in one call. */
  safeIdentifier(identifier: string, type: 'table' | 'column' | 'schema' = 'table'): string {
    return this.escapeIdentifier(this.validateIdentifier(identifier, type));
  }

  /** Escape a string value for inline SQL by doubling single quotes. */
  escapeStringValue(value: string): string {
    return value.replace(/'/g, "''");
  }

  /**
   * Build the pg `ssl` option from config.
   * Returns false (TLS off) when ssl is absent or mode is 'disable'.
   * Missing certificate/key files are logged and skipped, not fatal.
   */
  private buildSslConfig(): false | object {
    if (!this.config.ssl || this.config.ssl.mode === 'disable') {
      return false;
    }
    const sslConfig: any = {
      // Verify the server certificate unless explicitly opted out.
      rejectUnauthorized: this.config.ssl.rejectUnauthorized !== false,
    };
    if (this.config.ssl.caPath) {
      try {
        sslConfig.ca = fs.readFileSync(this.config.ssl.caPath).toString();
      } catch (err) {
        logger.warn('Failed to read SSL CA certificate', { path: this.config.ssl.caPath });
      }
    }
    if (this.config.ssl.certPath) {
      try {
        sslConfig.cert = fs.readFileSync(this.config.ssl.certPath).toString();
      } catch (err) {
        logger.warn('Failed to read SSL client certificate', { path: this.config.ssl.certPath });
      }
    }
    if (this.config.ssl.keyPath) {
      try {
        sslConfig.key = fs.readFileSync(this.config.ssl.keyPath).toString();
      } catch (err) {
        logger.warn('Failed to read SSL client key', { path: this.config.ssl.keyPath });
      }
    }
    if (this.config.ssl.minVersion) {
      sslConfig.minVersion = this.config.ssl.minVersion;
    }
    return sslConfig;
  }

  /**
   * Apply defaults, merge the nested rateLimit block (a caller-supplied
   * rateLimit would otherwise replace the default object wholesale via the
   * spread), and build the rate limiter. Does NOT open any connection —
   * call connect() for that.
   */
  constructor(config: DatabaseConfig) {
    this.config = {
      connectionLimit: 10,
      queryTimeout: 30000,
      maxResults: 1000,
      allowWrite: false,
      schema: 'public',
      rateLimit: {
        maxQueriesPerMinute: 60,
        maxQueriesPerHour: 1000,
        maxConcurrentQueries: 10
      },
      ...config
    };
    // Merge rate limit config field-by-field so partial overrides keep defaults.
    if (config.rateLimit) {
      this.config.rateLimit = {
        ...this.config.rateLimit,
        ...config.rateLimit
      };
    }
    // Initialize rate limiter
    this.rateLimiter = new RateLimiter({
      maxPerMinute: this.config.rateLimit!.maxQueriesPerMinute!,
      maxPerHour: this.config.rateLimit!.maxQueriesPerHour!,
      maxConcurrent: this.config.rateLimit!.maxConcurrentQueries!
    });
    logger.info('Database manager initialized', {
      host: this.config.host,
      port: this.config.port,
      database: this.config.database,
      schema: this.config.schema,
      user: this.config.user,
      allowWrite: this.config.allowWrite,
      ssl: this.config.ssl?.mode || 'disabled',
      rateLimit: this.config.rateLimit
    });
  }

  /**
   * Create the connection pool, verify connectivity with a test checkout,
   * set the search_path, and cache the server version string.
   * @throws Error (wrapped) when the pool cannot be established.
   */
  async connect(): Promise<void> {
    try {
      logger.info('Connecting to PostgreSQL database...');
      this.pool = new Pool({
        host: this.config.host,
        port: this.config.port,
        user: this.config.user,
        password: this.config.password,
        database: this.config.database,
        max: this.config.connectionLimit,
        idleTimeoutMillis: 30000,
        connectionTimeoutMillis: 5000,
        ssl: this.buildSslConfig(),
        application_name: 'mav-postgresql-mcp-server',
      });
      // Test connection and get version
      const client = await this.pool.connect();
      try {
        // Set search_path for the connection (schema is validated + escaped)
        await client.query(`SET search_path TO ${this.safeIdentifier(this.config.schema || 'public', 'schema')}, public`);
        // Get version info
        const versionResult = await client.query('SELECT version()');
        this.dbVersion = versionResult.rows[0]?.version || '';
      } finally {
        client.release();
      }
      this.isConnected = true;
      logger.info('Successfully connected to PostgreSQL database', {
        version: this.dbVersion.split(' ').slice(0, 2).join(' ')
      });
    } catch (error) {
      logger.error('Failed to connect to PostgreSQL database', error);
      throw new Error(`Database connection failed: ${error instanceof Error ? error.message : 'Unknown error'}`);
    }
  }

  /** Drain and close the pool; safe to call when not connected. */
  async disconnect(): Promise<void> {
    if (this.pool) {
      logger.info('Closing database connection pool...');
      await this.pool.end();
      this.pool = null;
      this.isConnected = false;
      logger.info('Database connection pool closed');
    }
  }

  /**
   * Pattern-based pre-flight checks. Throws for blocked tables, file-system,
   * permission/role, admin, network, and extension operations, and for write
   * statements when allowWrite is false. NOTE: these checks are best-effort;
   * read-only safety is ultimately guaranteed by the READ ONLY transaction
   * in executeQuery.
   */
  private validateQuery(query: string): void {
    const normalizedQuery = query.toLowerCase();
    // Security: Check for blocked tables
    for (const blockedTable of this.BLOCKED_TABLES) {
      if (normalizedQuery.includes(blockedTable.toLowerCase())) {
        logger.warn('Attempted access to blocked table', { table: blockedTable });
        throw new Error(`Access to table ${blockedTable} is not allowed`);
      }
    }
    // Security: Check for sensitive table patterns
    for (const pattern of this.SENSITIVE_PATTERNS) {
      if (pattern.test(normalizedQuery)) {
        logger.warn('Query contains sensitive pattern', { pattern: pattern.toString() });
        // Log warning but don't block - let admin decide
      }
    }
    // Security: Check for file operations
    const filePatterns = /(COPY\s+.*\s+(TO|FROM)\s+['"]|pg_read_file|pg_read_binary_file|pg_stat_file|pg_ls_dir|lo_import|lo_export|lo_from_bytea|lo_put|lo_get)/i;
    if (filePatterns.test(query)) {
      // Allow COPY TO STDOUT and COPY FROM STDIN
      if (!/COPY\s+.*\s+TO\s+STDOUT/i.test(query) && !/COPY\s+.*\s+FROM\s+STDIN/i.test(query)) {
        throw new Error('File system operations are not allowed');
      }
    }
    // Security: Check for permission operations
    const permissionPatterns = /(GRANT|REVOKE|CREATE\s+(USER|ROLE)|DROP\s+(USER|ROLE)|ALTER\s+(USER|ROLE)|SET\s+ROLE|SET\s+SESSION\s+AUTHORIZATION|REASSIGN\s+OWNED|DROP\s+OWNED|SECURITY\s+DEFINER)/i;
    if (permissionPatterns.test(query)) {
      logger.error('Attempted permission modification', { query: query.substring(0, 100) });
      throw new Error('Permission/role modifications are not allowed');
    }
    // Security: Check for admin operations
    const adminPatterns = /(CREATE\s+DATABASE|DROP\s+DATABASE|ALTER\s+DATABASE|CREATE\s+TABLESPACE|DROP\s+TABLESPACE|pg_terminate_backend|pg_cancel_backend|pg_reload_conf|pg_rotate_logfile|pg_switch_wal)/i;
    if (adminPatterns.test(query)) {
      throw new Error('Database administration operations are not allowed');
    }
    // Security: Check for network operations
    const networkPatterns = /(dblink|dblink_connect|dblink_exec|dblink_send_query|CREATE\s+SERVER|ALTER\s+SERVER|DROP\s+SERVER|CREATE\s+USER\s+MAPPING|CREATE\s+FOREIGN\s+TABLE|http_get|http_post)/i;
    if (networkPatterns.test(query)) {
      throw new Error('External network operations are not allowed');
    }
    // Security: Check for extension operations
    const extensionPatterns = /(CREATE\s+EXTENSION|DROP\s+EXTENSION|ALTER\s+EXTENSION|LOAD\s+['"][^'"]+['"])/i;
    if (extensionPatterns.test(query)) {
      throw new Error('Extension operations are not allowed');
    }
    // Check for write operations.
    // FIX: use isWriteOperation() as the single source of truth. The previous
    // local pattern omitted VACUUM, so in read-only mode VACUUM passed this
    // check AND was routed around the READ ONLY transaction (isWriteOperation
    // classifies it as a write), letting it execute despite allowWrite=false.
    if (!this.config.allowWrite) {
      // Block write operations in read-only mode
      if (this.isWriteOperation(query)) {
        throw new Error('Write operations are not allowed in read-only mode');
      }
    } else {
      // Log write operations when allowed
      if (this.isWriteOperation(query)) {
        logger.warn('WRITE OPERATION DETECTED', {
          query: query.substring(0, 100),
          type: 'write_operation'
        });
        logger.audit('WRITE_OPERATION', {
          query: query.substring(0, 200),
          user: this.config.user,
          database: this.config.database
        });
      }
    }
  }

  /** Check if a query starts with a write/DDL keyword (pattern heuristic). */
  private isWriteOperation(query: string): boolean {
    const writePatterns = /^\s*(INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|TRUNCATE|CALL|VACUUM)\s/i;
    return writePatterns.test(query);
  }

  /**
   * Execute a SQL query with all safety layers applied, in order:
   * rate-limit reservation → client checkout → validation → audit log →
   * timeout + search_path setup → execution (reads wrapped in a READ ONLY
   * transaction) → result capping.
   *
   * @param query  SQL text; may contain $1..$n placeholders.
   * @param params Optional values bound to the placeholders.
   * @throws Error on rate-limit rejection, validation failure, timeout
   *         (SQLSTATE 57014), read-only violation (SQLSTATE 25006), or any
   *         driver error (wrapped).
   */
  async executeQuery(query: string, params?: any[]): Promise<PostgresQueryResult> {
    if (!this.pool || !this.isConnected) {
      throw new Error('Database not connected');
    }
    // Check rate limit before executing. A successful check reserves a
    // concurrent-query slot that must be released on every exit path below.
    if (this.rateLimiter) {
      try {
        await this.rateLimiter.checkLimit();
      } catch (error) {
        logger.warn('Rate limit exceeded', {
          error: error instanceof Error ? error.message : String(error),
          stats: this.rateLimiter.getStats()
        });
        throw error;
      }
    }
    // FIX: if the pool checkout fails, release the slot reserved above.
    // Previously a failed connect() leaked the slot, gradually exhausting
    // maxConcurrentQueries until all queries were rejected.
    const client = await this.pool.connect().catch((error) => {
      if (this.rateLimiter) {
        this.rateLimiter.releaseQuery();
      }
      throw error;
    });
    const isWrite = this.isWriteOperation(query);
    try {
      logger.debug('Executing query', { query, params, isWrite });
      // Validate query (security checks)
      this.validateQuery(query);
      // Audit log the query
      logger.audit('QUERY_EXECUTE', {
        query: query.substring(0, 200),
        user: this.config.user,
        database: this.config.database,
        schema: this.config.schema,
        hasParams: !!params && params.length > 0,
        isWrite
      });
      // Set statement timeout for this query (queryTimeout is a configured
      // number, never user input).
      await client.query(`SET statement_timeout = ${this.config.queryTimeout}`);
      // Set search_path (schema name validated + escaped)
      await client.query(`SET search_path TO ${this.safeIdentifier(this.config.schema || 'public', 'schema')}, public`);
      let result: QueryResult;
      if (!isWrite) {
        // CRITICAL SECURITY: Wrap read queries in READ ONLY transaction.
        // This provides database-level enforcement, not just pattern matching.
        await client.query('BEGIN TRANSACTION READ ONLY');
        try {
          result = await client.query(query, params);
          await client.query('ROLLBACK'); // Always rollback read-only transactions
        } catch (error) {
          await client.query('ROLLBACK');
          throw error;
        }
      } else {
        // Write operations - already validated by validateQuery()
        result = await client.query(query, params);
      }
      // Reset timeout. NOTE: skipped on the error path, but every
      // executeQuery re-issues SET statement_timeout first, so a pooled
      // session is re-armed before its next query.
      await client.query('RESET statement_timeout');
      // Truncate oversized result sets to maxResults.
      let rows = result.rows;
      if (rows.length > this.config.maxResults!) {
        logger.warn(`Query returned ${rows.length} rows, limiting to ${this.config.maxResults}`);
        rows = rows.slice(0, this.config.maxResults);
      }
      logger.debug('Query executed successfully', { rowCount: result.rowCount });
      return {
        rows,
        fields: result.fields,
        rowCount: result.rowCount,
        command: result.command
      };
    } catch (error: any) {
      // SQLSTATE 57014: query_canceled — the statement_timeout fired.
      if (error.code === '57014') {
        logger.error('Query timeout', { query: query.substring(0, 100) });
        throw new Error(`Query exceeded timeout of ${this.config.queryTimeout}ms`);
      }
      // SQLSTATE 25006: read_only_sql_transaction — write attempted inside
      // the READ ONLY transaction.
      if (error.code === '25006') {
        logger.error('Write operation attempted in read-only mode', { query: query.substring(0, 100) });
        throw new Error('Write operations are not allowed in read-only mode (enforced by PostgreSQL)');
      }
      logger.error('Query execution failed', error);
      throw new Error(`Query failed: ${error instanceof Error ? error.message : 'Unknown error'}`);
    } finally {
      client.release();
      // Release the concurrent query count
      if (this.rateLimiter) {
        this.rateLimiter.releaseQuery();
      }
    }
  }

  /** List table names in the given schema (defaults to the configured schema). */
  async getTableList(schema?: string): Promise<string[]> {
    const targetSchema = schema || this.config.schema || 'public';
    const result = await this.executeQuery(
      `SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = $1 ORDER BY tablename`,
      [targetSchema]
    );
    return result.rows.map(row => row.tablename);
  }

  /**
   * Describe a table's columns in a MySQL-DESCRIBE-like shape
   * (Field/Type/Null/Key/Default/Extra), built from information_schema.
   */
  async getTableSchema(tableName: string, schema?: string): Promise<any[]> {
    const targetSchema = schema || this.config.schema || 'public';
    // Validate table name before using it (it is then passed as a parameter).
    this.validateIdentifier(tableName, 'table');
    const result = await this.executeQuery(`
      SELECT
        c.column_name AS "Field",
        c.data_type ||
          CASE
            WHEN c.character_maximum_length IS NOT NULL
              THEN '(' || c.character_maximum_length || ')'
            WHEN c.numeric_precision IS NOT NULL AND c.numeric_scale IS NOT NULL
              THEN '(' || c.numeric_precision || ',' || c.numeric_scale || ')'
            ELSE ''
          END AS "Type",
        c.is_nullable AS "Null",
        CASE
          WHEN pk.column_name IS NOT NULL THEN 'PRI'
          WHEN uk.column_name IS NOT NULL THEN 'UNI'
          ELSE ''
        END AS "Key",
        c.column_default AS "Default",
        CASE
          WHEN c.column_default LIKE 'nextval%' THEN 'auto_increment'
          WHEN c.is_identity = 'YES' THEN 'auto_increment'
          ELSE ''
        END AS "Extra"
      FROM information_schema.columns c
      LEFT JOIN (
        SELECT ku.column_name
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage ku
          ON tc.constraint_name = ku.constraint_name
          AND tc.table_schema = ku.table_schema
        WHERE tc.constraint_type = 'PRIMARY KEY'
          AND tc.table_schema = $1
          AND tc.table_name = $2
      ) pk ON c.column_name = pk.column_name
      LEFT JOIN (
        SELECT ku.column_name
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage ku
          ON tc.constraint_name = ku.constraint_name
          AND tc.table_schema = ku.table_schema
        WHERE tc.constraint_type = 'UNIQUE'
          AND tc.table_schema = $1
          AND tc.table_name = $2
      ) uk ON c.column_name = uk.column_name
      WHERE c.table_schema = $1
        AND c.table_name = $2
      ORDER BY c.ordinal_position
    `, [targetSchema, tableName]);
    return result.rows;
  }

  /**
   * Summarize the database: table count, size in MB, server version, and
   * connection coordinates. Note: runs three queries in parallel, each of
   * which consumes a rate-limit slot.
   */
  async getDatabaseInfo(): Promise<any> {
    const [tables, sizeResult, versionResult] = await Promise.all([
      this.getTableList(),
      this.executeQuery(`
        SELECT pg_database_size(current_database()) / 1024 / 1024 AS size_mb
      `),
      this.executeQuery('SELECT version() AS version')
    ]);
    return {
      database: this.config.database,
      schema: this.config.schema || 'public',
      tables: tables.length,
      size_mb: sizeResult.rows[0]?.size_mb || 0,
      version: versionResult.rows[0]?.version || 'Unknown',
      connection: {
        host: this.config.host,
        port: this.config.port,
        user: this.config.user
      }
    };
  }

  /** The configured default schema ('public' when unset). */
  getSchema(): string {
    return this.config.schema || 'public';
  }

  /** True once connect() has succeeded and the pool is alive. */
  isReady(): boolean {
    return this.isConnected && this.pool !== null;
  }
}
/**
 * Parse an integer environment variable with a safe fallback.
 *
 * FIX: previously `parseInt(...)` on malformed values (e.g.
 * RATE_LIMIT_PER_MINUTE="abc") produced NaN, and every `count >= NaN`
 * comparison is false — silently disabling rate limits, timeouts, and
 * result caps. Invalid or empty input now falls back to the default.
 */
function parseEnvInt(raw: string | undefined, fallback: number): number {
  if (raw === undefined || raw === '') {
    return fallback;
  }
  const value = parseInt(raw, 10);
  return Number.isNaN(value) ? fallback : value;
}

/**
 * Factory: build a DatabaseManager from PG_* / limit environment variables.
 * @throws Error when PG_DATABASE is unset or empty.
 */
export function createDatabaseManager(): DatabaseManager {
  const config: DatabaseConfig = {
    host: process.env.PG_HOST || 'localhost',
    port: parseEnvInt(process.env.PG_PORT, 5432),
    user: process.env.PG_USER || 'postgres',
    password: process.env.PG_PASSWORD || '',
    database: process.env.PG_DATABASE || '',
    schema: process.env.PG_SCHEMA || 'public',
    connectionLimit: parseEnvInt(process.env.CONNECTION_LIMIT, 10),
    queryTimeout: parseEnvInt(process.env.QUERY_TIMEOUT, 30000),
    maxResults: parseEnvInt(process.env.MAX_RESULTS, 1000),
    allowWrite: process.env.ALLOW_WRITE_OPERATIONS === 'true',
    ssl: {
      mode: (process.env.PG_SSL_MODE as any) || 'disable',
      rejectUnauthorized: process.env.PG_SSL_REJECT_UNAUTHORIZED !== 'false',
      caPath: process.env.PG_SSL_CA_PATH,
      certPath: process.env.PG_SSL_CERT_PATH,
      keyPath: process.env.PG_SSL_KEY_PATH,
      minVersion: process.env.PG_SSL_MIN_VERSION,
    },
    rateLimit: {
      maxQueriesPerMinute: parseEnvInt(process.env.RATE_LIMIT_PER_MINUTE, 60),
      maxQueriesPerHour: parseEnvInt(process.env.RATE_LIMIT_PER_HOUR, 1000),
      maxConcurrentQueries: parseEnvInt(process.env.RATE_LIMIT_CONCURRENT, 10)
    }
  };
  if (!config.database) {
    throw new Error('PG_DATABASE environment variable is required');
  }
  return new DatabaseManager(config);
}