Skip to main content
Glama

Prometheus MCP Server

server.py17.8 kB
#!/usr/bin/env python import os import json from typing import Any, Dict, List, Optional, Union from dataclasses import dataclass import time from datetime import datetime, timedelta from enum import Enum import dotenv import requests from fastmcp import FastMCP, Context from prometheus_mcp_server.logging_config import get_logger dotenv.load_dotenv() mcp = FastMCP("Prometheus MCP") # Cache for metrics list to improve completion performance _metrics_cache = {"data": None, "timestamp": 0} _CACHE_TTL = 300 # 5 minutes # Get logger instance logger = get_logger() # Health check tool for Docker containers and monitoring @mcp.tool( description="Health check endpoint for container monitoring and status verification", annotations={ "title": "Health Check", "icon": "❤️", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def health_check() -> Dict[str, Any]: """Return health status of the MCP server and Prometheus connection. Returns: Health status including service information, configuration, and connectivity """ try: health_status = { "status": "healthy", "service": "prometheus-mcp-server", "version": "1.5.0", "timestamp": datetime.utcnow().isoformat(), "transport": config.mcp_server_config.mcp_server_transport if config.mcp_server_config else "stdio", "configuration": { "prometheus_url_configured": bool(config.url), "authentication_configured": bool(config.username or config.token), "org_id_configured": bool(config.org_id) } } # Test Prometheus connectivity if configured if config.url: try: # Quick connectivity test make_prometheus_request("query", params={"query": "up", "time": str(int(time.time()))}) health_status["prometheus_connectivity"] = "healthy" health_status["prometheus_url"] = config.url except Exception as e: health_status["prometheus_connectivity"] = "unhealthy" health_status["prometheus_error"] = str(e) health_status["status"] = "degraded" else: health_status["status"] = "unhealthy" health_status["error"] = "PROMETHEUS_URL not configured" logger.info("Health check completed", status=health_status["status"]) return health_status except Exception as e: logger.error("Health check failed", error=str(e)) return { "status": "unhealthy", "service": "prometheus-mcp-server", "error": str(e), "timestamp": datetime.utcnow().isoformat() } class TransportType(str, Enum): """Supported MCP server transport types.""" STDIO = "stdio" HTTP = "http" SSE = "sse" @classmethod def values(cls) -> list[str]: """Get all valid transport values.""" return [transport.value for transport in cls] @dataclass class MCPServerConfig: """Global Configuration for MCP.""" mcp_server_transport: TransportType = None mcp_bind_host: str = None mcp_bind_port: int = None def __post_init__(self): """Validate mcp configuration.""" if not self.mcp_server_transport: raise ValueError("MCP SERVER TRANSPORT is required") if not self.mcp_bind_host: raise ValueError(f"MCP BIND HOST is required") if not self.mcp_bind_port: raise ValueError(f"MCP BIND PORT is required") @dataclass class PrometheusConfig: url: str url_ssl_verify: bool = True disable_prometheus_links: bool = False # Optional credentials username: Optional[str] = None password: Optional[str] = None token: Optional[str] = None # Optional Org ID for multi-tenant setups org_id: Optional[str] = None # Optional Custom MCP Server Configuration mcp_server_config: Optional[MCPServerConfig] = None config = PrometheusConfig( url=os.environ.get("PROMETHEUS_URL", ""), url_ssl_verify=os.environ.get("PROMETHEUS_URL_SSL_VERIFY", "True").lower() in ("true", "1", "yes"), disable_prometheus_links=os.environ.get("PROMETHEUS_DISABLE_LINKS", "False").lower() in ("true", "1", "yes"), username=os.environ.get("PROMETHEUS_USERNAME", ""), password=os.environ.get("PROMETHEUS_PASSWORD", ""), token=os.environ.get("PROMETHEUS_TOKEN", ""), org_id=os.environ.get("ORG_ID", ""), mcp_server_config=MCPServerConfig( mcp_server_transport=os.environ.get("PROMETHEUS_MCP_SERVER_TRANSPORT", "stdio").lower(), mcp_bind_host=os.environ.get("PROMETHEUS_MCP_BIND_HOST", "127.0.0.1"), mcp_bind_port=int(os.environ.get("PROMETHEUS_MCP_BIND_PORT", "8080")) ) ) def get_prometheus_auth(): """Get authentication for Prometheus based on provided credentials.""" if config.token: return {"Authorization": f"Bearer {config.token}"} elif config.username and config.password: return requests.auth.HTTPBasicAuth(config.username, config.password) return None def make_prometheus_request(endpoint, params=None): """Make a request to the Prometheus API with proper authentication and headers.""" if not config.url: logger.error("Prometheus configuration missing", error="PROMETHEUS_URL not set") raise ValueError("Prometheus configuration is missing. Please set PROMETHEUS_URL environment variable.") if not config.url_ssl_verify: logger.warning("SSL certificate verification is disabled. This is insecure and should not be used in production environments.", endpoint=endpoint) url = f"{config.url.rstrip('/')}/api/v1/{endpoint}" url_ssl_verify = config.url_ssl_verify auth = get_prometheus_auth() headers = {} if isinstance(auth, dict): # Token auth is passed via headers headers.update(auth) auth = None # Clear auth for requests.get if it's already in headers # Add OrgID header if specified if config.org_id: headers["X-Scope-OrgID"] = config.org_id try: logger.debug("Making Prometheus API request", endpoint=endpoint, url=url, params=params) # Make the request with appropriate headers and auth response = requests.get(url, params=params, auth=auth, headers=headers, verify=url_ssl_verify) response.raise_for_status() result = response.json() if result["status"] != "success": error_msg = result.get('error', 'Unknown error') logger.error("Prometheus API returned error", endpoint=endpoint, error=error_msg, status=result["status"]) raise ValueError(f"Prometheus API error: {error_msg}") data_field = result.get("data", {}) if isinstance(data_field, dict): result_type = data_field.get("resultType") else: result_type = "list" logger.debug("Prometheus API request successful", endpoint=endpoint, result_type=result_type) return result["data"] except requests.exceptions.RequestException as e: logger.error("HTTP request to Prometheus failed", endpoint=endpoint, url=url, error=str(e), error_type=type(e).__name__) raise except json.JSONDecodeError as e: logger.error("Failed to parse Prometheus response as JSON", endpoint=endpoint, url=url, error=str(e)) raise ValueError(f"Invalid JSON response from Prometheus: {str(e)}") except Exception as e: logger.error("Unexpected error during Prometheus request", endpoint=endpoint, url=url, error=str(e), error_type=type(e).__name__) raise def get_cached_metrics() -> List[str]: """Get metrics list with caching to improve completion performance. This helper function is available for future completion support when FastMCP implements the completion capability. For now, it can be used internally to optimize repeated metric list requests. """ current_time = time.time() # Check if cache is valid if _metrics_cache["data"] is not None and (current_time - _metrics_cache["timestamp"]) < _CACHE_TTL: logger.debug("Using cached metrics list", cache_age=current_time - _metrics_cache["timestamp"]) return _metrics_cache["data"] # Fetch fresh metrics try: data = make_prometheus_request("label/__name__/values") _metrics_cache["data"] = data _metrics_cache["timestamp"] = current_time logger.debug("Refreshed metrics cache", metric_count=len(data)) return data except Exception as e: logger.error("Failed to fetch metrics for cache", error=str(e)) # Return cached data if available, even if expired return _metrics_cache["data"] if _metrics_cache["data"] is not None else [] # Note: Argument completions will be added when FastMCP supports the completion # capability. The get_cached_metrics() function above is ready for that integration. @mcp.tool( description="Execute a PromQL instant query against Prometheus", annotations={ "title": "Execute PromQL Query", "icon": "📊", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def execute_query(query: str, time: Optional[str] = None) -> Dict[str, Any]: """Execute an instant query against Prometheus. Args: query: PromQL query string time: Optional RFC3339 or Unix timestamp (default: current time) Returns: Query result with type (vector, matrix, scalar, string) and values """ params = {"query": query} if time: params["time"] = time logger.info("Executing instant query", query=query, time=time) data = make_prometheus_request("query", params=params) result = { "resultType": data["resultType"], "result": data["result"] } if not config.disable_prometheus_links: from urllib.parse import urlencode ui_params = {"g0.expr": query, "g0.tab": "0"} if time: ui_params["g0.moment_input"] = time prometheus_ui_link = f"{config.url.rstrip('/')}/graph?{urlencode(ui_params)}" result["links"] = [{ "href": prometheus_ui_link, "rel": "prometheus-ui", "title": "View in Prometheus UI" }] logger.info("Instant query completed", query=query, result_type=data["resultType"], result_count=len(data["result"]) if isinstance(data["result"], list) else 1) return result @mcp.tool( description="Execute a PromQL range query with start time, end time, and step interval", annotations={ "title": "Execute PromQL Range Query", "icon": "📈", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def execute_range_query(query: str, start: str, end: str, step: str, ctx: Context | None = None) -> Dict[str, Any]: """Execute a range query against Prometheus. Args: query: PromQL query string start: Start time as RFC3339 or Unix timestamp end: End time as RFC3339 or Unix timestamp step: Query resolution step width (e.g., '15s', '1m', '1h') Returns: Range query result with type (usually matrix) and values over time """ params = { "query": query, "start": start, "end": end, "step": step } logger.info("Executing range query", query=query, start=start, end=end, step=step) # Report progress if context available if ctx: await ctx.report_progress(progress=0, total=100, message="Initiating range query...") data = make_prometheus_request("query_range", params=params) # Report progress if ctx: await ctx.report_progress(progress=50, total=100, message="Processing query results...") result = { "resultType": data["resultType"], "result": data["result"] } if not config.disable_prometheus_links: from urllib.parse import urlencode ui_params = { "g0.expr": query, "g0.tab": "0", "g0.range_input": f"{start} to {end}", "g0.step_input": step } prometheus_ui_link = f"{config.url.rstrip('/')}/graph?{urlencode(ui_params)}" result["links"] = [{ "href": prometheus_ui_link, "rel": "prometheus-ui", "title": "View in Prometheus UI" }] # Report completion if ctx: await ctx.report_progress(progress=100, total=100, message="Range query completed") logger.info("Range query completed", query=query, result_type=data["resultType"], result_count=len(data["result"]) if isinstance(data["result"], list) else 1) return result @mcp.tool( description="List all available metrics in Prometheus with optional pagination support", annotations={ "title": "List Available Metrics", "icon": "📋", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def list_metrics( limit: Optional[int] = None, offset: int = 0, filter_pattern: Optional[str] = None, ctx: Context | None = None ) -> Dict[str, Any]: """Retrieve a list of all metric names available in Prometheus. Args: limit: Maximum number of metrics to return (default: all metrics) offset: Number of metrics to skip for pagination (default: 0) filter_pattern: Optional substring to filter metric names (case-insensitive) Returns: Dictionary containing: - metrics: List of metric names - total_count: Total number of metrics (before pagination) - returned_count: Number of metrics returned - offset: Current offset - has_more: Whether more metrics are available """ logger.info("Listing available metrics", limit=limit, offset=offset, filter_pattern=filter_pattern) # Report progress if context available if ctx: await ctx.report_progress(progress=0, total=100, message="Fetching metrics list...") data = make_prometheus_request("label/__name__/values") if ctx: await ctx.report_progress(progress=50, total=100, message=f"Processing {len(data)} metrics...") # Apply filter if provided if filter_pattern: filtered_data = [m for m in data if filter_pattern.lower() in m.lower()] logger.debug("Applied filter", original_count=len(data), filtered_count=len(filtered_data), pattern=filter_pattern) data = filtered_data total_count = len(data) # Apply pagination start_idx = offset end_idx = offset + limit if limit is not None else len(data) paginated_data = data[start_idx:end_idx] result = { "metrics": paginated_data, "total_count": total_count, "returned_count": len(paginated_data), "offset": offset, "has_more": end_idx < total_count } if ctx: await ctx.report_progress(progress=100, total=100, message=f"Retrieved {len(paginated_data)} of {total_count} metrics") logger.info("Metrics list retrieved", total_count=total_count, returned_count=len(paginated_data), offset=offset, has_more=result["has_more"]) return result @mcp.tool( description="Get metadata for a specific metric", annotations={ "title": "Get Metric Metadata", "icon": "ℹ️", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def get_metric_metadata(metric: str) -> List[Dict[str, Any]]: """Get metadata about a specific metric. Args: metric: The name of the metric to retrieve metadata for Returns: List of metadata entries for the metric """ logger.info("Retrieving metric metadata", metric=metric) endpoint = f"metadata?metric={metric}" data = make_prometheus_request(endpoint, params=None) if "metadata" in data: metadata = data["metadata"] elif "data" in data: metadata = data["data"] else: metadata = data if isinstance(metadata, dict): metadata = [metadata] logger.info("Metric metadata retrieved", metric=metric, metadata_count=len(metadata)) return metadata @mcp.tool( description="Get information about all scrape targets", annotations={ "title": "Get Scrape Targets", "icon": "🎯", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def get_targets() -> Dict[str, List[Dict[str, Any]]]: """Get information about all Prometheus scrape targets. Returns: Dictionary with active and dropped targets information """ logger.info("Retrieving scrape targets information") data = make_prometheus_request("targets") result = { "activeTargets": data["activeTargets"], "droppedTargets": data["droppedTargets"] } logger.info("Scrape targets retrieved", active_targets=len(data["activeTargets"]), dropped_targets=len(data["droppedTargets"])) return result if __name__ == "__main__": logger.info("Starting Prometheus MCP Server", mode="direct") mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pab1it0/prometheus-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server