Skip to main content
Glama
db.ts (17.7 kB)
/**
 * Database namespace - Stateful SQL with dynamic connection tools
 */

import { v4 as uuidv4 } from 'uuid';
import { MCPServer } from '../core/server.js';
import { MCPTool } from '../types/core.js';
import { InvalidArgError, SessionNotFoundError, ConfigMissingError } from '../core/errors.js';
import {
  DatabaseConnection,
  DatabaseConnectionOptions,
  TransactionCommand,
  RowMode,
  ExecResponse,
  QueryResponse,
  SchemaIntrospectionResponse,
  StartConnectionResponse
} from '../types/db.js';

// Import database drivers
import pg from 'pg';
import mysql from 'mysql2/promise';
import sqlite3 from 'sqlite3';
import tedious from 'tedious';

/**
 * Quote a name for use as an SQLite identifier: wrap it in double quotes and
 * double any embedded double quote.
 */
function quoteSqliteIdentifier(identifier: string): string {
  return `"${String(identifier).replace(/"/g, '""')}"`;
}

/**
 * Percent-decode one URL component, returning the input unchanged when it
 * contains a malformed escape sequence (so such values keep working as-is).
 */
function decodeUrlComponent(value: string): string {
  try {
    return decodeURIComponent(value);
  } catch {
    return value;
  }
}

/**
 * Registers the `db.*` tools on an MCP server and tracks the database
 * connections opened through them.
 *
 * Two static tools (`db.start_connection`, `db.end_connection`) are registered
 * at construction time. Each opened connection additionally gets its own
 * `db.<connection_id>.exec`, `.query` and `.schema_introspect` tools.
 */
export class DbNamespace {
  private mcpServer: MCPServer;
  /** Open connections keyed by connection id. */
  private connections = new Map<string, DatabaseConnection>();

  constructor(mcpServer: MCPServer) {
    this.mcpServer = mcpServer;
    this.registerStaticTools();
  }

  /** Register the connection-independent tools (`start_connection`, `end_connection`). */
  private registerStaticTools(): void {
    const registry = this.mcpServer.getRegistry();

    registry.registerTool(
      'db.start_connection',
      {
        name: 'db.start_connection',
        description: 'Start a new database connection',
        inputSchema: {
          type: 'object',
          properties: {
            opts: {
              type: 'object',
              properties: {
                driver: { type: 'string', enum: ['postgres', 'mysql', 'sqlite', 'mssql'] },
                dsn: { type: 'string' }
              },
              required: ['driver', 'dsn']
            }
          },
          required: ['opts']
        }
      },
      this.startConnection.bind(this)
    );

    registry.registerTool(
      'db.end_connection',
      {
        name: 'db.end_connection',
        description: 'End a database connection',
        inputSchema: {
          type: 'object',
          properties: {
            connection_id: { type: 'string' }
          },
          required: ['connection_id']
        }
      },
      this.endConnection.bind(this)
    );
  }

  /**
   * Open a connection with the requested driver, record it, create a registry
   * session for it and register its per-connection tools.
   *
   * @param params.opts Driver name and DSN.
   * @returns The generated connection id.
   * @throws InvalidArgError when the driver is not one of the supported ones.
   * @throws Error (`Failed to connect to database: ...`) when the driver fails
   *   to connect or the session/tools cannot be registered.
   */
  private async startConnection(params: {
    opts: DatabaseConnectionOptions;
  }): Promise<StartConnectionResponse> {
    const connectionId = uuidv4();
    const { driver, dsn } = params.opts;

    let dbConnection: any;
    try {
      switch (driver) {
        case 'postgres':
          dbConnection = await this.createPostgresConnection(dsn);
          break;
        case 'mysql':
          dbConnection = await this.createMysqlConnection(dsn);
          break;
        case 'sqlite':
          dbConnection = await this.createSqliteConnection(dsn);
          break;
        case 'mssql':
          dbConnection = await this.createMssqlConnection(dsn);
          break;
        default:
          throw new InvalidArgError('driver', `Unsupported driver: ${driver}`);
      }
    } catch (error) {
      // Keep the argument error typed; only real connection failures are wrapped.
      if (error instanceof InvalidArgError) {
        throw error;
      }
      throw new Error(`Failed to connect to database: ${error}`);
    }

    const now = new Date().toISOString();
    const connection: DatabaseConnection = {
      connection_id: connectionId,
      driver,
      connection: dbConnection,
      created_at: now,
      last_used: now,
      tx_state: 'closed'
    };
    this.connections.set(connectionId, connection);

    try {
      // Register this connection with the registry
      this.mcpServer.getRegistry().createSession(connectionId, 'db');

      // Register dynamic connection tools
      this.registerConnectionTools(connectionId);
    } catch (error) {
      // Registration failed after the driver connected: do not leak the
      // connection or leave a half-registered session behind.
      this.connections.delete(connectionId);
      try {
        await this.closeDriverConnection(connection);
      } catch (closeError) {
        console.warn(`Error closing database connection ${connectionId}:`, closeError);
      }
      try {
        this.mcpServer.getRegistry().destroySession(connectionId);
      } catch (destroyError) {
        console.warn(`Error destroying session ${connectionId}:`, destroyError);
      }
      throw new Error(`Failed to connect to database: ${error}`);
    }

    return { connection_id: connectionId };
  }

  /** Create and connect a `pg.Client` for the given DSN. */
  private async createPostgresConnection(dsn: string): Promise<any> {
    const client = new pg.Client(dsn);
    await client.connect();
    return client;
  }

  /** Create a `mysql2/promise` connection for the given DSN. */
  private async createMysqlConnection(dsn: string): Promise<any> {
    return await mysql.createConnection(dsn);
  }

  /** Open an SQLite database; `dsn` is passed to `sqlite3.Database` as the filename. */
  private async createSqliteConnection(dsn: string): Promise<any> {
    return new Promise((resolve, reject) => {
      const db = new sqlite3.Database(dsn, (err) => {
        if (err) reject(err);
        else resolve(db);
      });
    });
  }

  /**
   * Open a SQL Server connection through tedious.
   *
   * The DSN is parsed as a URL: host, port (default 1433), user, password and
   * the first path segment as database name.
   */
  private async createMssqlConnection(dsn: string): Promise<any> {
    // Parse DSN for SQL Server
    const url = new URL(dsn);
    const config: any = {
      server: url.hostname,
      authentication: {
        type: 'default',
        options: {
          // URL components are percent-encoded; the driver needs the raw values.
          userName: decodeUrlComponent(url.username),
          password: decodeUrlComponent(url.password)
        }
      },
      options: {
        database: decodeUrlComponent(url.pathname.substring(1)),
        port: parseInt(url.port || '1433', 10),
        encrypt: true,
        // NOTE(review): certificate validation is disabled here — confirm this
        // is intended outside of development environments.
        trustServerCertificate: true
      }
    };

    return new Promise((resolve, reject) => {
      const connection = new tedious.Connection(config);
      connection.on('connect', (err) => {
        if (err) reject(err);
        else resolve(connection);
      });
      connection.connect();
    });
  }

  /** Register the `exec`, `query` and `schema_introspect` tools for one connection. */
  private registerConnectionTools(connectionId: string): void {
    const registry = this.mcpServer.getRegistry();

    registry.registerSessionTool(
      connectionId,
      `db.${connectionId}.exec`,
      {
        name: `db.${connectionId}.exec`,
        description: 'Execute SQL statement',
        inputSchema: {
          type: 'object',
          properties: {
            sql: { type: 'string' },
            params: { type: 'array' },
            tx: { type: 'string', enum: ['none', 'begin', 'commit', 'rollback'] }
          },
          required: ['sql']
        }
      },
      (params: any) => this.exec(connectionId, params)
    );

    registry.registerSessionTool(
      connectionId,
      `db.${connectionId}.query`,
      {
        name: `db.${connectionId}.query`,
        description: 'Query database and return rows',
        inputSchema: {
          type: 'object',
          properties: {
            sql: { type: 'string' },
            params: { type: 'array' },
            row_mode: { type: 'string', enum: ['dict', 'array'] }
          },
          required: ['sql']
        }
      },
      (params: any) => this.query(connectionId, params)
    );

    registry.registerSessionTool(
      connectionId,
      `db.${connectionId}.schema_introspect`,
      {
        name: `db.${connectionId}.schema_introspect`,
        description: 'Introspect database schema',
        inputSchema: {
          type: 'object',
          properties: {}
        }
      },
      () => this.schemaIntrospect(connectionId)
    );
  }

  /**
   * Close a connection, forget it and destroy its registry session.
   *
   * Errors raised while closing the driver connection are logged and do not
   * prevent the connection from being removed.
   *
   * @throws SessionNotFoundError when the id is unknown.
   */
  private async endConnection(params: {
    connection_id: string;
  }): Promise<{ ok: true }> {
    const connection = this.connections.get(params.connection_id);
    if (!connection) {
      throw new SessionNotFoundError(params.connection_id);
    }

    try {
      // Close the database connection
      await this.closeDriverConnection(connection);
    } catch (error) {
      console.warn(`Error closing database connection ${params.connection_id}:`, error);
    }

    this.connections.delete(params.connection_id);
    this.mcpServer.getRegistry().destroySession(params.connection_id);

    return { ok: true };
  }

  /** Close the underlying driver connection, waiting for completion where the driver reports it. */
  private async closeDriverConnection(connection: DatabaseConnection): Promise<void> {
    switch (connection.driver) {
      case 'postgres':
      case 'mysql':
        await connection.connection.end();
        break;
      case 'sqlite':
        // sqlite3 reports close errors through the callback, so wait for it.
        await new Promise<void>((resolve, reject) => {
          connection.connection.close((err: any) => {
            if (err) reject(err);
            else resolve();
          });
        });
        break;
      case 'mssql':
        connection.connection.close();
        break;
    }
  }

  /**
   * Run a statement or a transaction command on a connection.
   *
   * When `tx` is `begin`, `commit` or `rollback`, only that transaction
   * command is run and `sql` is not executed. Otherwise `sql` is executed
   * with `params`.
   *
   * @throws SessionNotFoundError when the connection id is unknown.
   */
  private async exec(connectionId: string, params: {
    sql: string;
    params?: any[];
    tx?: TransactionCommand;
  }): Promise<ExecResponse> {
    const connection = this.getConnection(connectionId);

    // Handle transaction commands
    if (params.tx === 'begin') {
      await this.beginTransaction(connection);
      connection.tx_state = 'open';
      return { status: 'ok', tx_state: 'open' };
    }

    if (params.tx === 'commit') {
      await this.commitTransaction(connection);
      connection.tx_state = 'closed';
      return { status: 'ok', tx_state: 'closed' };
    }

    if (params.tx === 'rollback') {
      await this.rollbackTransaction(connection);
      connection.tx_state = 'closed';
      return { status: 'ok', tx_state: 'closed' };
    }

    // Execute the SQL
    const result = await this.executeSQL(connection, params.sql, params.params);

    return {
      status: 'ok',
      rows_affected: result.rowsAffected,
      tx_state: connection.tx_state
    };
  }

  /**
   * Run a query and return its column names and rows.
   *
   * @param params.row_mode `'dict'` (default) returns rows as the driver
   *   produced them; `'array'` returns `Object.values` of each row.
   * @throws SessionNotFoundError when the connection id is unknown.
   */
  private async query(connectionId: string, params: {
    sql: string;
    params?: any[];
    row_mode?: RowMode;
  }): Promise<QueryResponse> {
    const connection = this.getConnection(connectionId);
    const rowMode = params.row_mode ?? 'dict';

    const result = await this.executeQuery(connection, params.sql, params.params);

    if (rowMode === 'array') {
      return {
        cols: result.columns,
        rows: result.rows.map((row: any) => Object.values(row))
      };
    }

    return {
      cols: result.columns,
      rows: result.rows
    };
  }

  /**
   * Describe the schemas, tables and columns visible on a connection.
   *
   * @throws SessionNotFoundError when the connection id is unknown.
   */
  private async schemaIntrospect(connectionId: string): Promise<SchemaIntrospectionResponse> {
    const connection = this.getConnection(connectionId);

    let schemas: any[] = [];

    switch (connection.driver) {
      case 'postgres':
        schemas = await this.introspectPostgres(connection.connection);
        break;
      case 'mysql':
        schemas = await this.introspectMysql(connection.connection);
        break;
      case 'sqlite':
        schemas = await this.introspectSqlite(connection.connection);
        break;
      case 'mssql':
        schemas = await this.introspectMssql(connection.connection);
        break;
    }

    return { schemas };
  }

  /**
   * List every schema except `pg_catalog` and `information_schema`, with the
   * columns of each table read from `information_schema.columns`.
   */
  private async introspectPostgres(client: any): Promise<any[]> {
    const schemasQuery = `
      SELECT schema_name
      FROM information_schema.schemata
      WHERE schema_name NOT IN ('pg_catalog', 'information_schema')
    `;

    const schemasResult = await client.query(schemasQuery);
    const schemas: any[] = [];

    for (const schemaRow of schemasResult.rows) {
      const tablesQuery = `
        SELECT table_name, column_name, data_type, is_nullable, column_default
        FROM information_schema.columns
        WHERE table_schema = $1
        ORDER BY table_name, ordinal_position
      `;

      const tablesResult = await client.query(tablesQuery, [schemaRow.schema_name]);
      const tableMap = new Map();

      for (const row of tablesResult.rows) {
        if (!tableMap.has(row.table_name)) {
          tableMap.set(row.table_name, {
            name: row.table_name,
            columns: []
          });
        }

        tableMap.get(row.table_name).columns.push({
          name: row.column_name,
          type: row.data_type,
          nullable: row.is_nullable === 'YES',
          default: row.column_default
        });
      }

      schemas.push({
        name: schemaRow.schema_name,
        tables: Array.from(tableMap.values())
      });
    }

    return schemas;
  }

  /**
   * Group `information_schema.columns` rows by schema and table, skipping the
   * `mysql`, `information_schema`, `performance_schema` and `sys` schemas.
   */
  private async introspectMysql(connection: any): Promise<any[]> {
    const [rows] = await connection.query(`
      SELECT
        table_schema,
        table_name,
        column_name,
        data_type,
        is_nullable,
        column_default
      FROM information_schema.columns
      WHERE table_schema NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys')
      ORDER BY table_schema, table_name, ordinal_position
    `);

    const schemaMap = new Map();

    for (const row of rows) {
      if (!schemaMap.has(row.table_schema)) {
        schemaMap.set(row.table_schema, new Map());
      }

      const tableMap = schemaMap.get(row.table_schema);
      if (!tableMap.has(row.table_name)) {
        tableMap.set(row.table_name, {
          name: row.table_name,
          columns: []
        });
      }

      tableMap.get(row.table_name).columns.push({
        name: row.column_name,
        type: row.data_type,
        nullable: row.is_nullable === 'YES',
        default: row.column_default
      });
    }

    const schemas: any[] = [];
    for (const [schemaName, tables] of schemaMap) {
      schemas.push({
        name: schemaName,
        tables: Array.from(tables.values())
      });
    }

    return schemas;
  }

  /**
   * List the tables in `sqlite_master` and their columns under a single
   * schema named `main`.
   *
   * If the table list itself cannot be read, an empty `main` schema is
   * returned (best effort). A failure while reading one table's columns
   * rejects.
   */
  private async introspectSqlite(db: any): Promise<any[]> {
    let tableRows: any[];
    try {
      tableRows = await this.sqliteAll(db, "SELECT name FROM sqlite_master WHERE type='table'");
    } catch {
      return [{ name: 'main', tables: [] }];
    }

    const tables: any[] = [];
    // One table at a time so the result order follows the table list.
    for (const tableRow of tableRows) {
      // Table names may contain spaces, quotes or keywords, so quote them.
      const columns = await this.sqliteAll(
        db,
        `PRAGMA table_info(${quoteSqliteIdentifier(tableRow.name)})`
      );

      tables.push({
        name: tableRow.name,
        columns: columns.map((col: any) => ({
          name: col.name,
          type: col.type,
          nullable: col.notnull === 0,
          default: col.dflt_value
        }))
      });
    }

    return [{ name: 'main', tables }];
  }

  /** Placeholder: MSSQL introspection is not implemented and reports an empty `dbo` schema. */
  private async introspectMssql(connection: any): Promise<any[]> {
    // MSSQL introspection via system tables
    return [{ name: 'dbo', tables: [] }];
  }

  /** Start a transaction on the connection. No-op for MSSQL (not implemented). */
  private async beginTransaction(connection: DatabaseConnection): Promise<void> {
    switch (connection.driver) {
      case 'postgres':
        await connection.connection.query('BEGIN');
        break;
      case 'mysql':
        await connection.connection.beginTransaction();
        break;
      case 'sqlite':
        await this.sqliteRun(connection.connection, 'BEGIN TRANSACTION');
        break;
      case 'mssql':
        // MSSQL transaction handling would go here
        break;
    }
  }

  /** Commit the current transaction. No-op for MSSQL (not implemented). */
  private async commitTransaction(connection: DatabaseConnection): Promise<void> {
    switch (connection.driver) {
      case 'postgres':
        await connection.connection.query('COMMIT');
        break;
      case 'mysql':
        await connection.connection.commit();
        break;
      case 'sqlite':
        await this.sqliteRun(connection.connection, 'COMMIT');
        break;
      case 'mssql':
        // MSSQL transaction handling would go here
        break;
    }
  }

  /** Roll back the current transaction. No-op for MSSQL (not implemented). */
  private async rollbackTransaction(connection: DatabaseConnection): Promise<void> {
    switch (connection.driver) {
      case 'postgres':
        await connection.connection.query('ROLLBACK');
        break;
      case 'mysql':
        await connection.connection.rollback();
        break;
      case 'sqlite':
        await this.sqliteRun(connection.connection, 'ROLLBACK');
        break;
      case 'mssql':
        // MSSQL transaction handling would go here
        break;
    }
  }

  /**
   * Execute a statement and report how many rows it affected.
   *
   * MSSQL execution is not implemented: it returns `rowsAffected: 0` without
   * sending anything to the server.
   *
   * @returns `{ rowsAffected }`.
   * @throws Error for an unsupported driver.
   */
  private async executeSQL(connection: DatabaseConnection, sql: string, params?: any[]): Promise<any> {
    switch (connection.driver) {
      case 'postgres': {
        const pgResult = await connection.connection.query(sql, params);
        return { rowsAffected: pgResult.rowCount };
      }
      case 'mysql': {
        const [mysqlResult] = await connection.connection.execute(sql, params);
        return { rowsAffected: (mysqlResult as any).affectedRows };
      }
      case 'sqlite':
        return this.sqliteRun(connection.connection, sql, params);
      case 'mssql':
        // MSSQL execution would go here
        return { rowsAffected: 0 };
      default:
        throw new Error(`Unsupported driver: ${connection.driver}`);
    }
  }

  /**
   * Execute a query and return `{ columns, rows }`.
   *
   * For SQLite the column names are taken from the keys of the first row, so
   * an empty result has no columns. MSSQL querying is not implemented and
   * returns an empty result.
   *
   * @throws Error for an unsupported driver.
   */
  private async executeQuery(connection: DatabaseConnection, sql: string, params?: any[]): Promise<any> {
    switch (connection.driver) {
      case 'postgres': {
        const pgResult = await connection.connection.query(sql, params);
        return {
          columns: pgResult.fields.map((f: any) => f.name),
          rows: pgResult.rows
        };
      }
      case 'mysql': {
        const [rows, fields] = await connection.connection.execute(sql, params);
        return {
          // `fields` is absent for statements that produce no result set.
          columns: (fields ?? []).map((f: any) => f.name),
          rows
        };
      }
      case 'sqlite': {
        const rows = await this.sqliteAll(connection.connection, sql, params);
        const columns = rows.length > 0 ? Object.keys(rows[0]) : [];
        return { columns, rows };
      }
      case 'mssql':
        // MSSQL query would go here
        return { columns: [], rows: [] };
      default:
        throw new Error(`Unsupported driver: ${connection.driver}`);
    }
  }

  /** Promise wrapper around sqlite3 `Database#run`; resolves with the driver's `changes` count. */
  private sqliteRun(db: any, sql: string, params: any[] = []): Promise<{ rowsAffected: number }> {
    return new Promise((resolve, reject) => {
      // A `function` callback is required: sqlite3 exposes `changes` on `this`.
      db.run(sql, params, function (this: any, err: any) {
        if (err) reject(err);
        else resolve({ rowsAffected: this.changes });
      });
    });
  }

  /** Promise wrapper around sqlite3 `Database#all`; resolves with all result rows. */
  private sqliteAll(db: any, sql: string, params: any[] = []): Promise<any[]> {
    return new Promise((resolve, reject) => {
      db.all(sql, params, (err: any, rows: any[]) => {
        if (err) reject(err);
        else resolve(rows);
      });
    });
  }

  /**
   * Look up a connection by id and refresh its `last_used` timestamp.
   *
   * @throws SessionNotFoundError when the id is unknown.
   */
  private getConnection(connectionId: string): DatabaseConnection {
    const connection = this.connections.get(connectionId);
    if (!connection) {
      throw new SessionNotFoundError(connectionId);
    }

    connection.last_used = new Date().toISOString();
    return connection;
  }

  // Cleanup method to be called on server shutdown
  /** End every open connection; a failure on one is logged and does not stop the rest. */
  async cleanup(): Promise<void> {
    const connectionIds = Array.from(this.connections.keys());

    for (const connectionId of connectionIds) {
      try {
        await this.endConnection({ connection_id: connectionId });
      } catch (error) {
        console.warn(`Error cleaning up database connection ${connectionId}:`, error);
      }
    }
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JacobFV/mcp-fullstack'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.