Skip to main content
Glama

QuestDB MCP Server

by brunoprela
questdb-client.ts7.06 kB
import { Sender } from "@questdb/nodejs-client"; import { QuestDBConfig, QueryResult } from "./types.js"; import { Logger } from "./logger.js"; /** * QuestDB client wrapper that manages connections and provides * high-level methods for querying and inserting data */ export class QuestDBClient { private sender: Sender | null = null; private config: QuestDBConfig; private logger: Logger; constructor(config: QuestDBConfig, logger: Logger) { this.config = config; this.logger = logger; } /** * Get or create the QuestDB sender instance */ private async getSender(): Promise<Sender> { if (!this.sender) { const configString = this.buildConfigString(); this.sender = await Sender.fromConfig(configString); // Log initialization (may fail if server not connected yet, that's OK) this.logger.info("QuestDB sender initialized", { host: this.config.host, port: this.config.port, }).catch(() => { // Ignore logging errors during initialization }); } return this.sender; } /** * Build QuestDB client configuration string */ private buildConfigString(): string { let config = `http::addr=${this.config.host}:${this.config.port}`; if (this.config.username && this.config.password) { config += `;username=${this.config.username};password=${this.config.password}`; } if (this.config.autoFlushRows) { config += `;auto_flush_rows=${this.config.autoFlushRows}`; } if (this.config.autoFlushInterval) { config += `;auto_flush_interval=${this.config.autoFlushInterval}`; } return config; } /** * Execute a SELECT query on QuestDB * @param query - SQL SELECT query * @param format - Output format (json or csv) * @returns Query result (QueryResult for json, string for csv) */ async query(query: string, format: string = "json"): Promise<QueryResult | string> { // Basic safety check - only allow SELECT queries const trimmedQuery = query.trim().toUpperCase(); if (!trimmedQuery.startsWith("SELECT")) { throw new Error("Only SELECT queries are allowed for safety reasons"); } const url = new URL(`http://${this.config.host}:${this.config.port}/exec`); url.searchParams.append("query", query); url.searchParams.append("fmt", format); const authHeader = this.config.username && this.config.password ? `Basic ${Buffer.from(`${this.config.username}:${this.config.password}`).toString("base64")}` : undefined; const response = await fetch(url.toString(), { method: "GET", headers: authHeader ? { Authorization: authHeader } : {}, }); if (!response.ok) { const errorText = await response.text(); throw new Error(`Query failed: ${response.status} ${response.statusText}. ${errorText}`); } if (format === "json") { const result = await response.json(); return result as QueryResult; } else { return await response.text(); } } /** * Insert data into a QuestDB table * @param table - Table name * @param data - Data to insert (object with column names as keys) */ async insert(table: string, data: Record<string, any>): Promise<void> { const sender = await this.getSender(); // Extract timestamp if provided const timestamp = data.timestamp !== undefined ? (typeof data.timestamp === "number" ? data.timestamp : Date.parse(data.timestamp)) : undefined; // Remove timestamp from data so it's not added as a column const { timestamp: _, ...columnData } = data; // Start building the row const rowBuilder = sender.table(table); // Add all columns for (const [key, value] of Object.entries(columnData)) { if (value === null || value === undefined) { continue; } const valueType = typeof value; if (valueType === "string") { // Try to determine if it's a symbol or regular string // For simplicity, treat all strings as symbols for now // In a real implementation, you might want to check table schema rowBuilder.symbol(key, value); } else if (valueType === "number") { // Check if it's an integer or float if (Number.isInteger(value)) { rowBuilder.intColumn(key, value); } else { rowBuilder.floatColumn(key, value); } } else if (valueType === "boolean") { rowBuilder.booleanColumn(key, value); } else { // Convert to string for other types rowBuilder.symbol(key, String(value)); } } // Close the row with timestamp (or use current time) if (timestamp !== undefined) { await rowBuilder.at(timestamp, "ms"); } else { await rowBuilder.atNow(); } await sender.flush(); } /** * List all tables in the database * @returns Array of table names */ async listTables(): Promise<string[]> { const query = "SELECT name FROM tables() ORDER BY name"; const result = await this.query(query, "json") as QueryResult; // Parse the result and format it nicely if (result && result.dataset && Array.isArray(result.dataset)) { return result.dataset.map((row: any) => row[0]); } return []; } /** * Get the schema of a specific table * @param table - Table name * @returns Table schema information */ async describeTable(table: string): Promise<QueryResult> { // Sanitize table name to prevent SQL injection if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(table)) { throw new Error(`Invalid table name: ${table}`); } const query = `SELECT * FROM table_columns('${table}')`; return await this.query(query, "json") as QueryResult; } /** * Close the QuestDB connection and cleanup resources */ async close(): Promise<void> { if (this.sender) { try { await this.sender.flush(); await this.sender.close(); // Log cleanup (may fail if server disconnected, that's OK) this.logger.info("QuestDB connection closed").catch(() => { // Ignore logging errors during cleanup }); } catch (error) { console.error("Error closing QuestDB connection:", error); } this.sender = null; } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brunoprela/questdb-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server