Skip to main content
Glama
mcp_server.py•24 kB
"""Grafana MCP server.

Exposes a set of Grafana tools over JSON-RPC 2.0, either through a Flask HTTP
endpoint (``POST /mcp``) or through the stdio transport provided by
``run_stdio_server``.
"""

import datetime
import json
import logging
import os
import sys

import yaml
from flask import Flask, current_app, jsonify, request

try:
    from src.grafana_mcp_server.processor.grafana_processor import GrafanaApiProcessor
    from src.grafana_mcp_server.stdio_server import run_stdio_server
except ImportError:
    # Fallback for when running the file directly
    from processor.grafana_processor import GrafanaApiProcessor
    from stdio_server import run_stdio_server

app = Flask(__name__)
logger = logging.getLogger(__name__)

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")


def _config_section(config, name):
    """Return the mapping stored under ``name`` in ``config``, or ``{}`` if absent or not a mapping."""
    section = config.get(name)
    return section if isinstance(section, dict) else {}


# Load configuration from environment variables, then YAML as fallback
def load_config():
    """Build the server configuration from environment variables and an optional YAML file.

    The first readable, parseable ``config.yaml`` among the candidate locations is used
    as the fallback source; environment variables take precedence over it.

    Returns:
        A dict with a ``"grafana"`` section (``host``, ``api_key``, ``ssl_verify``)
        and a ``"server"`` section (``port``, ``debug``).

    Raises:
        ValueError: if the configured port cannot be converted with ``int()``.
    """
    # Try multiple possible config file locations
    possible_config_paths = [
        os.path.join(os.path.dirname(__file__), "config.yaml"),  # Same directory as this file
        "/app/config.yaml",  # Docker container path
        "config.yaml",  # Current working directory
    ]

    config = {}
    config_loaded = False

    for config_path in possible_config_paths:
        try:
            with open(config_path) as file:
                loaded = yaml.safe_load(file)
        except FileNotFoundError:
            logger.debug(f"Config file not found at: {config_path}")
            continue
        except yaml.YAMLError as e:
            logger.error(f"Error parsing YAML configuration from {config_path}: {e}")
            continue

        # safe_load returns None for an empty document; treat that as an empty config
        # instead of letting the later ``config.get`` calls fail on None.
        if loaded is None:
            loaded = {}
        if not isinstance(loaded, dict):
            logger.error(f"Ignoring configuration from {config_path}: top-level YAML value is not a mapping")
            continue

        config = loaded
        logger.info(f"Successfully loaded config from: {config_path}")
        config_loaded = True
        break

    if not config_loaded:
        logger.warning("No config file found, using environment variables only")
        config = {}

    grafana_cfg = _config_section(config, "grafana")
    server_cfg = _config_section(config, "server")

    # Environment variable overrides (preferred method)
    grafana_host = os.environ.get("GRAFANA_HOST") or grafana_cfg.get("host")
    grafana_api_key = os.environ.get("GRAFANA_API_KEY") or grafana_cfg.get("api_key")
    grafana_ssl_verify = os.environ.get("GRAFANA_SSL_VERIFY") or grafana_cfg.get("ssl_verify", "true")
    server_port = int(os.environ.get("MCP_SERVER_PORT") or server_cfg.get("port", 8000))

    server_debug = os.environ.get("MCP_SERVER_DEBUG")
    if server_debug is not None:
        server_debug = server_debug.lower() in ["1", "true", "yes"]
    else:
        server_debug = server_cfg.get("debug", True)

    # The API key itself is never logged, only whether one is present.
    logger.info(f"Loaded configuration - Host: {grafana_host}, API Key: {'***' if grafana_api_key else 'None'}, Port: {server_port}")

    return {
        "grafana": {
            "host": grafana_host,
            "api_key": grafana_api_key,
            "ssl_verify": grafana_ssl_verify,
        },
        "server": {"port": server_port, "debug": server_debug},
    }


# Initialize configuration and processor at app startup
with app.app_context():
    config = load_config()
    app.config["GRAFANA_CONFIG"] = config.get("grafana", {})
    app.config["SERVER_CONFIG"] = config.get("server", {})

    # Initialize Grafana processor
    try:
        app.config["grafana_processor"] = GrafanaApiProcessor(
            grafana_host=app.config["GRAFANA_CONFIG"].get("host"),
            grafana_api_key=app.config["GRAFANA_CONFIG"].get("api_key"),
            ssl_verify=app.config["GRAFANA_CONFIG"].get("ssl_verify", "true"),
        )
        logger.info("Grafana processor initialized successfully")
    except Exception as e:
        # Keep the server importable; tools report the missing processor per call.
        logger.error(f"Failed to initialize Grafana processor: {e}")
        app.config["grafana_processor"] = None

# Server info
SERVER_INFO = {"name": "grafana-mcp-server", "version": "1.0.0"}

# Server capabilities
SERVER_CAPABILITIES = {"tools": {}}

# Protocol version
PROTOCOL_VERSION = "2025-06-18"


def get_current_time_iso():
    """Get current time in ISO format"""
    return datetime.datetime.now(datetime.timezone.utc).isoformat()


# Available tools - Grafana MCP Server Tools
TOOLS_LIST = [
    {
        "name": "test_connection",
        "description": "Test connection to Grafana API to verify configuration and connectivity. "
        "Requires API key or open Grafana instance.",
        "inputSchema": {"type": "object", "properties": {}, "required": []},
    },
    {
        "name": "grafana_promql_query",
        "description": "Executes PromQL queries against Grafana's Prometheus datasource. "
        "Fetches metrics data using PromQL expressions, optimizes time series responses to reduce token size.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "datasource_uid": {
                    "type": "string",
                    "description": "Prometheus datasource UID",
                },
                "query": {"type": "string", "description": "PromQL query string"},
                "start_time": {
                    "type": "string",
                    "description": "Start time in RFC3339 or relative string (e.g., 'now-2h', '2023-01-01T00:00:00Z')",
                },
                "end_time": {
                    "type": "string",
                    "description": "End time in RFC3339 or relative string (e.g., 'now-2h', '2023-01-01T00:00:00Z')",
                },
                "duration": {
                    "type": "string",
                    "description": "Duration string for the time window (e.g., '2h', '90m')",
                },
            },
            "required": ["datasource_uid", "query"],
        },
    },
    {
        "name": "grafana_loki_query",
        "description": "Queries Grafana Loki for log data. Fetches logs for a specified duration "
        "(e.g., '5m', '1h', '2d'), converts relative time to absolute timestamps. "
        "Note: Loki queries require at least one non-empty matcher. Use patterns like '{job=~\".+\"}' "
        "instead of '{job=~\".*\"}' or '{}' to avoid syntax errors.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "datasource_uid": {
                    "type": "string",
                    "description": "Loki datasource UID",
                },
                "query": {"type": "string", "description": "Loki query string (e.g., '{job=~\".+\"}' or '{app=\"myapp\"}')"},
                "duration": {
                    "type": "string",
                    "description": "Time duration (e.g., '5m', '1h', '2d') - overrides start_time/end_time if provided",
                },
                "start_time": {
                    "type": "string",
                    "description": "Start time in RFC3339 or relative string (e.g., 'now-2h', '2023-01-01T00:00:00Z')",
                },
                "end_time": {
                    "type": "string",
                    "description": "End time in RFC3339 or relative string (e.g., 'now-2h', '2023-01-01T00:00:00Z')",
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of log entries to return",
                    "default": 100,
                },
            },
            "required": ["datasource_uid", "query"],
        },
    },
    {
        "name": "grafana_get_dashboard_config",
        "description": "Retrieves dashboard configuration details from the database. "
        "Queries the connectors_connectormetadatamodelstore table for dashboard metadata.",
        "inputSchema": {
            "type": "object",
            "properties": {"dashboard_uid": {"type": "string", "description": "Dashboard UID"}},
            "required": ["dashboard_uid"],
        },
    },
    {
        "name": "grafana_query_dashboard_panels",
        "description": "Executes queries for specific dashboard panels. Can query up to 4 panels at once, "
        "supports template variables, optimizes metrics data.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "dashboard_uid": {"type": "string", "description": "Dashboard UID"},
                "panel_ids": {
                    "type": "array",
                    "items": {"type": "integer"},
                    "description": "List of panel IDs to query (max 4)",
                },
                "template_variables": {
                    "type": "object",
                    "description": "Template variables for the dashboard",
                },
            },
            "required": ["dashboard_uid", "panel_ids"],
        },
    },
    {
        "name": "grafana_fetch_label_values",
        "description": "Fetches label values for dashboard variables from Prometheus datasource. "
        "Retrieves available values for specific labels (e.g., 'instance', 'job').",
        "inputSchema": {
            "type": "object",
            "properties": {
                "datasource_uid": {
                    "type": "string",
                    "description": "Prometheus datasource UID",
                },
                "label_name": {
                    "type": "string",
                    "description": "Label name to fetch values for (e.g., 'instance', 'job')",
                },
                "metric_match_filter": {
                    "type": "string",
                    "description": "Optional metric name filter (e.g., 'up', 'node_cpu_seconds_total')",
                },
            },
            "required": ["datasource_uid", "label_name"],
        },
    },
    {
        "name": "grafana_fetch_dashboard_variables",
        "description": "Fetches all variables and their values from a Grafana dashboard. "
        "Retrieves dashboard template variables and their current values.",
        "inputSchema": {
            "type": "object",
            "properties": {"dashboard_uid": {"type": "string", "description": "Dashboard UID"}},
            "required": ["dashboard_uid"],
        },
    },
    {
        "name": "grafana_fetch_all_dashboards",
        "description": "Fetches all dashboards from Grafana with basic information like title, UID, folder, tags, etc.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of dashboards to return",
                    "default": 100,
                }
            },
            "required": [],
        },
    },
    {
        "name": "grafana_fetch_datasources",
        "description": "Fetches all datasources from Grafana with their configuration details.",
        "inputSchema": {"type": "object", "properties": {}, "required": []},
    },
    {
        "name": "grafana_fetch_folders",
        "description": "Fetches all folders from Grafana with their metadata and permissions.",
        "inputSchema": {"type": "object", "properties": {}, "required": []},
    },
]


# Tool implementations
def _call_processor(operation, log_prefix, failure_prefix):
    """Run ``operation(processor)`` against the configured Grafana processor.

    Shared scaffolding for every tool: looks up the processor stored on the
    current Flask app, reports a missing processor, and converts any exception
    into an error dict so a tool call never raises.

    Args:
        operation: callable taking the processor and returning the tool result.
        log_prefix: text placed before the exception in the error log line.
        failure_prefix: text placed before the exception in the returned message.

    Returns:
        Whatever ``operation`` returns, or ``{"status": "error", "message": ...}``.
    """
    try:
        grafana_processor = current_app.config.get("grafana_processor")
        if not grafana_processor:
            return {
                "status": "error",
                "message": "Grafana processor not initialized. Check configuration.",
            }
        return operation(grafana_processor)
    except Exception as e:
        logger.error(f"{log_prefix}: {e!s}")
        return {"status": "error", "message": f"{failure_prefix}: {e!s}"}


def test_grafana_connection():
    """Test connection to Grafana API"""

    def _probe(processor):
        if processor.test_connection():
            connection_details = processor.get_connection()
            return {
                "status": "success",
                "message": "Successfully connected to Grafana API",
                "auth_method": connection_details.get("auth_method"),
                "host": connection_details.get("host"),
            }
        return {"status": "failed", "message": "Failed to connect to Grafana API"}

    return _call_processor(_probe, "Error testing Grafana connection", "Failed to connect to Grafana")


def grafana_promql_query(datasource_uid, query, start_time=None, end_time=None, duration=None):
    """Execute PromQL query against Grafana's Prometheus datasource"""
    return _call_processor(
        lambda processor: processor.grafana_promql_query(datasource_uid, query, start_time, end_time, duration),
        "Error executing PromQL query",
        "PromQL query failed",
    )


def grafana_loki_query(datasource_uid, query, duration=None, start_time=None, end_time=None, limit=100):
    """Query Grafana Loki for log data"""
    return _call_processor(
        lambda processor: processor.grafana_loki_query(datasource_uid, query, duration, start_time, end_time, limit),
        "Error executing Loki query",
        "Loki query failed",
    )


def grafana_get_dashboard_config(dashboard_uid):
    """Get dashboard configuration details"""
    return _call_processor(
        lambda processor: processor.grafana_get_dashboard_config_details(dashboard_uid),
        "Error fetching dashboard config",
        "Failed to fetch dashboard config",
    )


def grafana_query_dashboard_panels(dashboard_uid, panel_ids, template_variables=None):
    """Execute queries for specific dashboard panels"""
    return _call_processor(
        lambda processor: processor.grafana_query_dashboard_panels(dashboard_uid, panel_ids, template_variables),
        "Error querying dashboard panels",
        "Failed to query dashboard panels",
    )


def grafana_fetch_label_values(datasource_uid, label_name, metric_match_filter=None):
    """Fetch label values for dashboard variables from Prometheus datasource"""
    return _call_processor(
        lambda processor: processor.grafana_fetch_dashboard_variable_label_values(datasource_uid, label_name, metric_match_filter),
        "Error fetching label values",
        "Failed to fetch label values",
    )


def grafana_fetch_dashboard_variables(dashboard_uid):
    """Fetch all variables and their values from a Grafana dashboard"""
    return _call_processor(
        lambda processor: processor.grafana_fetch_dashboard_variables(dashboard_uid),
        "Error fetching dashboard variables",
        "Failed to fetch dashboard variables",
    )


def grafana_fetch_all_dashboards(limit=100):
    """Fetch all dashboards from Grafana"""
    return _call_processor(
        lambda processor: processor.grafana_fetch_all_dashboards(limit),
        "Error fetching dashboards",
        "Failed to fetch dashboards",
    )


def grafana_fetch_datasources():
    """Fetch all datasources from Grafana"""
    return _call_processor(
        lambda processor: processor.grafana_fetch_datasources(),
        "Error fetching datasources",
        "Failed to fetch datasources",
    )


def grafana_fetch_folders():
    """Fetch all folders from Grafana"""
    return _call_processor(
        lambda processor: processor.grafana_fetch_folders(),
        "Error fetching folders",
        "Failed to fetch folders",
    )


# Function mapping
FUNCTION_MAPPING = {
    "test_connection": test_grafana_connection,
    "grafana_promql_query": grafana_promql_query,
    "grafana_loki_query": grafana_loki_query,
    "grafana_get_dashboard_config": grafana_get_dashboard_config,
    "grafana_query_dashboard_panels": grafana_query_dashboard_panels,
    "grafana_fetch_label_values": grafana_fetch_label_values,
    "grafana_fetch_dashboard_variables": grafana_fetch_dashboard_variables,
    "grafana_fetch_all_dashboards": grafana_fetch_all_dashboards,
    "grafana_fetch_datasources": grafana_fetch_datasources,
    "grafana_fetch_folders": grafana_fetch_folders,
}


def _jsonrpc_result(request_id, result):
    """Build a JSON-RPC 2.0 success response."""
    return {"jsonrpc": "2.0", "result": result, "id": request_id}


def _jsonrpc_error(request_id, code, message):
    """Build a JSON-RPC 2.0 error response."""
    return {"jsonrpc": "2.0", "error": {"code": code, "message": message}, "id": request_id}


def handle_jsonrpc_request(data):
    """Handle JSON-RPC 2.0 requests

    Dispatches ``initialize``, ``tools/list``, ``tools/call`` and
    ``notifications/*``; any other method yields a -32601 error.

    Args:
        data: the decoded JSON-RPC payload, expected to be a dict.

    Returns:
        A JSON-RPC 2.0 response dict carrying either ``result`` or ``error``.
        Malformed payloads are answered with an error response rather than an
        exception.
    """
    if not isinstance(data, dict):
        # e.g. a JSON array (batch) or scalar: there is no "id" to echo back.
        return _jsonrpc_error(None, -32600, "Invalid Request")

    request_id = data.get("id")
    method = data.get("method")
    params = data.get("params")
    if params is None:
        # Covers both a missing "params" member and an explicit null.
        params = {}

    logger.info(f"Handling JSON-RPC request: {method}")

    # Handle JSON-RPC notifications (no id field or method starts with 'notifications/')
    if isinstance(method, str) and method.startswith("notifications/"):
        logger.info(f"Received notification: {method}")
        return _jsonrpc_result(request_id, {})

    # Handle initialization
    if method == "initialize":
        if not isinstance(params, dict):
            return _jsonrpc_error(request_id, -32602, "Invalid params")
        client_protocol_version = params.get("protocolVersion")
        # Accept any protocol version that starts with '2025-'
        if not (isinstance(client_protocol_version, str) and client_protocol_version.startswith("2025-")):
            return _jsonrpc_error(request_id, -32602, f"Unsupported protocol version: {client_protocol_version}")
        return _jsonrpc_result(
            request_id,
            {
                "protocolVersion": PROTOCOL_VERSION,
                "capabilities": SERVER_CAPABILITIES,
                "serverInfo": SERVER_INFO,
            },
        )

    # Handle tools/list
    if method == "tools/list":
        return _jsonrpc_result(request_id, {"tools": TOOLS_LIST})

    # Handle tools/call
    if method == "tools/call":
        if not isinstance(params, dict):
            return _jsonrpc_error(request_id, -32602, "Invalid params")

        tool_name = params.get("name")
        # The isinstance guard keeps an unhashable name (list/dict) from raising in the lookup.
        if not isinstance(tool_name, str) or tool_name not in FUNCTION_MAPPING:
            return _jsonrpc_error(request_id, -32601, f"Unknown tool: {tool_name}")

        arguments = params.get("arguments")
        if arguments is None:
            arguments = {}
        if not isinstance(arguments, dict):
            return _jsonrpc_error(request_id, -32602, "Invalid params: 'arguments' must be an object")

        try:
            # Execute the tool function
            result = FUNCTION_MAPPING[tool_name](**arguments)
            return _jsonrpc_result(
                request_id,
                {
                    "content": [{"type": "text", "text": json.dumps(result, indent=2)}],
                    "isError": False,
                },
            )
        except Exception as e:
            # Reached for unexpected keyword arguments or a non-serializable result;
            # the tool functions themselves already trap processor failures.
            logger.error(f"Error executing tool {tool_name}: {e!s}")
            return _jsonrpc_error(request_id, -32603, f"Tool execution failed: {e!s}")

    # Unknown method
    return _jsonrpc_error(request_id, -32601, f"Method not found: {method}")


@app.route("/mcp", methods=["POST"])
def mcp_endpoint():
    """Main MCP endpoint for JSON-RPC requests

    Returns the JSON-RPC response with an HTTP status derived from the error
    code: 400 for -32700/-32600/-32602, 404 for -32601, 500 for any other
    error, 200 on success.
    """
    # silent=True yields None for a missing/invalid JSON body instead of aborting
    # with Flask's own error page, so the client gets a JSON-RPC parse error.
    data = request.get_json(silent=True)
    logger.info(f"Received MCP request: {data}")

    if not data:
        return jsonify(
            {
                "jsonrpc": "2.0",
                "error": {"code": -32700, "message": "Parse error"},
                "id": None,
            }
        ), 400

    response = handle_jsonrpc_request(data)
    status_code = 200
    if "error" in response:
        # Map error codes to HTTP status codes
        code = response["error"].get("code", -32000)
        if code in (-32700, -32600, -32602):
            status_code = 400
        elif code == -32601:
            status_code = 404
        else:
            status_code = 500
    return jsonify(response), status_code


@app.route("/health", methods=["GET"])
def health_check():
    """Health check endpoint"""
    return jsonify({"status": "ok", "timestamp": get_current_time_iso()})


@app.route("/", methods=["GET"])
def root():
    """Root endpoint with server info"""
    return jsonify(
        {
            "name": "Grafana MCP Server",
            "version": "1.0.0",
            "status": "running",
            "endpoints": {"mcp": "/mcp", "health": "/health"},
        }
    )


def main():
    """Main entry point

    Runs the stdio transport when ``MCP_TRANSPORT=stdio`` or when the command
    line contains ``-t``/``--transport`` together with ``stdio``; otherwise
    starts the Flask HTTP server.
    """
    transport = os.environ.get("MCP_TRANSPORT", "http")
    stdio_on_cli = "stdio" in sys.argv and ("-t" in sys.argv or "--transport" in sys.argv)

    if stdio_on_cli or transport == "stdio":

        def stdio_handler(data):
            # Tool functions read current_app, so each request needs an app context.
            with app.app_context():
                return handle_jsonrpc_request(data)

        run_stdio_server(stdio_handler)
    else:
        # HTTP mode
        port = app.config["SERVER_CONFIG"].get("port", 8000)
        debug = app.config["SERVER_CONFIG"].get("debug", True)
        logger.info(f"Starting Grafana MCP Server on port {port}")
        # NOTE(review): debug defaults to True while binding to all interfaces; Flask's
        # debug mode is not meant for exposed deployments. Set MCP_SERVER_DEBUG=false
        # (or server.debug: false) outside local development.
        app.run(host="0.0.0.0", port=port, debug=debug)


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DrDroidLab/grafana-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.