Skip to main content
Glama
m4tyn0

InfluxDB MCP Server

by m4tyn0
server.py7.49 kB
import asyncio import logging import sys from mcp.server import Server from mcp.server.stdio import stdio_server import mcp.types as types from .auth import validate_token, AuthError from .influx_client import InfluxClient # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', stream=sys.stderr # Send logs to stderr ) logger = logging.getLogger("influxdb-mcp-server") # Initialize server and clients app = Server("influxdb-mcp-server") influx_client = InfluxClient() # Store authenticated state authenticated_clients = set() @app.list_tools() async def list_tools() -> list[types.Tool]: logger.info("Listing available tools") return [ types.Tool( name="auth", description="Authenticate with the InfluxDB server using a JWT token", inputSchema={ "type": "object", "properties": { "token": {"type": "string", "description": "JWT token for authentication"} }, "required": ["token"] } ), types.Tool( name="list_databases", description="List all available InfluxDB databases", inputSchema={ "type": "object", "properties": {} } ), types.Tool( name="list_measurements", description="List all measurements in a specific database", inputSchema={ "type": "object", "properties": { "database": {"type": "string", "description": "The database name"} }, "required": ["database"] } ), types.Tool( name="query", description="Execute a read-only InfluxQL query against a specific database", inputSchema={ "type": "object", "properties": { "database": {"type": "string", "description": "The database name"}, "query": {"type": "string", "description": "The InfluxQL query (must be a SELECT query)"} }, "required": ["database", "query"] } ) ] @app.call_tool() async def call_tool(name: str, arguments: dict): client_id = id(app.request_context.session) # Handle authentication if name == "auth": token = arguments.get("token") if not token: return types.CallToolResult( isError=True, content=[types.TextContent(type="text", text="No token provided")] ) try: payload = validate_token(token) authenticated_clients.add(client_id) username = payload.get("sub", "unknown") logger.info(f"Client {client_id} authenticated as {username}") return types.CallToolResult( content=[types.TextContent(type="text", text=f"Authentication successful as {username}")] ) except AuthError as e: logger.warning(f"Authentication failed: {str(e)}") return types.CallToolResult( isError=True, content=[types.TextContent(type="text", text=f"Authentication failed: {str(e)}")] ) # For all other tools, require authentication if client_id not in authenticated_clients: logger.warning(f"Unauthenticated client {client_id} attempted to use {name}") return types.CallToolResult( isError=True, content=[types.TextContent(type="text", text="Authentication required. Use the 'auth' tool with a valid JWT token first.")] ) # Handle database operations try: if name == "list_databases": databases = influx_client.list_databases() return types.CallToolResult( content=[types.TextContent(type="text", text=f"Available databases: {', '.join([db['name'] for db in databases])}")] ) elif name == "list_measurements": database = arguments.get("database") if not database: return types.CallToolResult( isError=True, content=[types.TextContent(type="text", text="Database name is required")] ) measurements = influx_client.list_measurements(database) measurement_names = [m['name'] for m in measurements] return types.CallToolResult( content=[types.TextContent(type="text", text=f"Measurements in '{database}': {', '.join(measurement_names)}")] ) elif name == "query": database = arguments.get("database") query = arguments.get("query") if not database or not query: return types.CallToolResult( isError=True, content=[types.TextContent(type="text", text="Both database and query are required")] ) result = influx_client.execute_query(database, query) # Format the result as a readable string formatted_result = f"Query results from '{database}':\n" # Format the series data if "series" in result["results"][0]: for series in result["results"][0]["series"]: formatted_result += f"\nMeasurement: {series.get('name', 'unknown')}\n" # Column headers columns = series.get("columns", []) formatted_result += "| " + " | ".join(columns) + " |\n" formatted_result += "|" + "---|" * len(columns) + "\n" # Data rows for values in series.get("values", []): formatted_result += "| " + " | ".join(str(v) for v in values) + " |\n" else: formatted_result += "No data found for this query." return types.CallToolResult( content=[types.TextContent(type="text", text=formatted_result)] ) else: logger.warning(f"Unknown tool requested: {name}") return types.CallToolResult( isError=True, content=[types.TextContent(type="text", text=f"Unknown tool: {name}")] ) except Exception as e: logger.error(f"Error executing tool {name}: {str(e)}") return types.CallToolResult( isError=True, content=[types.TextContent(type="text", text=f"Error: {str(e)}")] ) async def main(): logger.info("Starting InfluxDB MCP Server") # Check if we can connect to InfluxDB try: influx_client.list_databases() logger.info("Successfully connected to InfluxDB") except Exception as e: logger.error(f"Failed to connect to InfluxDB: {str(e)}") sys.exit(1) # Start the MCP server async with stdio_server() as streams: logger.info("MCP Server started successfully") await app.run( streams[0], streams[1], app.create_initialization_options() ) if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/m4tyn0/influx_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server