prometheus
Query and visualize Prometheus metrics from Kubernetes clusters using PromQL. Supports instant and time-series data, grouping by labels, and customized units for Recharts integration.
Instructions
Query Prometheus metrics from a specific cluster and format the results for Recharts visualization.
Input Schema
Input parameters (table view of the JSON Schema)
| Name | Required | Description | Default |
|---|---|---|---|
| cluster | No | The target cluster name. Defaults to the hub cluster. | |
| data_type | No | Type of query: 'snapshot' for instant or 'range' for time-series. | snapshot |
| end | No | (Only for data_type='range') End time in ISO 8601 format. Defaults to now if not provided. | |
| group_by | No | Label to group results by, such as 'pod' or 'namespace'. | pod |
| ql | Yes | The PromQL query string to run against the Prometheus server. | |
| start | No | (Only for data_type='range') Start time in ISO 8601 format, e.g., '2025-06-06T00:00:00Z'. | |
| step | No | (Only for data_type='range') Query resolution step (e.g., '30s', '5m', '1h'). | 5m |
| unit | No | The desired output unit: 'auto', 'bytes', 'MiB', 'GiB', 'cores', or 'millicores'. | auto |
Implementation Reference
- Python implementation of the 'prometheus' MCP tool handler, including parameter schema via Pydantic Annotated Fields and self-registration via @mcp.tool decorator. Uses prometheus_api_client to query and formats for Recharts.@mcp.tool(description="Query Prometheus metrics from a specific cluster and format the results for Recharts visualization.") def prometheus( ql: Annotated[str, Field(description="The PromQL query string to run against the Prometheus server.")], data_type: Annotated[str, Field(description="Type of query: 'snapshot' for instant or 'range' for time-series.")] = "snapshot", group_by: Annotated[str, Field(description="Label to group results by, such as 'pod' or 'namespace'.")] = "pod", unit: Annotated[str, Field(description="The desired output unit: 'auto', 'bytes', 'MiB', 'GiB', 'cores', or 'millicores'.")] = "auto", cluster: Annotated[Optional[str], Field(description="The target cluster name. Defaults to the hub cluster.")] = None, start: Annotated[ Optional[str], Field(description="(Only for data_type='range') Start time in ISO 8601 format, e.g., '2025-06-06T00:00:00Z'.") ] = None, end: Annotated[ Optional[str], Field(description="(Only for data_type='range') End time in ISO 8601 format. 
Defaults to now if not provided.") ] = None, step: Annotated[ Optional[str], Field(description="(Only for data_type='range') Query resolution step (e.g., '30s', '5m', '1h').") ] = "5m", ) -> Annotated[dict, Field(description="Formatted result including Recharts-compatible data or error message.")]: try: def infer_unit(unit: str, query: str) -> str: if unit != "auto": return unit q = query.lower() if "memory" in q or "bytes" in q: return "GiB" elif "cpu" in q: return "cores" return "raw" def transform_value(value: float, unit: str) -> float: value = float(value) if unit == "MiB": return value / (1024 ** 2) elif unit == "GiB": return value / (1024 ** 3) elif unit == "millicores": return value * 1000 return value # Set up cluster access kubeconfig_file = None if cluster and cluster != "default": kubeconfig_file = get_kubeconfig_file(cluster) if not validate_kubeconfig_file(kubeconfig_file): kubeconfig_file = setup_cluster_access(cluster) if not kubeconfig_file: raise FileNotFoundError(f"KUBECONFIG for cluster '{cluster}' does not exist.") pc = prom_connect(kubeconfig=kubeconfig_file) effective_unit = infer_unit(unit, ql) # Query data if data_type == "range": end_dt = parse_datetime(end) start_dt = parse_datetime(start) result = pc.custom_query_range( query=ql, start_time=start_dt, end_time=end_dt, step=step ) else: result = pc.custom_query(query=ql) if len(result) == 0: return { "data": [], "type": data_type, "unit": effective_unit } # Format result recharts_data = [] if data_type == "snapshot": df = MetricSnapshotDataFrame(result) recharts_data = [ { "name": row.get(group_by, "unknown"), "value": transform_value(row["value"], effective_unit) } for _, row in df.iterrows() ] elif data_type == "range": df = MetricRangeDataFrame(result) df["value"]=df["value"].astype(float) # df.index= pandas.to_datetime(df.index, unit="s") df["name"] = df.index columns_to_keep = ["name", "namespace", "pod", "value", group_by] columns_to_keep = list(dict.fromkeys(columns_to_keep)) df = 
df[[col for col in columns_to_keep if col in df.columns]].copy() for ts, group in df.groupby("name"): if isinstance(ts, pandas.Timestamp): entry = {"name": ts.isoformat()} else: # entry["name"] = ts.isoformat() entry = {"name": ts} for _, row in group.iterrows(): key = row.get(group_by, "unknown") entry[key] = transform_value(row["value"], effective_unit) recharts_data.append(entry) else: raise ValueError("Invalid data_type. Must be 'snapshot' or 'range'.") print({ "data": recharts_data, "type": data_type, "unit": effective_unit }) return { "data": recharts_data, "type": data_type, "unit": effective_unit } except Exception as e: return {"not get the data": str(e)}
- src/tools/prometheus.ts:74-179 (handler)TypeScript implementation of the 'prometheus' tool handler function. Connects to Thanos-Querier via discovered route, executes PromQL query using axios, transforms units, and returns formatted data.export async function prometheus({ ql, data_type = "snapshot", group_by = "pod", unit = "auto", cluster = "default", start, end, step = "5m", }: { ql: string; data_type: "snapshot" | "range"; group_by: string; unit: "auto" | "bytes" | "MiB" | "GiB" | "cores" | "millicores"; cluster?: string; start?: string; end?: string; step?: string; }): Promise<CallToolResult> { let responseData: any[] = []; try { const { url, token } = await getPrometheusURL(cluster); const headers = { Authorization: token }; const effectiveUnit = inferUnit(unit, ql); const httpsAgent = new https.Agent({ rejectUnauthorized: false }); if (data_type === "range") { const response = await axios.default.get(`${url}/api/v1/query_range`, { headers, params: { query: ql, start, end, step, }, httpsAgent, proxy: false, }); responseData = response.data.data.result.map((series: any) => ({ metric: series.metric, values: series.values.map(([timestamp, rawValue]: [number, string]) => [ dayjs.unix(timestamp).toISOString(), Number(rawValue) / (1024 * 1024), // bytes → MiB ]), })); } else { const response = await axios.default.get(`${url}/api/v1/query`, { headers, params: { query: ql }, httpsAgent, proxy: false, }); responseData = response.data.data.result.map( (entry: { metric: { [x: string]: any; }; value: (string | number)[]; }) => ( { [group_by]: entry.metric[group_by] || "value", value: transformValue(entry.value[1], effectiveUnit), } )); } // console.warn(responseData) if (responseData.length === 0) { return { content: [{ type: "text", text: JSON.stringify({ data: [], type: data_type, unit: effectiveUnit, }), }], }; // return { data: [], type: data_type, unit: effectiveUnit }; } return { content: [ { type: "text", text: JSON.stringify( { data: responseData, type: 
data_type, unit: effectiveUnit, }), } ], }; } catch (err: any) { console.error(responseData) console.error(err) return { content: [{ type: "text", text: `Failed to query Prometheus: ${err.message || String(err)}`, }], }; } }
- src/index.ts:44-48 (registration)Explicit registration of the 'prometheus' tool on the TypeScript MCP server instance, linking description, Zod schema, and handler function.server.tool( "prometheus", prometheusDesc, prometheusArgs, async (args, extra) => prometheus(args) // ensure connectCluster matches (args, extra) => ...
- src/tools/prometheus.ts:18-70 (schema)Zod schema (prometheusArgs) defining input parameters for the 'prometheus' tool in TypeScript, matching the Python version.export const prometheusArgs = { ql: z.string().describe( "The PromQL query string to run against the Prometheus server." ), data_type: z .enum(["snapshot", "range"]) .describe("Type of query: 'snapshot' (instant) or 'range' (time-series).") .default("snapshot"), group_by: z .string() .describe( "Label to group results by, such as 'pod' or 'namespace'. If not specified, behavior depends on the query (e.g., 'sum', 'avg')." ) .default("pod"), unit: z .enum(["auto", "bytes", "MiB", "GiB", "cores", "millicores"]) .describe( "Desired output unit. Use 'auto' to infer from the query content (e.g., memory → MiB, CPU → cores)." ) .default("auto"), cluster: z .string() .describe( "Target cluster name in a multi-cluster environment. Defaults to the hub cluster if not provided." ) .default("default") .optional(), start: z .string() .describe( "(range only) Start time in ISO 8601 format, e.g., '2025-06-06T00:00:00Z'." ) .optional(), end: z .string() .describe( "(range only) End time in ISO 8601 format. Defaults to current time if not provided." ) .optional(), step: z .string() .describe( "(range only) Resolution step (e.g., '30s', '5m', '1h'). Choose appropriately to keep the sample count under 200." ) .optional(), };
- src/tools/prometheus.ts:181-224 (helper)Helper function to resolve Prometheus (Thanos-Querier) URL and auth token from Kubernetes route, with caching.export async function getPrometheusURL(cluster?: string): Promise<{ url: string; token: string }> { const cacheKey = cluster || "default"; if (prometheusCache.has(cacheKey)) { return prometheusCache.get(cacheKey)!; } const kubeConfigFile = await getKubeconfigFile(cluster) const kc = new KubeConfig(); if (kubeConfigFile) { kc.loadFromFile(kubeConfigFile); } else { kc.loadFromDefault(); } const customApi = kc.makeApiClient(CustomObjectsApi); const res = await customApi.getNamespacedCustomObject({ group: "route.openshift.io", version: "v1", namespace: "openshift-monitoring", plural: "routes", name: "thanos-querier" }); const host = (res as any)?.spec?.host; if (!host) { throw new Error(`Failed to retrieve Prometheus route from cluster ${cluster}.`); } const user = kc.getCurrentUser(); const token = user?.token; if (!token) { throw new Error(`No token found in KUBECONFIG for cluster ${cluster}.`); } const result = { url: `https://${host}`, token: `Bearer ${token}`, }; prometheusCache.set(cacheKey, result); return result; }