# prometheus
Query Prometheus metrics for Kubernetes cluster monitoring and visualization. Retrieve snapshot or time-series data formatted for charting to analyze system performance and resource usage.
## Instructions
Queries a Prometheus server (snapshot or range) and returns metrics formatted for charting.
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| ql | Yes | The PromQL query string to run against the Prometheus server. | |
| data_type | No | Type of query: 'snapshot' (instant) or 'range' (time-series). | snapshot |
| group_by | No | Label to group results by, such as 'pod' or 'namespace'. If not specified, behavior depends on the query (e.g., 'sum', 'avg'). | pod |
| unit | No | Desired output unit. Use 'auto' to infer from the query content (e.g., memory → MiB, CPU → cores). | auto |
| cluster | No | Target cluster name in a multi-cluster environment. Defaults to the hub cluster if not provided. | default |
| start | No | (range only) Start time in ISO 8601 format, e.g., '2025-06-06T00:00:00Z'. | |
| end | No | (range only) End time in ISO 8601 format. Defaults to current time if not provided. | |
| step | No | (range only) Resolution step (e.g., '30s', '5m', '1h'). Choose appropriately to keep the sample count under 200. | 5m |
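
The sketch below shows two illustrative calls to the exported handler, one instant (snapshot) query and one range query. The PromQL strings, namespace, time window, and import path are assumed examples for demonstration, not taken from the repository.

```ts
import { prometheus } from "./src/tools/prometheus"; // path assumed from the implementation reference below

// Snapshot query: current memory working set per pod, reported in MiB.
const snapshot = await prometheus({
  ql: 'sum(container_memory_working_set_bytes{namespace="default"}) by (pod)',
  data_type: "snapshot",
  group_by: "pod",
  unit: "MiB",
});

// Range query: CPU usage over six hours at a 5m step (~72 samples, well under 200).
const range = await prometheus({
  ql: 'sum(rate(container_cpu_usage_seconds_total{namespace="default"}[5m])) by (pod)',
  data_type: "range",
  group_by: "pod",
  unit: "cores",
  start: "2025-06-06T00:00:00Z",
  end: "2025-06-06T06:00:00Z",
  step: "5m",
});
```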
## Implementation Reference
- src/tools/prometheus.ts:74-179 (handler): Main handler function for the 'prometheus' tool. It connects to the cluster's Prometheus thanos-querier endpoint, executes the PromQL query (snapshot or range), groups the results, applies unit conversion, and returns JSON-formatted data for charting.

```ts
export async function prometheus({
  ql,
  data_type = "snapshot",
  group_by = "pod",
  unit = "auto",
  cluster = "default",
  start,
  end,
  step = "5m",
}: {
  ql: string;
  data_type: "snapshot" | "range";
  group_by: string;
  unit: "auto" | "bytes" | "MiB" | "GiB" | "cores" | "millicores";
  cluster?: string;
  start?: string;
  end?: string;
  step?: string;
}): Promise<CallToolResult> {
  let responseData: any[] = [];
  try {
    // Resolve the thanos-querier URL and bearer token for the target cluster.
    const { url, token } = await getPrometheusURL(cluster);
    const headers = { Authorization: token };
    const effectiveUnit = inferUnit(unit, ql);
    const httpsAgent = new https.Agent({ rejectUnauthorized: false });

    if (data_type === "range") {
      const response = await axios.default.get(`${url}/api/v1/query_range`, {
        headers,
        params: { query: ql, start, end, step },
        httpsAgent,
        proxy: false,
      });
      responseData = response.data.data.result.map((series: any) => ({
        metric: series.metric,
        values: series.values.map(([timestamp, rawValue]: [number, string]) => [
          dayjs.unix(timestamp).toISOString(),
          Number(rawValue) / (1024 * 1024), // bytes → MiB
        ]),
      }));
    } else {
      const response = await axios.default.get(`${url}/api/v1/query`, {
        headers,
        params: { query: ql },
        httpsAgent,
        proxy: false,
      });
      responseData = response.data.data.result.map(
        (entry: { metric: { [x: string]: any }; value: (string | number)[] }) => ({
          [group_by]: entry.metric[group_by] || "value",
          value: transformValue(entry.value[1], effectiveUnit),
        })
      );
    }

    if (responseData.length === 0) {
      return {
        content: [{
          type: "text",
          text: JSON.stringify({ data: [], type: data_type, unit: effectiveUnit }),
        }],
      };
    }

    return {
      content: [{
        type: "text",
        text: JSON.stringify({ data: responseData, type: data_type, unit: effectiveUnit }),
      }],
    };
  } catch (err: any) {
    console.error(responseData);
    console.error(err);
    return {
      content: [{
        type: "text",
        text: `Failed to query Prometheus: ${err.message || String(err)}`,
      }],
    };
  }
}
```
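The handler relies on two helpers, inferUnit and transformValue, that are not reproduced in the excerpt above. A minimal TypeScript sketch is given below, reconstructed from the infer_unit and transform_value functions in the Python port further down; the actual definitions in src/tools/prometheus.ts may differ.

```ts
// Hypothetical reconstruction, mirroring the Python helpers shown later in this section.
function inferUnit(unit: string, query: string): string {
  if (unit !== "auto") return unit;
  const q = query.toLowerCase();
  if (q.includes("memory") || q.includes("bytes")) return "GiB";
  if (q.includes("cpu")) return "cores";
  return "raw";
}

function transformValue(value: string | number, unit: string): number {
  const v = Number(value);
  if (unit === "MiB") return v / 1024 ** 2;
  if (unit === "GiB") return v / 1024 ** 3;
  if (unit === "millicores") return v * 1000;
  return v;
}
```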
- src/tools/prometheus.ts:16-70 (schema): Input schema (prometheusArgs, defined with Zod) and tool description (prometheusDesc) for the prometheus tool.

```ts
export const prometheusDesc =
  "Queries a Prometheus server (snapshot or range) and returns metrics formatted for charting.";

export const prometheusArgs = {
  ql: z.string().describe(
    "The PromQL query string to run against the Prometheus server."
  ),
  data_type: z
    .enum(["snapshot", "range"])
    .describe("Type of query: 'snapshot' (instant) or 'range' (time-series).")
    .default("snapshot"),
  group_by: z
    .string()
    .describe(
      "Label to group results by, such as 'pod' or 'namespace'. If not specified, behavior depends on the query (e.g., 'sum', 'avg')."
    )
    .default("pod"),
  unit: z
    .enum(["auto", "bytes", "MiB", "GiB", "cores", "millicores"])
    .describe(
      "Desired output unit. Use 'auto' to infer from the query content (e.g., memory → MiB, CPU → cores)."
    )
    .default("auto"),
  cluster: z
    .string()
    .describe(
      "Target cluster name in a multi-cluster environment. Defaults to the hub cluster if not provided."
    )
    .default("default")
    .optional(),
  start: z
    .string()
    .describe(
      "(range only) Start time in ISO 8601 format, e.g., '2025-06-06T00:00:00Z'."
    )
    .optional(),
  end: z
    .string()
    .describe(
      "(range only) End time in ISO 8601 format. Defaults to current time if not provided."
    )
    .optional(),
  step: z
    .string()
    .describe(
      "(range only) Resolution step (e.g., '30s', '5m', '1h'). Choose appropriately to keep the sample count under 200."
    )
    .optional(),
};
```
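Because prometheusArgs is a plain map of Zod validators, a single schema and static type can be derived from it with z.object and z.infer. This is a hedged convenience sketch, not code from the repository, and the import path is assumed.

```ts
import { z } from "zod";
import { prometheusArgs } from "./src/tools/prometheus"; // path assumed

// Illustrative only: build one schema/type from the argument map.
const PrometheusInputSchema = z.object(prometheusArgs);
type PrometheusInput = z.infer<typeof PrometheusInputSchema>;

// Example: validate a raw payload before calling the handler.
// Only `ql` is required; the other fields have defaults or are optional.
const parsed: PrometheusInput = PrometheusInputSchema.parse({
  ql: "sum(kube_pod_container_resource_requests) by (namespace)",
});
```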
- src/index.ts:45-49 (registration): Registration of the 'prometheus' tool in the MCP server using server.tool().

```ts
server.tool(
  "prometheus",
  prometheusDesc,
  prometheusArgs,
  async (args, extra) => prometheus(args) // ensure connectCluster matches (args, extra) => ...
);
```
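For context, a minimal server setup around this registration might look like the sketch below. Only the server.tool(...) call is taken from src/index.ts; the server name, version, import paths, and stdio transport are assumptions.

```ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { prometheus, prometheusDesc, prometheusArgs } from "./tools/prometheus";

// Server name and version are placeholders, not taken from the repository.
const server = new McpServer({ name: "cluster-monitoring", version: "0.1.0" });

server.tool(
  "prometheus",
  prometheusDesc,
  prometheusArgs,
  async (args, extra) => prometheus(args)
);

// Assumed stdio transport; the repository may wire a different one.
const transport = new StdioServerTransport();
await server.connect(transport);
```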
- src/tools/prometheus.ts:181-224 (helper): Helper function that resolves the Prometheus URL and authentication token from a Kubernetes custom object (the OpenShift thanos-querier route), caching the result per cluster.

```ts
export async function getPrometheusURL(cluster?: string): Promise<{ url: string; token: string }> {
  const cacheKey = cluster || "default";
  if (prometheusCache.has(cacheKey)) {
    return prometheusCache.get(cacheKey)!;
  }

  // Load the kubeconfig for the requested cluster, falling back to the default config.
  const kubeConfigFile = await getKubeconfigFile(cluster);
  const kc = new KubeConfig();
  if (kubeConfigFile) {
    kc.loadFromFile(kubeConfigFile);
  } else {
    kc.loadFromDefault();
  }

  // Read the thanos-querier route in openshift-monitoring to discover the query endpoint.
  const customApi = kc.makeApiClient(CustomObjectsApi);
  const res = await customApi.getNamespacedCustomObject({
    group: "route.openshift.io",
    version: "v1",
    namespace: "openshift-monitoring",
    plural: "routes",
    name: "thanos-querier",
  });

  const host = (res as any)?.spec?.host;
  if (!host) {
    throw new Error(`Failed to retrieve Prometheus route from cluster ${cluster}.`);
  }

  const user = kc.getCurrentUser();
  const token = user?.token;
  if (!token) {
    throw new Error(`No token found in KUBECONFIG for cluster ${cluster}.`);
  }

  const result = {
    url: `https://${host}`,
    token: `Bearer ${token}`,
  };
  prometheusCache.set(cacheKey, result);
  return result;
}
```
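getPrometheusURL memoizes its result in a module-level prometheusCache that is not shown in the excerpt. A plausible definition, assumed here rather than copied from the source, would be:

```ts
// Assumed shape of the cache consulted by getPrometheusURL: one entry per cluster name,
// kept for the lifetime of the process.
const prometheusCache = new Map<string, { url: string; token: string }>();
```

Under this assumption, entries are never evicted, so a rotated token or recreated route would only be picked up after a process restart.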
- Python implementation of the prometheus tool handler, decorated with @mcp.tool for automatic registration; uses prometheus_api_client for queries.

```python
@mcp.tool(description="Query Prometheus metrics from a specific cluster and format the results for Recharts visualization.")
def prometheus(
    ql: Annotated[str, Field(description="The PromQL query string to run against the Prometheus server.")],
    data_type: Annotated[str, Field(description="Type of query: 'snapshot' for instant or 'range' for time-series.")] = "snapshot",
    group_by: Annotated[str, Field(description="Label to group results by, such as 'pod' or 'namespace'.")] = "pod",
    unit: Annotated[str, Field(description="The desired output unit: 'auto', 'bytes', 'MiB', 'GiB', 'cores', or 'millicores'.")] = "auto",
    cluster: Annotated[Optional[str], Field(description="The target cluster name. Defaults to the hub cluster.")] = None,
    start: Annotated[
        Optional[str],
        Field(description="(Only for data_type='range') Start time in ISO 8601 format, e.g., '2025-06-06T00:00:00Z'."),
    ] = None,
    end: Annotated[
        Optional[str],
        Field(description="(Only for data_type='range') End time in ISO 8601 format. Defaults to now if not provided."),
    ] = None,
    step: Annotated[
        Optional[str],
        Field(description="(Only for data_type='range') Query resolution step (e.g., '30s', '5m', '1h')."),
    ] = "5m",
) -> Annotated[dict, Field(description="Formatted result including Recharts-compatible data or error message.")]:
    try:
        def infer_unit(unit: str, query: str) -> str:
            if unit != "auto":
                return unit
            q = query.lower()
            if "memory" in q or "bytes" in q:
                return "GiB"
            elif "cpu" in q:
                return "cores"
            return "raw"

        def transform_value(value: float, unit: str) -> float:
            value = float(value)
            if unit == "MiB":
                return value / (1024 ** 2)
            elif unit == "GiB":
                return value / (1024 ** 3)
            elif unit == "millicores":
                return value * 1000
            return value

        # Set up cluster access
        kubeconfig_file = None
        if cluster and cluster != "default":
            kubeconfig_file = get_kubeconfig_file(cluster)
            if not validate_kubeconfig_file(kubeconfig_file):
                kubeconfig_file = setup_cluster_access(cluster)
            if not kubeconfig_file:
                raise FileNotFoundError(f"KUBECONFIG for cluster '{cluster}' does not exist.")

        pc = prom_connect(kubeconfig=kubeconfig_file)
        effective_unit = infer_unit(unit, ql)

        # Query data
        if data_type == "range":
            end_dt = parse_datetime(end)
            start_dt = parse_datetime(start)
            result = pc.custom_query_range(query=ql, start_time=start_dt, end_time=end_dt, step=step)
        else:
            result = pc.custom_query(query=ql)

        if len(result) == 0:
            return {"data": [], "type": data_type, "unit": effective_unit}

        # Format result for Recharts
        recharts_data = []
        if data_type == "snapshot":
            df = MetricSnapshotDataFrame(result)
            recharts_data = [
                {
                    "name": row.get(group_by, "unknown"),
                    "value": transform_value(row["value"], effective_unit),
                }
                for _, row in df.iterrows()
            ]
        elif data_type == "range":
            df = MetricRangeDataFrame(result)
            df["value"] = df["value"].astype(float)
            df["name"] = df.index
            columns_to_keep = ["name", "namespace", "pod", "value", group_by]
            columns_to_keep = list(dict.fromkeys(columns_to_keep))
            df = df[[col for col in columns_to_keep if col in df.columns]].copy()
            # One entry per timestamp, with one key per group_by label value.
            for ts, group in df.groupby("name"):
                if isinstance(ts, pandas.Timestamp):
                    entry = {"name": ts.isoformat()}
                else:
                    entry = {"name": ts}
                for _, row in group.iterrows():
                    key = row.get(group_by, "unknown")
                    entry[key] = transform_value(row["value"], effective_unit)
                recharts_data.append(entry)
        else:
            raise ValueError("Invalid data_type. Must be 'snapshot' or 'range'.")

        return {"data": recharts_data, "type": data_type, "unit": effective_unit}
    except Exception as e:
        return {"error": str(e)}
```
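For orientation, the Recharts-oriented result built by the Python handler has roughly the following shape. The pod names and values below are made up for illustration and do not come from a real cluster.

```ts
// Snapshot result (data_type = "snapshot", group_by = "pod"): one entry per label value.
const snapshotResult = {
  data: [
    { name: "api-7d9f7c9b6d-abcde", value: 1.2 },
    { name: "worker-5f6d8b7c9a-fghij", value: 0.8 },
  ],
  type: "snapshot",
  unit: "GiB",
};

// Range result (data_type = "range"): one entry per timestamp, one key per group_by value.
const rangeResult = {
  data: [
    { name: "2025-06-06T00:00:00+00:00", "api-7d9f7c9b6d-abcde": 0.12, "worker-5f6d8b7c9a-fghij": 0.34 },
    { name: "2025-06-06T00:05:00+00:00", "api-7d9f7c9b6d-abcde": 0.15, "worker-5f6d8b7c9a-fghij": 0.31 },
  ],
  type: "range",
  unit: "cores",
};
```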