/**
* Database namespace - Stateful SQL with dynamic connection tools
*/
import { v4 as uuidv4 } from 'uuid';
import { MCPServer } from '../core/server.js';
import { MCPTool } from '../types/core.js';
import { InvalidArgError, SessionNotFoundError, ConfigMissingError } from '../core/errors.js';
import {
DatabaseConnection,
DatabaseConnectionOptions,
TransactionCommand,
RowMode,
ExecResponse,
QueryResponse,
SchemaIntrospectionResponse,
StartConnectionResponse
} from '../types/db.js';
// Import database drivers
import pg from 'pg';
import mysql from 'mysql2/promise';
import sqlite3 from 'sqlite3';
import tedious from 'tedious';
/**
 * Stateful SQL namespace.
 *
 * Registers two static tools (`db.start_connection`, `db.end_connection`) and,
 * for every opened connection, three session-scoped tools named
 * `db.<connection_id>.exec`, `db.<connection_id>.query` and
 * `db.<connection_id>.schema_introspect`.
 *
 * Supported drivers: `postgres`, `mysql`, `sqlite`, `mssql`.
 *
 * NOTE(review): for `mssql` only connecting and closing are implemented.
 * Statement execution, querying, transactions and introspection are
 * placeholders that report success without contacting the server.
 */
export class DbNamespace {
  private mcpServer: MCPServer;
  /** Open connections keyed by connection id. */
  private connections = new Map<string, DatabaseConnection>();

  /**
   * @param mcpServer Server whose registry receives the static and per-connection tools.
   */
  constructor(mcpServer: MCPServer) {
    this.mcpServer = mcpServer;
    this.registerStaticTools();
  }

  /** Registers the connection lifecycle tools that exist independently of any session. */
  private registerStaticTools(): void {
    const registry = this.mcpServer.getRegistry();

    registry.registerTool(
      'db.start_connection',
      {
        name: 'db.start_connection',
        description: 'Start a new database connection',
        inputSchema: {
          type: 'object',
          properties: {
            opts: {
              type: 'object',
              properties: {
                driver: {
                  type: 'string',
                  enum: ['postgres', 'mysql', 'sqlite', 'mssql']
                },
                dsn: { type: 'string' }
              },
              required: ['driver', 'dsn']
            }
          },
          required: ['opts']
        }
      },
      this.startConnection.bind(this)
    );

    registry.registerTool(
      'db.end_connection',
      {
        name: 'db.end_connection',
        description: 'End a database connection',
        inputSchema: {
          type: 'object',
          properties: {
            connection_id: { type: 'string' }
          },
          required: ['connection_id']
        }
      },
      this.endConnection.bind(this)
    );
  }

  /**
   * Opens a driver connection, stores it and registers its session tools.
   *
   * @param params.opts Driver name and DSN.
   * @returns The generated connection id.
   * @throws InvalidArgError when the driver is not one of the supported ones.
   * @throws Error (`Failed to connect to database: ...`) when connecting or
   *   registering the session fails. In the latter case the freshly opened
   *   driver connection is closed again so it does not leak.
   */
  private async startConnection(params: {
    opts: DatabaseConnectionOptions;
  }): Promise<StartConnectionResponse> {
    const { driver, dsn } = params.opts;
    const connectionId = uuidv4();

    let dbConnection: any;
    try {
      switch (driver) {
        case 'postgres':
          dbConnection = await this.createPostgresConnection(dsn);
          break;
        case 'mysql':
          dbConnection = await this.createMysqlConnection(dsn);
          break;
        case 'sqlite':
          dbConnection = await this.createSqliteConnection(dsn);
          break;
        case 'mssql':
          dbConnection = await this.createMssqlConnection(dsn);
          break;
        default:
          throw new InvalidArgError('driver', `Unsupported driver: ${driver}`);
      }
    } catch (error) {
      // Argument errors keep their specific type; only real connection
      // failures are wrapped.
      if (error instanceof InvalidArgError) {
        throw error;
      }
      throw new Error(`Failed to connect to database: ${error}`);
    }

    const now = new Date().toISOString();
    const connection: DatabaseConnection = {
      connection_id: connectionId,
      driver,
      connection: dbConnection,
      created_at: now,
      last_used: now,
      tx_state: 'closed'
    };
    this.connections.set(connectionId, connection);

    const registry = this.mcpServer.getRegistry();
    try {
      // Register this connection with the registry, then its dynamic tools.
      registry.createSession(connectionId, 'db');
      this.registerConnectionTools(connectionId);
    } catch (error) {
      // Roll back: without this the open handle would stay in the map (and on
      // the server) with no tool able to reach or close it.
      this.connections.delete(connectionId);
      try {
        await this.closeDriverConnection(connection);
      } catch (closeError) {
        console.warn(`Error closing database connection ${connectionId}:`, closeError);
      }
      try {
        await registry.destroySession(connectionId);
      } catch {
        // The session may never have been created; nothing to undo.
      }
      throw new Error(`Failed to connect to database: ${error}`);
    }

    return { connection_id: connectionId };
  }

  /** Creates and connects a `pg.Client` for the given DSN. */
  private async createPostgresConnection(dsn: string): Promise<any> {
    const client = new pg.Client(dsn);
    await client.connect();
    return client;
  }

  /** Creates a `mysql2/promise` connection for the given DSN. */
  private async createMysqlConnection(dsn: string): Promise<any> {
    return await mysql.createConnection(dsn);
  }

  /** Opens a `sqlite3.Database`; `dsn` is passed to the driver as the filename. */
  private async createSqliteConnection(dsn: string): Promise<any> {
    return new Promise((resolve, reject) => {
      const db = new sqlite3.Database(dsn, (err: Error | null) => {
        if (err) reject(err);
        else resolve(db);
      });
    });
  }

  /**
   * Opens a tedious connection from a URL-style DSN
   * (`scheme://user:password@host:port/database`).
   *
   * The optional query parameters `encrypt` and `trustServerCertificate`
   * override the defaults (both `true`); `false`, `0`, `no` and `off` turn a
   * flag off.
   *
   * NOTE(review): `trustServerCertificate` defaults to `true`, which skips TLS
   * certificate validation. Kept for backward compatibility; pass
   * `?trustServerCertificate=false` to enforce validation.
   */
  private async createMssqlConnection(dsn: string): Promise<any> {
    const url = new URL(dsn);

    const flag = (name: string, fallback: boolean): boolean => {
      const raw = url.searchParams.get(name);
      if (raw === null) return fallback;
      return !['false', '0', 'no', 'off'].includes(raw.trim().toLowerCase());
    };

    const config: any = {
      server: url.hostname,
      authentication: {
        type: 'default',
        options: {
          // URL components are percent-encoded; credentials containing
          // reserved characters must be decoded before reaching the driver.
          userName: decodeURIComponent(url.username),
          password: decodeURIComponent(url.password)
        }
      },
      options: {
        database: decodeURIComponent(url.pathname.substring(1)),
        port: Number.parseInt(url.port || '1433', 10),
        encrypt: flag('encrypt', true),
        trustServerCertificate: flag('trustServerCertificate', true)
      }
    };

    return new Promise((resolve, reject) => {
      const connection = new tedious.Connection(config);
      connection.on('connect', (err?: Error) => {
        if (err) reject(err);
        else resolve(connection);
      });
      connection.connect();
    });
  }

  /** Registers the `exec`, `query` and `schema_introspect` tools for one connection. */
  private registerConnectionTools(connectionId: string): void {
    const registry = this.mcpServer.getRegistry();

    registry.registerSessionTool(
      connectionId,
      `db.${connectionId}.exec`,
      {
        name: `db.${connectionId}.exec`,
        description: 'Execute SQL statement',
        inputSchema: {
          type: 'object',
          properties: {
            sql: { type: 'string' },
            params: { type: 'array' },
            tx: { type: 'string', enum: ['none', 'begin', 'commit', 'rollback'] }
          },
          required: ['sql']
        }
      },
      (params: any) => this.exec(connectionId, params)
    );

    registry.registerSessionTool(
      connectionId,
      `db.${connectionId}.query`,
      {
        name: `db.${connectionId}.query`,
        description: 'Query database and return rows',
        inputSchema: {
          type: 'object',
          properties: {
            sql: { type: 'string' },
            params: { type: 'array' },
            row_mode: { type: 'string', enum: ['dict', 'array'] }
          },
          required: ['sql']
        }
      },
      (params: any) => this.query(connectionId, params)
    );

    registry.registerSessionTool(
      connectionId,
      `db.${connectionId}.schema_introspect`,
      {
        name: `db.${connectionId}.schema_introspect`,
        description: 'Introspect database schema',
        inputSchema: {
          type: 'object',
          properties: {}
        }
      },
      () => this.schemaIntrospect(connectionId)
    );
  }

  /**
   * Closes the underlying driver handle and waits for the driver to finish
   * where it offers a way to do so.
   */
  private async closeDriverConnection(connection: DatabaseConnection): Promise<void> {
    switch (connection.driver) {
      case 'postgres':
      case 'mysql':
        await connection.connection.end();
        break;
      case 'sqlite':
        // sqlite3 reports close errors through the callback only.
        await new Promise<void>((resolve, reject) => {
          connection.connection.close((err: Error | null) => {
            if (err) reject(err);
            else resolve();
          });
        });
        break;
      case 'mssql':
        connection.connection.close();
        break;
    }
  }

  /**
   * Closes a connection and removes its session.
   *
   * Closing is best-effort: a driver error is logged and the connection is
   * still forgotten and its session destroyed.
   *
   * @throws SessionNotFoundError when the id is unknown.
   */
  private async endConnection(params: {
    connection_id: string;
  }): Promise<{ ok: true }> {
    const connection = this.connections.get(params.connection_id);
    if (!connection) {
      throw new SessionNotFoundError(params.connection_id);
    }

    try {
      await this.closeDriverConnection(connection);
    } catch (error) {
      console.warn(`Error closing database connection ${params.connection_id}:`, error);
    }

    this.connections.delete(params.connection_id);
    this.mcpServer.getRegistry().destroySession(params.connection_id);
    return { ok: true };
  }

  /**
   * Executes a statement or a transaction command.
   *
   * When `tx` is `begin`, `commit` or `rollback`, only that transaction
   * command is run and `sql` is NOT executed. Otherwise `sql` is executed
   * with `params`.
   *
   * @returns Status, affected row count (statement execution only) and the
   *   connection's transaction state after the call.
   * @throws SessionNotFoundError when the connection id is unknown.
   */
  private async exec(connectionId: string, params: {
    sql: string;
    params?: any[];
    tx?: TransactionCommand;
  }): Promise<ExecResponse> {
    const connection = this.getConnection(connectionId);

    // The state is only updated after the driver call succeeded.
    switch (params.tx) {
      case 'begin':
        await this.beginTransaction(connection);
        connection.tx_state = 'open';
        return { status: 'ok', tx_state: 'open' };
      case 'commit':
        await this.commitTransaction(connection);
        connection.tx_state = 'closed';
        return { status: 'ok', tx_state: 'closed' };
      case 'rollback':
        await this.rollbackTransaction(connection);
        connection.tx_state = 'closed';
        return { status: 'ok', tx_state: 'closed' };
      default:
        break;
    }

    const result = await this.executeSQL(connection, params.sql, params.params);
    return {
      status: 'ok',
      rows_affected: result.rowsAffected,
      tx_state: connection.tx_state
    };
  }

  /**
   * Runs a query and returns its columns and rows.
   *
   * @param params.row_mode `dict` (default) returns the driver's row objects;
   *   `array` returns each row as a list of values in `cols` order.
   * @throws SessionNotFoundError when the connection id is unknown.
   */
  private async query(connectionId: string, params: {
    sql: string;
    params?: any[];
    row_mode?: RowMode;
  }): Promise<QueryResponse> {
    const connection = this.getConnection(connectionId);
    const result = await this.executeQuery(connection, params.sql, params.params);

    if (params.row_mode === 'array') {
      const columns: string[] = result.columns;
      return {
        cols: columns,
        // Project through the column list so every row lines up with `cols`
        // regardless of the row object's own key order.
        rows: result.rows.map((row: any) => columns.map((column) => row[column]))
      };
    }

    return {
      cols: result.columns,
      rows: result.rows
    };
  }

  /**
   * Returns schemas, tables and columns visible on the connection.
   *
   * @throws SessionNotFoundError when the connection id is unknown.
   */
  private async schemaIntrospect(connectionId: string): Promise<SchemaIntrospectionResponse> {
    const connection = this.getConnection(connectionId);
    let schemas: any[] = [];

    switch (connection.driver) {
      case 'postgres':
        schemas = await this.introspectPostgres(connection.connection);
        break;
      case 'mysql':
        schemas = await this.introspectMysql(connection.connection);
        break;
      case 'sqlite':
        schemas = await this.introspectSqlite(connection.connection);
        break;
      case 'mssql':
        schemas = await this.introspectMssql(connection.connection);
        break;
    }

    return { schemas };
  }

  /**
   * Lists every schema except `pg_catalog` and `information_schema`, with the
   * columns of each table read from `information_schema.columns`.
   */
  private async introspectPostgres(client: any): Promise<any[]> {
    const schemasQuery = `
      SELECT schema_name
      FROM information_schema.schemata
      WHERE schema_name NOT IN ('pg_catalog', 'information_schema')
    `;
    // Loop-invariant: only the bound schema name changes per iteration.
    const tablesQuery = `
      SELECT table_name, column_name, data_type, is_nullable, column_default
      FROM information_schema.columns
      WHERE table_schema = $1
      ORDER BY table_name, ordinal_position
    `;

    const schemasResult = await client.query(schemasQuery);
    const schemas: any[] = [];

    for (const schemaRow of schemasResult.rows) {
      const tablesResult = await client.query(tablesQuery, [schemaRow.schema_name]);
      const tableMap = new Map<string, { name: string; columns: any[] }>();

      for (const row of tablesResult.rows) {
        let table = tableMap.get(row.table_name);
        if (!table) {
          table = { name: row.table_name, columns: [] };
          tableMap.set(row.table_name, table);
        }
        table.columns.push({
          name: row.column_name,
          type: row.data_type,
          nullable: row.is_nullable === 'YES',
          default: row.column_default
        });
      }

      schemas.push({
        name: schemaRow.schema_name,
        tables: Array.from(tableMap.values())
      });
    }

    return schemas;
  }

  /**
   * Lists all non-system schemas with their tables and columns.
   *
   * Every selected column carries an explicit lowercase alias: MySQL 8
   * returns INFORMATION_SCHEMA column names in uppercase unless aliased,
   * which would make the lowercase property reads below yield `undefined`.
   */
  private async introspectMysql(connection: any): Promise<any[]> {
    const [rows] = await connection.query(`
      SELECT
        table_schema AS table_schema,
        table_name AS table_name,
        column_name AS column_name,
        data_type AS data_type,
        is_nullable AS is_nullable,
        column_default AS column_default
      FROM information_schema.columns
      WHERE table_schema NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys')
      ORDER BY table_schema, table_name, ordinal_position
    `);

    const schemaMap = new Map<string, Map<string, { name: string; columns: any[] }>>();

    for (const row of rows) {
      let tableMap = schemaMap.get(row.table_schema);
      if (!tableMap) {
        tableMap = new Map();
        schemaMap.set(row.table_schema, tableMap);
      }

      let table = tableMap.get(row.table_name);
      if (!table) {
        table = { name: row.table_name, columns: [] };
        tableMap.set(row.table_name, table);
      }

      table.columns.push({
        name: row.column_name,
        type: row.data_type,
        nullable: row.is_nullable === 'YES',
        default: row.column_default
      });
    }

    const schemas: any[] = [];
    for (const [schemaName, tables] of schemaMap) {
      schemas.push({
        name: schemaName,
        tables: Array.from(tables.values())
      });
    }
    return schemas;
  }

  /**
   * Lists the tables in `sqlite_master` under a single schema named `main`,
   * with columns taken from `PRAGMA table_info`.
   *
   * Tables are returned in the order `sqlite_master` lists them. Driver
   * errors reject the returned promise, matching the other drivers.
   */
  private async introspectSqlite(db: any): Promise<any[]> {
    const tableRows = await DbNamespace.sqliteAll(
      db,
      "SELECT name FROM sqlite_master WHERE type='table'"
    );

    const tables = await Promise.all(
      tableRows.map(async (tableRow: any) => {
        // Quote the identifier so names with spaces, quotes or keywords are
        // read literally instead of being spliced into the statement.
        const quotedName = `"${String(tableRow.name).replace(/"/g, '""')}"`;
        const columns = await DbNamespace.sqliteAll(db, `PRAGMA table_info(${quotedName})`);
        return {
          name: tableRow.name,
          columns: columns.map((col: any) => ({
            name: col.name,
            type: col.type,
            nullable: col.notnull === 0,
            default: col.dflt_value
          }))
        };
      })
    );

    return [{ name: 'main', tables }];
  }

  /**
   * Placeholder: MSSQL introspection is not implemented and always reports an
   * empty `dbo` schema.
   */
  private async introspectMssql(connection: any): Promise<any[]> {
    // MSSQL introspection via system tables
    return [{ name: 'dbo', tables: [] }];
  }

  /** Starts a transaction on the connection. No-op placeholder for MSSQL. */
  private async beginTransaction(connection: DatabaseConnection): Promise<void> {
    switch (connection.driver) {
      case 'postgres':
        await connection.connection.query('BEGIN');
        break;
      case 'mysql':
        await connection.connection.beginTransaction();
        break;
      case 'sqlite':
        await DbNamespace.sqliteRun(connection.connection, 'BEGIN TRANSACTION');
        break;
      case 'mssql':
        // MSSQL transaction handling would go here
        break;
    }
  }

  /** Commits the current transaction. No-op placeholder for MSSQL. */
  private async commitTransaction(connection: DatabaseConnection): Promise<void> {
    switch (connection.driver) {
      case 'postgres':
        await connection.connection.query('COMMIT');
        break;
      case 'mysql':
        await connection.connection.commit();
        break;
      case 'sqlite':
        await DbNamespace.sqliteRun(connection.connection, 'COMMIT');
        break;
      case 'mssql':
        // MSSQL transaction handling would go here
        break;
    }
  }

  /** Rolls back the current transaction. No-op placeholder for MSSQL. */
  private async rollbackTransaction(connection: DatabaseConnection): Promise<void> {
    switch (connection.driver) {
      case 'postgres':
        await connection.connection.query('ROLLBACK');
        break;
      case 'mysql':
        await connection.connection.rollback();
        break;
      case 'sqlite':
        await DbNamespace.sqliteRun(connection.connection, 'ROLLBACK');
        break;
      case 'mssql':
        // MSSQL transaction handling would go here
        break;
    }
  }

  /**
   * Executes a statement and reports the number of affected rows.
   *
   * @returns `{ rowsAffected }`. For MSSQL this is a placeholder that always
   *   returns 0 without executing anything.
   * @throws Error for an unsupported driver, or whatever the driver throws.
   */
  private async executeSQL(connection: DatabaseConnection, sql: string, params?: any[]): Promise<any> {
    switch (connection.driver) {
      case 'postgres': {
        const pgResult = await connection.connection.query(sql, params);
        // pg reports a null rowCount for commands without a row count.
        return { rowsAffected: pgResult.rowCount ?? 0 };
      }
      case 'mysql': {
        const [mysqlResult] = await connection.connection.execute(sql, params);
        return { rowsAffected: (mysqlResult as any).affectedRows };
      }
      case 'sqlite': {
        const outcome = await DbNamespace.sqliteRun(connection.connection, sql, params);
        return { rowsAffected: outcome.changes };
      }
      case 'mssql':
        // MSSQL execution would go here
        return { rowsAffected: 0 };
      default:
        throw new Error(`Unsupported driver: ${connection.driver}`);
    }
  }

  /**
   * Executes a query and returns `{ columns, rows }`.
   *
   * For SQLite the column names are taken from the first row, so an empty
   * result has no columns. For MSSQL this is a placeholder that always
   * returns an empty result.
   *
   * @throws Error for an unsupported driver, or whatever the driver throws.
   */
  private async executeQuery(connection: DatabaseConnection, sql: string, params?: any[]): Promise<any> {
    switch (connection.driver) {
      case 'postgres': {
        const pgResult = await connection.connection.query(sql, params);
        return {
          columns: pgResult.fields.map((f: any) => f.name),
          rows: pgResult.rows
        };
      }
      case 'mysql': {
        const [rows, fields] = await connection.connection.execute(sql, params);
        // Statements that produce no result set yield a header object and no
        // field list; report them as an empty result instead of throwing.
        return {
          columns: (fields ?? []).map((f: any) => f.name),
          rows: Array.isArray(rows) ? rows : []
        };
      }
      case 'sqlite': {
        const rows = await DbNamespace.sqliteAll(connection.connection, sql, params);
        const columns = rows.length > 0 ? Object.keys(rows[0]) : [];
        return { columns, rows };
      }
      case 'mssql':
        // MSSQL query would go here
        return { columns: [], rows: [] };
      default:
        throw new Error(`Unsupported driver: ${connection.driver}`);
    }
  }

  /**
   * Looks up a connection and refreshes its `last_used` timestamp.
   *
   * @throws SessionNotFoundError when the id is unknown.
   */
  private getConnection(connectionId: string): DatabaseConnection {
    const connection = this.connections.get(connectionId);
    if (!connection) {
      throw new SessionNotFoundError(connectionId);
    }
    connection.last_used = new Date().toISOString();
    return connection;
  }

  /** Promise wrapper around sqlite3's `run`; resolves with the change count. */
  private static sqliteRun(db: any, sql: string, params?: any[]): Promise<{ changes: number }> {
    return new Promise((resolve, reject) => {
      // A regular function is required: sqlite3 exposes `changes` on `this`.
      db.run(sql, params ?? [], function (this: any, err: Error | null) {
        if (err) reject(err);
        else resolve({ changes: this.changes });
      });
    });
  }

  /** Promise wrapper around sqlite3's `all`; resolves with the result rows. */
  private static sqliteAll(db: any, sql: string, params?: any[]): Promise<any[]> {
    return new Promise((resolve, reject) => {
      db.all(sql, params ?? [], (err: Error | null, rows: any[]) => {
        if (err) reject(err);
        else resolve(rows ?? []);
      });
    });
  }

  // Cleanup method to be called on server shutdown
  /** Ends every open connection; failures are logged and do not stop the loop. */
  async cleanup(): Promise<void> {
    // Snapshot the ids: endConnection mutates the map while we iterate.
    const connectionIds = Array.from(this.connections.keys());
    for (const connectionId of connectionIds) {
      try {
        await this.endConnection({ connection_id: connectionId });
      } catch (error) {
        console.warn(`Error cleaning up database connection ${connectionId}:`, error);
      }
    }
  }
}