import asyncio
import logging
import sys
from mcp.server import Server
from mcp.server.stdio import stdio_server
import mcp.types as types
from .auth import validate_token, AuthError
from .influx_client import InfluxClient
# Route all log output to stderr. The server is run over stdio (see the
# stdio_server import), so stdout is presumably reserved for protocol
# traffic and must stay free of log lines.
logging.basicConfig(
    stream=sys.stderr,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("influxdb-mcp-server")

# Module-wide singletons: the MCP server the handlers below are registered
# on, and the InfluxDB client those handlers call into.
app = Server("influxdb-mcp-server")
influx_client = InfluxClient()

# id() values of the sessions that have passed the "auth" tool; filled in
# by call_tool and consulted before any other tool is executed.
authenticated_clients = set()
@app.list_tools()
async def list_tools() -> list[types.Tool]:
    """Return the tool catalogue advertised by this server.

    Four tools are exposed: ``auth``, ``list_databases``,
    ``list_measurements`` and ``query``. Each carries a JSON Schema
    describing the arguments it accepts.
    """
    logger.info("Listing available tools")

    def string_field(description: str) -> dict:
        # JSON Schema fragment for one string-typed argument.
        return {"type": "string", "description": description}

    def object_schema(properties: dict, required: list = None) -> dict:
        # JSON Schema for an argument object; "required" is only emitted
        # when at least one argument is mandatory.
        schema = {"type": "object", "properties": properties}
        if required:
            schema["required"] = required
        return schema

    auth_tool = types.Tool(
        name="auth",
        description="Authenticate with the InfluxDB server using a JWT token",
        inputSchema=object_schema(
            {"token": string_field("JWT token for authentication")},
            ["token"],
        ),
    )
    list_databases_tool = types.Tool(
        name="list_databases",
        description="List all available InfluxDB databases",
        inputSchema=object_schema({}),
    )
    list_measurements_tool = types.Tool(
        name="list_measurements",
        description="List all measurements in a specific database",
        inputSchema=object_schema(
            {"database": string_field("The database name")},
            ["database"],
        ),
    )
    query_tool = types.Tool(
        name="query",
        description="Execute a read-only InfluxQL query against a specific database",
        inputSchema=object_schema(
            {
                "database": string_field("The database name"),
                "query": string_field("The InfluxQL query (must be a SELECT query)"),
            },
            ["database", "query"],
        ),
    )

    return [auth_tool, list_databases_tool, list_measurements_tool, query_tool]
def _text_result(text: str, *, is_error: bool = False):
    """Wrap *text* in a CallToolResult holding a single text item."""
    content = [types.TextContent(type="text", text=text)]
    if is_error:
        return types.CallToolResult(isError=True, content=content)
    # Leave isError unset on success, exactly as a plain construction would.
    return types.CallToolResult(content=content)


def _query_error(result):
    """Return the error message embedded in a query response, or None.

    Assumes *result* is the dict-shaped InfluxDB 1.x JSON response, where a
    rejected request carries a top-level "error" and a failed statement
    carries an "error" inside its entry of "results" -- TODO confirm against
    InfluxClient.execute_query, which may already raise in these cases.
    """
    if result.get("error"):
        return str(result["error"])
    for statement in result.get("results") or []:
        if statement.get("error"):
            return str(statement["error"])
    return None


def _format_query_result(database, result) -> str:
    """Render every series of a query response as a Markdown-style table."""
    parts = [f"Query results from '{database}':\n"]
    series_seen = False
    # Walk all statements, not only the first, so that a multi-statement
    # query does not silently lose the later result sets.
    for statement in result.get("results") or []:
        for series in statement.get("series") or []:
            series_seen = True
            columns = series.get("columns", [])
            parts.append(f"\nMeasurement: {series.get('name', 'unknown')}\n")
            parts.append("| " + " | ".join(columns) + " |\n")
            parts.append("|" + "---|" * len(columns) + "\n")
            parts.extend(
                "| " + " | ".join(str(value) for value in row) + " |\n"
                for row in series.get("values", [])
            )
    if not series_seen:
        parts.append("No data found for this query.")
    return "".join(parts)


@app.call_tool()
async def call_tool(name: str, arguments: dict):
    """Dispatch one tool invocation and return its outcome as text.

    Args:
        name: Tool to run: "auth", "list_databases", "list_measurements"
            or "query".
        arguments: Tool arguments as described by the tool's input schema;
            a missing/None value is treated as an empty dict.

    Returns:
        A ``types.CallToolResult`` with a single text item. ``isError`` is
        set for missing arguments, failed or missing authentication,
        unknown tools, query errors reported by InfluxDB, and any exception
        raised while running a database operation.
    """
    arguments = arguments or {}
    # NOTE(review): id() values can be reused once an object has been freed,
    # and no removal from authenticated_clients is visible in this file, so a
    # later session could in principle match a stale entry -- confirm the
    # session lifetime, or key on something stable.
    client_id = id(app.request_context.session)

    if name == "auth":
        token = arguments.get("token")
        if not token:
            return _text_result("No token provided", is_error=True)
        # Only the validation call is expected to raise AuthError.
        try:
            payload = validate_token(token)
        except AuthError as e:
            logger.warning("Authentication failed: %s", e)
            return _text_result(f"Authentication failed: {e}", is_error=True)
        username = payload.get("sub", "unknown")
        authenticated_clients.add(client_id)
        logger.info("Client %s authenticated as %s", client_id, username)
        return _text_result(f"Authentication successful as {username}")

    # Every tool other than "auth" requires a previously authenticated session.
    if client_id not in authenticated_clients:
        logger.warning("Unauthenticated client %s attempted to use %s", client_id, name)
        return _text_result(
            "Authentication required. Use the 'auth' tool with a valid JWT token first.",
            is_error=True,
        )

    try:
        if name == "list_databases":
            databases = influx_client.list_databases()
            names = ", ".join(db["name"] for db in databases)
            return _text_result(f"Available databases: {names}")

        if name == "list_measurements":
            database = arguments.get("database")
            if not database:
                return _text_result("Database name is required", is_error=True)
            measurements = influx_client.list_measurements(database)
            names = ", ".join(m["name"] for m in measurements)
            return _text_result(f"Measurements in '{database}': {names}")

        if name == "query":
            database = arguments.get("database")
            query = arguments.get("query")
            if not database or not query:
                return _text_result("Both database and query are required", is_error=True)
            # NOTE(review): nothing here checks that the query is a SELECT;
            # presumably InfluxClient.execute_query enforces that -- confirm.
            result = influx_client.execute_query(database, query)
            error_text = _query_error(result)
            if error_text is not None:
                # Surface the failure instead of reporting "no data".
                logger.warning("Query against %s failed: %s", database, error_text)
                return _text_result(f"Error: {error_text}", is_error=True)
            return _text_result(_format_query_result(database, result))

        logger.warning("Unknown tool requested: %s", name)
        return _text_result(f"Unknown tool: {name}", is_error=True)
    except Exception as e:
        # Boundary handler: report the failure to the caller rather than
        # letting it escape, and keep the traceback in the log.
        logger.exception("Error executing tool %s: %s", name, e)
        return _text_result(f"Error: {e}", is_error=True)
async def main():
    """Verify that InfluxDB is reachable, then serve MCP requests over stdio.

    The process exits with status 1 if the initial ``list_databases`` probe
    raises.
    """
    logger.info("Starting InfluxDB MCP Server")

    # Probe the database once up front so a bad configuration fails fast.
    try:
        influx_client.list_databases()
        logger.info("Successfully connected to InfluxDB")
    except Exception as exc:
        logger.error(f"Failed to connect to InfluxDB: {exc!s}")
        sys.exit(1)

    # Run the server on the stream pair provided by the stdio transport.
    async with stdio_server() as streams:
        logger.info("MCP Server started successfully")
        read_stream = streams[0]
        write_stream = streams[1]
        init_options = app.create_initialization_options()
        await app.run(read_stream, write_stream, init_options)
if __name__ == "__main__":
    # Script entry point: drive the async main() to completion.
    asyncio.run(main())