Skip to main content
Glama

InfluxDB MCP Server

by idoru
index.js7.72 kB
#!/usr/bin/env node import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { z } from "zod"; // Import config import { validateEnvironment } from "./config/env.js"; // Import utilities import { configureLogger } from "./utils/loggerConfig.js"; // Import resource handlers import { listOrganizations } from "./handlers/organizationsHandler.js"; import { listBuckets } from "./handlers/bucketsHandler.js"; import { bucketMeasurements } from "./handlers/measurementsHandler.js"; import { executeQuery } from "./handlers/queryHandler.js"; // Import tool handlers import { writeData } from "./handlers/writeDataTool.js"; import { queryData } from "./handlers/queryDataTool.js"; import { createBucket } from "./handlers/createBucketTool.js"; import { createOrg } from "./handlers/createOrgTool.js"; // Import prompt handlers import { fluxQueryExamplesPrompt } from "./prompts/fluxQueryExamplesPrompt.js"; import { lineProtocolGuidePrompt } from "./prompts/lineProtocolGuidePrompt.js"; // Configure logger and validate environment configureLogger(); validateEnvironment(); // Create MCP server const server = new McpServer({ name: "InfluxDB", version: "0.1.1", }); // Register resources server.resource("orgs", "influxdb://orgs", listOrganizations); server.resource("buckets", "influxdb://buckets", listBuckets); server.resource( "bucket-measurements", new ResourceTemplate("influxdb://bucket/{bucketName}/measurements", { list: undefined, }), bucketMeasurements, ); server.resource( "query", new ResourceTemplate("influxdb://query/{orgName}/{fluxQuery}", { list: undefined, }), executeQuery, ); // Register tools server.tool( "write-data", { org: z.string().describe("The organization name"), bucket: z.string().describe("The bucket name"), data: z.string().describe("Data in InfluxDB line protocol format"), precision: z.enum(["ns", "us", "ms", "s"]).optional().describe( "Timestamp precision (ns, us, ms, s)", ), }, writeData, ); server.tool( "query-data", { org: z.string().describe("The organization name"), query: z.string().describe("Flux query string"), }, queryData, ); server.tool( "create-bucket", { name: z.string().describe("The bucket name"), orgID: z.string().describe("The organization ID"), retentionPeriodSeconds: z.number().optional().describe( "Retention period in seconds (optional)", ), }, createBucket, ); server.tool( "create-org", { name: z.string().describe("The organization name"), description: z.string().optional().describe( "Organization description (optional)", ), }, createOrg, ); // Register prompts server.prompt("flux-query-examples", {}, fluxQueryExamplesPrompt); server.prompt("line-protocol-guide", {}, lineProtocolGuidePrompt); // Add a global error handler process.on("unhandledRejection", (reason, promise) => { console.error("Unhandled Rejection at:", promise, "reason:", reason); // Don't exit - just log the error, as this could be caught and handled elsewhere }); // Enhanced MCP protocol debugging const originalConsoleLog = console.log; const originalConsoleError = console.error; // Create special debugging functions for MCP protocol function logMcpDebug(...args) { originalConsoleLog("[MCP-DEBUG]", ...args); } function logMcpError(...args) { originalConsoleError("[MCP-ERROR]", ...args); } // Start the server with stdio transport console.log("Starting MCP server with stdio transport..."); const transport = new StdioServerTransport(); // Add extra debugging to the transport if (transport._send) { const originalSend = transport._send; transport._send = function (data) { logMcpDebug("SENDING:", JSON.stringify(data)); return originalSend.call(this, data); }; } // If the transport has a _receive method, wrap it for debugging if (transport._receive) { const originalReceive = transport._receive; transport._receive = function (data) { logMcpDebug("RECEIVED:", JSON.stringify(data)); return originalReceive.call(this, data); }; } // Enable extra protocol tracing for all requests/responses if (server.server) { const originalOnMessage = server.server.onmessage; server.server.onmessage = function (message) { logMcpDebug("SERVER RECEIVED MESSAGE:", JSON.stringify(message)); if (originalOnMessage) { return originalOnMessage.call(this, message); } }; // Log server responses const originalSendResponse = server.server._sendResponse; if (originalSendResponse) { server.server._sendResponse = function (id, result) { logMcpDebug("SERVER SENDING RESPONSE:", JSON.stringify({ id, result })); return originalSendResponse.call(this, id, result); }; } // Log server errors const originalSendError = server.server._sendError; if (originalSendError) { server.server._sendError = function (id, error) { logMcpDebug("SERVER SENDING ERROR:", JSON.stringify({ id, error })); return originalSendError.call(this, id, error); }; } } // Override transport callbacks for debugging const originalOnMessage = transport.onmessage; transport.onmessage = function (message) { logMcpDebug("MESSAGE RECEIVED:", JSON.stringify(message)); if (originalOnMessage) { return originalOnMessage.call(this, message); } }; // Check if we're in test mode const isTestMode = process.env.MCP_TEST_MODE === "true"; if (isTestMode) { console.log("Running in test mode with enhanced protocol debugging"); // Add debugging for server methods const originalConnect = server.connect; server.connect = async function (transport) { logMcpDebug("Server.connect() called"); try { const result = await originalConnect.call(this, transport); logMcpDebug("Server.connect() succeeded"); return result; } catch (err) { logMcpError("Server.connect() failed:", err); throw err; } }; } // Create a function to handle connection const connectServer = async () => { try { console.log("Connecting server to transport..."); await server.connect(transport); console.log("Server successfully connected to transport"); // In test mode, perform extra validation if (isTestMode) { // Track heartbeat interval in global so it can be cleaned up if (!global.mcpHeartbeatInterval) { // Add a heartbeat timer to keep the connection alive during tests global.mcpHeartbeatInterval = setInterval(() => { // Only log if we're not in cleanup mode if (!global.testCleanupInProgress) { console.log("[Heartbeat] MCP server is still running..."); } }, 3000); // Clean up the interval if the process exits process.on("exit", () => { if (global.mcpHeartbeatInterval) { clearInterval(global.mcpHeartbeatInterval); global.mcpHeartbeatInterval = null; } }); } // Add extra hooks for protocol debugging server.server.onclose = () => { logMcpError("SERVER CONNECTION CLOSED"); // Clear interval on connection close if (global.mcpHeartbeatInterval) { clearInterval(global.mcpHeartbeatInterval); global.mcpHeartbeatInterval = null; } }; server.server.onerror = (err) => { logMcpError("SERVER ERROR:", err); }; } } catch (err) { console.error("Error starting MCP server:", err); process.exit(1); } }; // Connect with a small delay to ensure proper initialization setTimeout(() => { connectServer(); }, 200);

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/idoru/influxdb-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server