Skip to main content
Glama
index.js (14.3 kB)
#!/usr/bin/env node
/**
 * InfluxDB MCP server entry point.
 *
 * Registers the InfluxDB resources, tools and prompts on an MCP server and
 * serves it over stdio (the default) or, when `--http [port]` is given, over a
 * stateless Streamable HTTP endpoint at `/mcp`.
 */
import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { z } from "zod";
import { program } from "commander";
import express from "express";

// Import config
import { validateEnvironment } from "./config/env.js";

// Import utilities
import { configureLogger } from "./utils/loggerConfig.js";

// Import resource handlers
import { listOrganizations } from "./handlers/organizationsHandler.js";
import { listBuckets } from "./handlers/bucketsHandler.js";
import { bucketMeasurements } from "./handlers/measurementsHandler.js";
import { executeQuery } from "./handlers/queryHandler.js";

// Import tool handlers
import { writeData } from "./handlers/writeDataTool.js";
import { queryData } from "./handlers/queryDataTool.js";
import { createBucket } from "./handlers/createBucketTool.js";
import { createOrg } from "./handlers/createOrgTool.js";

// Import prompt handlers
import { fluxQueryExamplesPrompt } from "./prompts/fluxQueryExamplesPrompt.js";
import { lineProtocolGuidePrompt } from "./prompts/lineProtocolGuidePrompt.js";

// Configure logger and validate environment
configureLogger();
validateEnvironment();

// Parse command-line arguments
program
  .option(
    "--http [port]",
    "Start server with Streamable HTTP transport on specified port (default: 3000)",
  )
  .option("--stdio", "Force stdio transport (default behavior)")
  .parse(process.argv);

const options = program.opts();

// `--http` has an optional value, so `options.http` is `true` for a bare flag,
// a string when a port was supplied, and `undefined` when the flag is absent.
// This single flag is the only test used for "HTTP mode" below.
const useHttpTransport = options.http !== undefined;

if (useHttpTransport && options.stdio) {
  console.error(
    "Cannot use --http and --stdio at the same time. Please choose one transport.",
  );
  process.exit(1);
}

// Port used when `--http` is given without a value.
const DEFAULT_HTTP_PORT = 3000;

/**
 * Builds a fresh MCP server with every InfluxDB resource, tool and prompt
 * registered on it.
 *
 * @returns {McpServer} A configured server that is not yet connected to a transport.
 */
const createMcpServer = () => {
  const server = new McpServer({
    name: "InfluxDB",
    version: "0.1.1",
  });

  // Register resources
  server.resource("orgs", "influxdb://orgs", listOrganizations);
  server.resource("buckets", "influxdb://buckets", listBuckets);
  server.resource(
    "bucket-measurements",
    new ResourceTemplate("influxdb://bucket/{bucketName}/measurements", {
      list: undefined,
    }),
    bucketMeasurements,
  );
  server.resource(
    "query",
    new ResourceTemplate("influxdb://query/{orgName}/{fluxQuery}", {
      list: undefined,
    }),
    executeQuery,
  );

  // Register tools
  server.tool(
    "write-data",
    "Stream newline-delimited line protocol records into a bucket. Use this after composing measurements so the LLM can insert real telemetry, optionally controlling timestamp precision.",
    {
      org: z
        .string()
        .describe(
          "Human-readable organization name that owns the destination bucket (the same value returned by the orgs resource).",
        ),
      bucket: z
        .string()
        .describe(
          "Bucket name to receive the points. Make sure it already exists or call create-bucket first.",
        ),
      data: z
        .string()
        .describe(
          "Payload containing one or more line protocol lines (measurements, tags, fields, timestamps) separated by newlines.",
        ),
      precision: z
        .enum(["ns", "us", "ms", "s"])
        .optional()
        .describe(
          "Optional timestamp precision. Provide it only when the line protocol omits unit suffix context; defaults to nanoseconds.",
        ),
    },
    writeData,
  );

  server.tool(
    "query-data",
    "Execute a Flux query inside an organization to inspect measurement schemas, run aggregations, or validate recently written data.",
    {
      org: z
        .string()
        .describe(
          "Organization whose buckets the query should target (exact name, not ID).",
        ),
      query: z
        .string()
        .describe(
          "Flux query text. Multi-line strings are supported; results are returned as annotated CSV for easy parsing.",
        ),
    },
    queryData,
  );

  server.tool(
    "create-bucket",
    "Provision a new bucket under an organization so that subsequent write-data calls have a destination.",
    {
      name: z
        .string()
        .describe(
          "Friendly bucket name. Follow InfluxDB naming rules (alphanumeric, dashes, underscores).",
        ),
      orgID: z
        .string()
        .describe(
          "Organization ID (UUID) that will own the bucket. Retrieve it from the organizations resource or create-org output.",
        ),
      retentionPeriodSeconds: z
        .number()
        .optional()
        .describe(
          "Optional retention duration expressed in seconds. Omit for infinite retention.",
        ),
    },
    createBucket,
  );

  server.tool(
    "create-org",
    "Create a brand-new organization to isolate users or projects before generating buckets and tokens.",
    {
      name: z
        .string()
        .describe(
          "Display name for the organization as it should appear in InfluxDB Cloud/OSS.",
        ),
      description: z
        .string()
        .optional()
        .describe(
          "Optional free-form description that helps humans understand why the org exists.",
        ),
    },
    createOrg,
  );

  // Register prompts
  server.prompt("flux-query-examples", {}, fluxQueryExamplesPrompt);
  server.prompt("line-protocol-guide", {}, lineProtocolGuidePrompt);

  return server;
};

// Create MCP server for stdio or as a template for HTTP
const globalServer = createMcpServer();

// Add a global error handler
process.on("unhandledRejection", (reason, promise) => {
  console.error("Unhandled Rejection at:", promise, "reason:", reason);
  // Don't exit - just log the error, as this could be caught and handled elsewhere
});

// Enhanced MCP protocol debugging.
// NOTE(review): these are captured after configureLogger() has run, so they
// are whatever console functions it left in place. If console.log still writes
// to stdout, debug output would interleave with stdio protocol frames -
// presumably configureLogger() redirects it; confirm.
const originalConsoleLog = console.log;
const originalConsoleError = console.error;

/** Logs a protocol-level debug line with the `[MCP-DEBUG]` prefix. */
function logMcpDebug(...args) {
  originalConsoleLog("[MCP-DEBUG]", ...args);
}

/** Logs a protocol-level error line with the `[MCP-ERROR]` prefix. */
function logMcpError(...args) {
  originalConsoleError("[MCP-ERROR]", ...args);
}

// Log prefixes used when tracing each transport kind.
const STDIO_TRACE_LABELS = Object.freeze({
  send: "STDIO SENDING:",
  receive: "STDIO RECEIVED:",
  message: "MESSAGE RECEIVED VIA STDIO:",
});
const HTTP_TRACE_LABELS = Object.freeze({
  send: "HTTP SENDING:",
  receive: "HTTP RECEIVED:",
  message: "HTTP MESSAGE RECEIVED:",
});

/**
 * Wraps a transport's `_send`, `_receive` (each only when present) and
 * `onmessage` hooks so every payload is logged before being forwarded to the
 * hook that was installed at the time of wrapping.
 *
 * @param {object} transport - Transport instance to instrument in place.
 * @param {{send: string, receive: string, message: string}} labels - Log prefixes.
 */
function traceTransport(transport, labels) {
  if (transport._send) {
    const forwardSend = transport._send;
    transport._send = function (data) {
      logMcpDebug(labels.send, JSON.stringify(data));
      return forwardSend.call(this, data);
    };
  }

  if (transport._receive) {
    const forwardReceive = transport._receive;
    transport._receive = function (data) {
      logMcpDebug(labels.receive, JSON.stringify(data));
      return forwardReceive.call(this, data);
    };
  }

  const forwardMessage = transport.onmessage;
  transport.onmessage = function (message, ...rest) {
    logMcpDebug(labels.message, JSON.stringify(message));
    if (forwardMessage) {
      // Forward any additional callback arguments untouched.
      return forwardMessage.call(this, message, ...rest);
    }
  };
}

/**
 * Calls `close()` on a server or transport and logs, rather than propagates,
 * any failure. The returned promise never rejects, so callers may leave it
 * un-awaited.
 *
 * @param {{close: Function}|undefined} closable - Object to close; ignored when falsy.
 * @param {string} label - Name used in the error log line.
 * @returns {Promise<void>}
 */
async function closeQuietly(closable, label) {
  if (!closable) {
    return;
  }
  try {
    await closable.close();
  } catch (err) {
    logMcpError(`Error closing ${label}:`, err);
  }
}

/**
 * Converts the raw `--http` option value into a TCP port.
 *
 * @param {string|boolean} rawPort - `true` for a bare `--http`, otherwise the supplied text.
 * @returns {number|null} The port, or `null` when the text is not an integer in 0-65535.
 */
function resolveHttpPort(rawPort) {
  if (typeof rawPort !== "string") {
    return DEFAULT_HTTP_PORT;
  }
  const text = rawPort.trim();
  // Require digits only: parseInt alone would accept "80abc" as 80 and turn
  // "abc" into NaN.
  if (!/^\d+$/.test(text)) {
    return null;
  }
  const port = Number.parseInt(text, 10);
  return port <= 65535 ? port : null;
}

/** Stops the test-mode heartbeat timer if it is running. */
function stopHeartbeat() {
  if (global.mcpHeartbeatInterval) {
    clearInterval(global.mcpHeartbeatInterval);
    global.mcpHeartbeatInterval = null;
  }
}

// Enable extra protocol tracing for all requests/responses
// This debugging for globalServer.server is primarily for Stdio mode or if a global server instance were used.
// For HTTP mode, per-request server instances are created.
if (globalServer.server && !useHttpTransport) {
  const originalOnMessage = globalServer.server.onmessage;
  globalServer.server.onmessage = function (message, ...rest) {
    logMcpDebug("SERVER RECEIVED MESSAGE:", JSON.stringify(message));
    if (originalOnMessage) {
      return originalOnMessage.call(this, message, ...rest);
    }
  };

  // Log server responses
  const originalSendResponse = globalServer.server._sendResponse;
  if (originalSendResponse) {
    globalServer.server._sendResponse = function (id, result) {
      logMcpDebug("SERVER SENDING RESPONSE:", JSON.stringify({ id, result }));
      return originalSendResponse.call(this, id, result);
    };
  }

  // Log server errors
  const originalSendError = globalServer.server._sendError;
  if (originalSendError) {
    globalServer.server._sendError = function (id, error) {
      logMcpDebug("SERVER SENDING ERROR:", JSON.stringify({ id, error }));
      return originalSendError.call(this, id, error);
    };
  }
}

/**
 * Connects the shared `globalServer` to a stdio transport.
 *
 * When `MCP_TEST_MODE` is "true", `connect()` is additionally traced, a
 * heartbeat is logged every 3 seconds, and close/error hooks are installed on
 * the underlying server.
 */
function startStdioServer() {
  // Start the server with stdio transport
  console.log("Starting MCP server with stdio transport...");
  const stdioTransport = new StdioServerTransport();

  // Add extra debugging to the stdioTransport
  traceTransport(stdioTransport, STDIO_TRACE_LABELS);

  // Check if we're in test mode
  const isTestMode = process.env.MCP_TEST_MODE === "true";

  if (isTestMode) {
    console.log("Running in test mode with enhanced protocol debugging for STDIO");

    // Add debugging for server methods
    const originalConnect = globalServer.connect;
    globalServer.connect = async function (transportInstance) {
      logMcpDebug("GlobalServer.connect() called with stdio transport");
      try {
        const result = await originalConnect.call(this, transportInstance);
        logMcpDebug("GlobalServer.connect() with stdio succeeded");
        return result;
      } catch (err) {
        logMcpError("GlobalServer.connect() with stdio failed:", err);
        throw err;
      }
    };
  }

  // Never rejects: a connection failure is logged and terminates the process.
  const connectStdioServer = async () => {
    try {
      console.log("Connecting global server to stdio transport...");
      await globalServer.connect(stdioTransport);
      console.log("Global server successfully connected to stdio transport");

      if (!isTestMode) {
        return;
      }

      if (!global.mcpHeartbeatInterval) {
        global.mcpHeartbeatInterval = setInterval(() => {
          if (!global.testCleanupInProgress) {
            console.log("[Heartbeat] MCP server (stdio) is still running...");
          }
        }, 3000);

        process.on("exit", stopHeartbeat);
      }

      if (globalServer.server) {
        globalServer.server.onclose = () => {
          logMcpError("STDIO SERVER CONNECTION CLOSED");
          stopHeartbeat();
        };

        globalServer.server.onerror = (err) => {
          logMcpError("STDIO SERVER ERROR:", err);
        };
      }
    } catch (err) {
      console.error("Error starting MCP server with stdio:", err);
      process.exit(1);
    }
  };

  // The connection is deliberately deferred by 200 ms, as in the original.
  setTimeout(() => {
    void connectStdioServer();
  }, 200);
}

/**
 * Builds an Express handler that answers 405 for HTTP methods the stateless
 * `/mcp` endpoint does not support.
 *
 * @param {string} method - HTTP method name, used only in the debug log line.
 * @returns {(req: object, res: object) => void} Express request handler.
 */
function rejectStatelessMethod(method) {
  return (req, res) => {
    logMcpDebug(`Received ${method} /mcp request`);
    res
      .writeHead(405, { Allow: "POST", "Content-Type": "application/json" })
      .end(
        JSON.stringify({
          jsonrpc: "2.0",
          error: {
            code: -32000,
            message: "Method not allowed for stateless transport.",
          },
          id: null,
        }),
      );
  };
}

/**
 * Starts the Express app that serves MCP over Streamable HTTP in stateless
 * mode: every POST to `/mcp` gets its own server and transport instance.
 *
 * @param {number} port - TCP port to listen on.
 */
function startHttpServer(port) {
  const app = express();
  app.use(express.json());

  app.post("/mcp", async (req, res) => {
    // In stateless mode, create a new instance of transport and server for each request
    // to ensure complete isolation.
    logMcpDebug("HTTP POST /mcp received, creating new server and transport.");

    let server;
    let transport;
    let cleanedUp = false;

    // Idempotent: reachable from both the error path and the response's
    // "close" event, but the instances are closed only once.
    const cleanup = () => {
      if (cleanedUp) {
        return;
      }
      cleanedUp = true;
      void closeQuietly(transport, "HTTP transport");
      void closeQuietly(server, "HTTP server instance");
    };

    try {
      server = createMcpServer();
      transport = new StreamableHTTPServerTransport({
        sessionIdGenerator: undefined, // Stateless
      });

      // Attach logger to the specific transport instance
      traceTransport(transport, HTTP_TRACE_LABELS);

      res.on("close", () => {
        logMcpDebug("HTTP POST /mcp request closed, cleaning up server and transport.");
        cleanup();
      });

      await server.connect(transport);
      await transport.handleRequest(req, res, req.body);
    } catch (error) {
      logMcpError("Error handling MCP HTTP request:", error);

      // Reply before tearing anything down so the client gets the error body.
      if (!res.headersSent) {
        res.status(500).json({
          jsonrpc: "2.0",
          error: {
            code: -32603,
            message: "Internal server error",
          },
          // `??` rather than `||`: 0 and "" are valid ids and must be echoed back.
          id: req.body?.id ?? null,
        });
      }
      cleanup();
    }
  });

  app.get("/mcp", rejectStatelessMethod("GET"));
  app.delete("/mcp", rejectStatelessMethod("DELETE"));

  const httpServer = app.listen(port, () => {
    console.log(`MCP Streamable HTTP Server listening on port ${port}`);
  });

  httpServer.on("error", (err) => {
    if (err.code === "EADDRINUSE") {
      console.error(
        `Error: Port ${port} is already in use. Please choose a different port or free up port ${port}.`,
      );
    } else {
      console.error("Failed to start HTTP server:", err);
    }
    process.exit(1);
  });
}

if (!useHttpTransport) {
  startStdioServer();
} else {
  // Start the server with Streamable HTTP transport
  const port = resolveHttpPort(options.http);
  if (port === null) {
    console.error(
      `Invalid port for --http: "${options.http}". Expected an integer between 0 and 65535.`,
    );
    process.exit(1);
  }
  startHttpServer(port);
}

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/idoru/influxdb-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.