dbQuery
Execute SQL queries on PostgreSQL databases using configured profiles to retrieve or modify data through the Infer MCP Server.
Instructions
Execute a SQL query on a PostgreSQL database using a configured profile
Input Schema
Table / JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| profile | Yes | Database profile to use | |
| query | Yes | SQL query to execute | |
| parameters | No | Positional parameters for the query | |
| timeoutMs | No | Query timeout in milliseconds | |
| rowLimit | No | Override maximum number of rows to return | |
Implementation Reference
- src/tools/dbQuery.ts:34-72 (handler) The handler function for the 'dbQuery' MCP tool. It creates a progress reporter, executes the database query via the executeDatabaseQuery service, structures the output, and returns it in the expected MCP format. async (args, extra) => { const progress = createProgressReporter(extra, "dbQuery"); const total = 1; progress?.({ progress: 0, total, message: "Dispatching database query" }); const result: DatabaseQueryResult = await executeDatabaseQuery( args.profile, args.query, args.parameters, { timeoutMs: args.timeoutMs, rowLimit: args.rowLimit, requestId: String(extra.requestId), tool: "dbQuery", signal: extra.signal, onProgress: (update) => { progress?.({ progress: update.progress, total, message: update.message }); } } ); const structuredContent: Record<string, unknown> = { rows: result.rows, rowCount: result.rowCount, truncated: result.truncated, durationMs: result.durationMs }; return { content: [], structuredContent }; } );
- src/tools/dbQuery.ts:9-24 (schema) Zod schemas defining the input (DbQueryInputSchema) and output (DbQueryOutputShape and DbQueryOutputSchema) for the dbQuery tool. const DbQueryInputSchema = z.object({ profile: z.string().describe("Database profile to use"), query: z.string().min(1).describe("SQL query to execute"), parameters: z.array(z.unknown()).optional().describe("Positional parameters for the query"), timeoutMs: z.number().int().positive().optional().describe("Query timeout in milliseconds"), rowLimit: z.number().int().positive().optional().describe("Override maximum number of rows to return") }); const DbQueryOutputShape = { rows: z.array(z.record(z.string(), z.unknown())), rowCount: z.number().int().nonnegative(), truncated: z.boolean(), durationMs: z.number().int().nonnegative() }; const DbQueryOutputSchema = z.object(DbQueryOutputShape);
- src/tools/dbQuery.ts:26-73 (registration) The registerDbTool function that registers the 'dbQuery' tool with the MCP server, specifying name, description, schemas, and handler. export function registerDbTool(server: McpServer): void { server.registerTool( "dbQuery", { description: "Execute a SQL query on a PostgreSQL database using a configured profile", inputSchema: DbQueryInputSchema.shape, outputSchema: DbQueryOutputShape }, async (args, extra) => { const progress = createProgressReporter(extra, "dbQuery"); const total = 1; progress?.({ progress: 0, total, message: "Dispatching database query" }); const result: DatabaseQueryResult = await executeDatabaseQuery( args.profile, args.query, args.parameters, { timeoutMs: args.timeoutMs, rowLimit: args.rowLimit, requestId: String(extra.requestId), tool: "dbQuery", signal: extra.signal, onProgress: (update) => { progress?.({ progress: update.progress, total, message: update.message }); } } ); const structuredContent: Record<string, unknown> = { rows: result.rows, rowCount: result.rowCount, truncated: result.truncated, durationMs: result.durationMs }; return { content: [], structuredContent }; } ); }
- src/tools/index.ts:6-10 (registration) Top-level registration function that calls registerDbTool to include dbQuery among all tools. export function registerTools(server: McpServer): void { registerSshTool(server); registerDbTool(server); registerTrainingTool(server); }
- Core helper function that executes the SQL query on PostgreSQL with safety validations, concurrency limiting, progress reporting, timeouts, row limits, and proper error handling. export async function executeDatabaseQuery( profileName: string, query: string, values: unknown[] | undefined, options: DatabaseQueryOptions = {} ): Promise<DatabaseQueryResult> { const config = getConfig(); const profile = config.databaseProfiles[profileName]; if (!profile) { throw new Error(`Database profile '${profileName}' not found`); } validateQuerySafety(query); ensureQueryAllowed(query, profile.allowedStatementPatterns); const timeoutMs = Math.min(options.timeoutMs ?? profile.maxExecutionMs, profile.maxExecutionMs); const rowLimit = Math.min(options.rowLimit ?? profile.maxRows, profile.maxRows); options.onProgress?.({ progress: 0, message: "Waiting for database availability" }); const release = await dbConcurrency.acquire(profileName, profile.maxConcurrent, options.signal); const client = new Client({ connectionString: profile.connectionString }); const start = Date.now(); const abortError = createAbortError("Database query cancelled"); let aborted = false; let abortListener: (() => void) | undefined; logger.info("Executing database query", { profile: profileName, requestId: options.requestId, tool: options.tool }); try { if (options.signal) { if (options.signal.aborted) { aborted = true; throw abortError; } abortListener = () => { aborted = true; options.onProgress?.({ progress: 1, message: "Database query cancelled" }); if (client.connection?.stream) { client.connection.stream.destroy(abortError); } }; options.signal.addEventListener("abort", abortListener, { once: true }); } options.onProgress?.({ progress: 0.1, message: "Connecting to database" }); await client.connect(); if (aborted) { throw abortError; } options.onProgress?.({ progress: 0.3, message: "Applying session limits" }); await applyTimeout(client, timeoutMs); options.onProgress?.({ progress: 0.6, message: "Executing query" }); const result: QueryResult = await client.query({ text: query, values }); if (aborted) { throw abortError; } const rows = result.rows.slice(0, rowLimit); const truncated = result.rows.length > rows.length; const rowCount = result.rowCount ?? rows.length; const durationMs = Date.now() - start; logger.info("Database query completed", { profile: profileName, requestId: options.requestId, tool: options.tool, rowCount, truncated, durationMs }); options.onProgress?.({ progress: 1, message: "Database query completed" }); return { rows, rowCount, truncated, durationMs }; } catch (error) { if (aborted) { throw abortError; } logger.error("Database query failed", { profile: profileName, requestId: options.requestId, tool: options.tool, error: error instanceof Error ? error.message : String(error) }); throw error; } finally { if (options.signal && abortListener) { options.signal.removeEventListener("abort", abortListener); } try { await client.end(); } catch (endError) { logger.debug("Error closing database client", { profile: profileName, error: endError instanceof Error ? endError.message : String(endError) }); } release(); } }