dbQuery
Execute SQL queries on PostgreSQL databases to retrieve and manage data through database connections.
Instructions
Execute a SQL query on a PostgreSQL database
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| connectionString | Yes | Database connection string | |
| query | Yes | SQL query to execute |
Implementation Reference
- src/tools/dbQuery.ts:34-71 (handler)MCP tool handler for the 'dbQuery' tool. It processes input arguments, reports progress, calls the database service to execute the query, and returns structured content in MCP format.async (args, extra) => { const progress = createProgressReporter(extra, "dbQuery"); const total = 1; progress?.({ progress: 0, total, message: "Dispatching database query" }); const result: DatabaseQueryResult = await executeDatabaseQuery( args.profile, args.query, args.parameters, { timeoutMs: args.timeoutMs, rowLimit: args.rowLimit, requestId: String(extra.requestId), tool: "dbQuery", signal: extra.signal, onProgress: (update) => { progress?.({ progress: update.progress, total, message: update.message }); } } ); const structuredContent: Record<string, unknown> = { rows: result.rows, rowCount: result.rowCount, truncated: result.truncated, durationMs: result.durationMs }; return { content: [], structuredContent }; }
- src/tools/dbQuery.ts:9-24 (schema)Zod schemas defining the input and output shapes for the dbQuery tool.const DbQueryInputSchema = z.object({ profile: z.string().describe("Database profile to use"), query: z.string().min(1).describe("SQL query to execute"), parameters: z.array(z.unknown()).optional().describe("Positional parameters for the query"), timeoutMs: z.number().int().positive().optional().describe("Query timeout in milliseconds"), rowLimit: z.number().int().positive().optional().describe("Override maximum number of rows to return") }); const DbQueryOutputShape = { rows: z.array(z.record(z.string(), z.unknown())), rowCount: z.number().int().nonnegative(), truncated: z.boolean(), durationMs: z.number().int().nonnegative() }; const DbQueryOutputSchema = z.object(DbQueryOutputShape);
- src/tools/dbQuery.ts:26-73 (registration)Registers the 'dbQuery' tool with the MCP server using registerTool, specifying name, schemas, and handler.export function registerDbTool(server: McpServer): void { server.registerTool( "dbQuery", { description: "Execute a SQL query on a PostgreSQL database using a configured profile", inputSchema: DbQueryInputSchema.shape, outputSchema: DbQueryOutputShape }, async (args, extra) => { const progress = createProgressReporter(extra, "dbQuery"); const total = 1; progress?.({ progress: 0, total, message: "Dispatching database query" }); const result: DatabaseQueryResult = await executeDatabaseQuery( args.profile, args.query, args.parameters, { timeoutMs: args.timeoutMs, rowLimit: args.rowLimit, requestId: String(extra.requestId), tool: "dbQuery", signal: extra.signal, onProgress: (update) => { progress?.({ progress: update.progress, total, message: update.message }); } } ); const structuredContent: Record<string, unknown> = { rows: result.rows, rowCount: result.rowCount, truncated: result.truncated, durationMs: result.durationMs }; return { content: [], structuredContent }; } ); }
- Core helper function that performs the actual database query execution against PostgreSQL, including safety validations, concurrency control, timeouts, row limits, abort signals, and progress updates.export async function executeDatabaseQuery( profileName: string, query: string, values: unknown[] | undefined, options: DatabaseQueryOptions = {} ): Promise<DatabaseQueryResult> { const config = getConfig(); const profile = config.databaseProfiles[profileName]; if (!profile) { throw new Error(`Database profile '${profileName}' not found`); } validateQuerySafety(query); ensureQueryAllowed(query, profile.allowedStatementPatterns); const timeoutMs = Math.min(options.timeoutMs ?? profile.maxExecutionMs, profile.maxExecutionMs); const rowLimit = Math.min(options.rowLimit ?? profile.maxRows, profile.maxRows); options.onProgress?.({ progress: 0, message: "Waiting for database availability" }); const release = await dbConcurrency.acquire(profileName, profile.maxConcurrent, options.signal); const client = new Client({ connectionString: profile.connectionString }); const start = Date.now(); const abortError = createAbortError("Database query cancelled"); let aborted = false; let abortListener: (() => void) | undefined; logger.info("Executing database query", { profile: profileName, requestId: options.requestId, tool: options.tool }); try { if (options.signal) { if (options.signal.aborted) { aborted = true; throw abortError; } abortListener = () => { aborted = true; options.onProgress?.({ progress: 1, message: "Database query cancelled" }); if (client.connection?.stream) { client.connection.stream.destroy(abortError); } }; options.signal.addEventListener("abort", abortListener, { once: true }); } options.onProgress?.({ progress: 0.1, message: "Connecting to database" }); await client.connect(); if (aborted) { throw abortError; } options.onProgress?.({ progress: 0.3, message: "Applying session limits" }); await applyTimeout(client, timeoutMs); options.onProgress?.({ progress: 0.6, message: "Executing query" }); const result: QueryResult = await client.query({ text: query, values }); if (aborted) { throw abortError; } const rows = result.rows.slice(0, rowLimit); const truncated = result.rows.length > rows.length; const rowCount = result.rowCount ?? rows.length; const durationMs = Date.now() - start; logger.info("Database query completed", { profile: profileName, requestId: options.requestId, tool: options.tool, rowCount, truncated, durationMs }); options.onProgress?.({ progress: 1, message: "Database query completed" }); return { rows, rowCount, truncated, durationMs }; } catch (error) { if (aborted) { throw abortError; } logger.error("Database query failed", { profile: profileName, requestId: options.requestId, tool: options.tool, error: error instanceof Error ? error.message : String(error) }); throw error; } finally { if (options.signal && abortListener) { options.signal.removeEventListener("abort", abortListener); } try { await client.end(); } catch (endError) { logger.debug("Error closing database client", { profile: profileName, error: endError instanceof Error ? endError.message : String(endError) }); } release(); } }
- src/tools/index.ts:3-8 (registration)Top-level tool registration entry point that calls registerDbTool to register the dbQuery tool.import { registerDbTool } from "./dbQuery.js"; import { registerTrainingTool } from "./trainClassifier.js"; export function registerTools(server: McpServer): void { registerSshTool(server); registerDbTool(server);