Skip to main content
Glama
yanmxa

OCM MCP Server

by yanmxa

prometheus

Query Prometheus metrics for Kubernetes cluster monitoring and visualization. Retrieve snapshot or time-series data formatted for charting to analyze system performance and resource usage.

Instructions

Queries a Prometheus server (snapshot or range) and returns metrics formatted for charting.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
qlYesThe PromQL query string to run against the Prometheus server.
data_typeNoType of query: 'snapshot' (instant) or 'range' (time-series).snapshot
group_byNoLabel to group results by, such as 'pod' or 'namespace'. If not specified, behavior depends on the query (e.g., 'sum', 'avg').pod
unitNoDesired output unit. Use 'auto' to infer from the query content (e.g., memory → MiB, CPU → cores).auto
clusterNoTarget cluster name in a multi-cluster environment. Defaults to the hub cluster if not provided.default
startNo(range only) Start time in ISO 8601 format, e.g., '2025-06-06T00:00:00Z'.
endNo(range only) End time in ISO 8601 format. Defaults to current time if not provided.
stepNo(range only) Resolution step (e.g., '30s', '5m', '1h'). Choose appropriately to keep the sample count under 200.

Implementation Reference

  • Main handler function for the 'prometheus' tool. Connects to Prometheus thanos-querier, executes PromQL query (snapshot or range), groups results, applies unit conversion, and returns JSON-formatted data for charting.
    export async function prometheus({ ql, data_type = "snapshot", group_by = "pod", unit = "auto", cluster = "default", start, end, step = "5m", }: { ql: string; data_type: "snapshot" | "range"; group_by: string; unit: "auto" | "bytes" | "MiB" | "GiB" | "cores" | "millicores"; cluster?: string; start?: string; end?: string; step?: string; }): Promise<CallToolResult> { let responseData: any[] = []; try { const { url, token } = await getPrometheusURL(cluster); const headers = { Authorization: token }; const effectiveUnit = inferUnit(unit, ql); const httpsAgent = new https.Agent({ rejectUnauthorized: false }); if (data_type === "range") { const response = await axios.default.get(`${url}/api/v1/query_range`, { headers, params: { query: ql, start, end, step, }, httpsAgent, proxy: false, }); responseData = response.data.data.result.map((series: any) => ({ metric: series.metric, values: series.values.map(([timestamp, rawValue]: [number, string]) => [ dayjs.unix(timestamp).toISOString(), Number(rawValue) / (1024 * 1024), // bytes → MiB ]), })); } else { const response = await axios.default.get(`${url}/api/v1/query`, { headers, params: { query: ql }, httpsAgent, proxy: false, }); responseData = response.data.data.result.map( (entry: { metric: { [x: string]: any; }; value: (string | number)[]; }) => ( { [group_by]: entry.metric[group_by] || "value", value: transformValue(entry.value[1], effectiveUnit), } )); } // console.warn(responseData) if (responseData.length === 0) { return { content: [{ type: "text", text: JSON.stringify({ data: [], type: data_type, unit: effectiveUnit, }), }], }; // return { data: [], type: data_type, unit: effectiveUnit }; } return { content: [ { type: "text", text: JSON.stringify( { data: responseData, type: data_type, unit: effectiveUnit, }), } ], }; } catch (err: any) { console.error(responseData) console.error(err) return { content: [{ type: "text", text: `Failed to query Prometheus: ${err.message || String(err)}`, }], }; } }
  • Input schema (prometheusArgs using Zod) and description (prometheusDesc) for the prometheus tool.
    export const prometheusDesc = "Queries a Prometheus server (snapshot or range) and returns metrics formatted for charting." export const prometheusArgs = { ql: z.string().describe( "The PromQL query string to run against the Prometheus server." ), data_type: z .enum(["snapshot", "range"]) .describe("Type of query: 'snapshot' (instant) or 'range' (time-series).") .default("snapshot"), group_by: z .string() .describe( "Label to group results by, such as 'pod' or 'namespace'. If not specified, behavior depends on the query (e.g., 'sum', 'avg')." ) .default("pod"), unit: z .enum(["auto", "bytes", "MiB", "GiB", "cores", "millicores"]) .describe( "Desired output unit. Use 'auto' to infer from the query content (e.g., memory → MiB, CPU → cores)." ) .default("auto"), cluster: z .string() .describe( "Target cluster name in a multi-cluster environment. Defaults to the hub cluster if not provided." ) .default("default") .optional(), start: z .string() .describe( "(range only) Start time in ISO 8601 format, e.g., '2025-06-06T00:00:00Z'." ) .optional(), end: z .string() .describe( "(range only) End time in ISO 8601 format. Defaults to current time if not provided." ) .optional(), step: z .string() .describe( "(range only) Resolution step (e.g., '30s', '5m', '1h'). Choose appropriately to keep the sample count under 200." ) .optional(), };
  • src/index.ts:45-49 (registration)
    Registration of the 'prometheus' tool in the MCP server using server.tool().
    "prometheus", prometheusDesc, prometheusArgs, async (args, extra) => prometheus(args) // ensure connectCluster matches (args, extra) => ... )
  • Helper function to resolve Prometheus URL and authentication token from Kubernetes custom object (OpenShift route).
    export async function getPrometheusURL(cluster?: string): Promise<{ url: string; token: string }> { const cacheKey = cluster || "default"; if (prometheusCache.has(cacheKey)) { return prometheusCache.get(cacheKey)!; } const kubeConfigFile = await getKubeconfigFile(cluster) const kc = new KubeConfig(); if (kubeConfigFile) { kc.loadFromFile(kubeConfigFile); } else { kc.loadFromDefault(); } const customApi = kc.makeApiClient(CustomObjectsApi); const res = await customApi.getNamespacedCustomObject({ group: "route.openshift.io", version: "v1", namespace: "openshift-monitoring", plural: "routes", name: "thanos-querier" }); const host = (res as any)?.spec?.host; if (!host) { throw new Error(`Failed to retrieve Prometheus route from cluster ${cluster}.`); } const user = kc.getCurrentUser(); const token = user?.token; if (!token) { throw new Error(`No token found in KUBECONFIG for cluster ${cluster}.`); } const result = { url: `https://${host}`, token: `Bearer ${token}`, }; prometheusCache.set(cacheKey, result); return result; }
  • Python implementation of prometheus tool handler, decorated with @mcp.tool for automatic registration. Uses prometheus_api_client for queries.
    @mcp.tool(description="Query Prometheus metrics from a specific cluster and format the results for Recharts visualization.") def prometheus( ql: Annotated[str, Field(description="The PromQL query string to run against the Prometheus server.")], data_type: Annotated[str, Field(description="Type of query: 'snapshot' for instant or 'range' for time-series.")] = "snapshot", group_by: Annotated[str, Field(description="Label to group results by, such as 'pod' or 'namespace'.")] = "pod", unit: Annotated[str, Field(description="The desired output unit: 'auto', 'bytes', 'MiB', 'GiB', 'cores', or 'millicores'.")] = "auto", cluster: Annotated[Optional[str], Field(description="The target cluster name. Defaults to the hub cluster.")] = None, start: Annotated[ Optional[str], Field(description="(Only for data_type='range') Start time in ISO 8601 format, e.g., '2025-06-06T00:00:00Z'.") ] = None, end: Annotated[ Optional[str], Field(description="(Only for data_type='range') End time in ISO 8601 format. Defaults to now if not provided.") ] = None, step: Annotated[ Optional[str], Field(description="(Only for data_type='range') Query resolution step (e.g., '30s', '5m', '1h').") ] = "5m", ) -> Annotated[dict, Field(description="Formatted result including Recharts-compatible data or error message.")]: try: def infer_unit(unit: str, query: str) -> str: if unit != "auto": return unit q = query.lower() if "memory" in q or "bytes" in q: return "GiB" elif "cpu" in q: return "cores" return "raw" def transform_value(value: float, unit: str) -> float: value = float(value) if unit == "MiB": return value / (1024 ** 2) elif unit == "GiB": return value / (1024 ** 3) elif unit == "millicores": return value * 1000 return value # Set up cluster access kubeconfig_file = None if cluster and cluster != "default": kubeconfig_file = get_kubeconfig_file(cluster) if not validate_kubeconfig_file(kubeconfig_file): kubeconfig_file = setup_cluster_access(cluster) if not kubeconfig_file: raise FileNotFoundError(f"KUBECONFIG for cluster '{cluster}' does not exist.") pc = prom_connect(kubeconfig=kubeconfig_file) effective_unit = infer_unit(unit, ql) # Query data if data_type == "range": end_dt = parse_datetime(end) start_dt = parse_datetime(start) result = pc.custom_query_range( query=ql, start_time=start_dt, end_time=end_dt, step=step ) else: result = pc.custom_query(query=ql) if len(result) == 0: return { "data": [], "type": data_type, "unit": effective_unit } # Format result recharts_data = [] if data_type == "snapshot": df = MetricSnapshotDataFrame(result) recharts_data = [ { "name": row.get(group_by, "unknown"), "value": transform_value(row["value"], effective_unit) } for _, row in df.iterrows() ] elif data_type == "range": df = MetricRangeDataFrame(result) df["value"]=df["value"].astype(float) # df.index= pandas.to_datetime(df.index, unit="s") df["name"] = df.index columns_to_keep = ["name", "namespace", "pod", "value", group_by] columns_to_keep = list(dict.fromkeys(columns_to_keep)) df = df[[col for col in columns_to_keep if col in df.columns]].copy() for ts, group in df.groupby("name"): if isinstance(ts, pandas.Timestamp): entry = {"name": ts.isoformat()} else: # entry["name"] = ts.isoformat() entry = {"name": ts} for _, row in group.iterrows(): key = row.get(group_by, "unknown") entry[key] = transform_value(row["value"], effective_unit) recharts_data.append(entry) else: raise ValueError("Invalid data_type. Must be 'snapshot' or 'range'.") print({ "data": recharts_data, "type": data_type, "unit": effective_unit }) return { "data": recharts_data, "type": data_type, "unit": effective_unit } except Exception as e: return {"not get the data": str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yanmxa/ocm-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server