Skip to main content
Glama
yanmxa

Multi-Cluster MCP Server

by yanmxa

prometheus

Query and visualize Prometheus metrics from Kubernetes clusters using PromQL. Supports instant and time-series data, grouping by labels, and customized units for Recharts integration.

Instructions

Query Prometheus metrics from a specific cluster and format the results for Recharts visualization.

Input Schema

Table (JSON Schema)

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| cluster | No | The target cluster name. Defaults to the hub cluster. | |
| data_type | No | Type of query: 'snapshot' for instant or 'range' for time-series. | snapshot |
| end | No | (Only for data_type='range') End time in ISO 8601 format. Defaults to now if not provided. | |
| group_by | No | Label to group results by, such as 'pod' or 'namespace'. | pod |
| ql | Yes | The PromQL query string to run against the Prometheus server. | |
| start | No | (Only for data_type='range') Start time in ISO 8601 format, e.g., '2025-06-06T00:00:00Z'. | |
| step | No | (Only for data_type='range') Query resolution step (e.g., '30s', '5m', '1h'). | 5m |
| unit | No | The desired output unit: 'auto', 'bytes', 'MiB', 'GiB', 'cores', or 'millicores'. | auto |

Implementation Reference

  • Python implementation of the 'prometheus' MCP tool handler, including parameter schema via Pydantic Annotated Fields and self-registration via @mcp.tool decorator. Uses prometheus_api_client to query and formats for Recharts.
    @mcp.tool(description="Query Prometheus metrics from a specific cluster and format the results for Recharts visualization.")
    def prometheus(
        ql: Annotated[str, Field(description="The PromQL query string to run against the Prometheus server.")],
        data_type: Annotated[str, Field(description="Type of query: 'snapshot' for instant or 'range' for time-series.")] = "snapshot",
        group_by: Annotated[str, Field(description="Label to group results by, such as 'pod' or 'namespace'.")] = "pod",
        unit: Annotated[str, Field(description="The desired output unit: 'auto', 'bytes', 'MiB', 'GiB', 'cores', or 'millicores'.")] = "auto",
        cluster: Annotated[Optional[str], Field(description="The target cluster name. Defaults to the hub cluster.")] = None,
        start: Annotated[
            Optional[str],
            Field(description="(Only for data_type='range') Start time in ISO 8601 format, e.g., '2025-06-06T00:00:00Z'.")
        ] = None,
        end: Annotated[
            Optional[str],
            Field(description="(Only for data_type='range') End time in ISO 8601 format. Defaults to now if not provided.")
        ] = None,
        step: Annotated[
            Optional[str],
            Field(description="(Only for data_type='range') Query resolution step (e.g., '30s', '5m', '1h').")
        ] = "5m",
    ) -> Annotated[dict, Field(description="Formatted result including Recharts-compatible data or error message.")]:
        """Run a PromQL query against a cluster's Prometheus and shape the result for Recharts.

        Returns {"data": [...], "type": data_type, "unit": <unit>} on success,
        or {"error": <message>} on failure.
        """

        def infer_unit(unit: str, query: str) -> str:
            # Pick a display unit from the query text when 'auto' is requested.
            if unit != "auto":
                return unit
            q = query.lower()
            if "memory" in q or "bytes" in q:
                return "GiB"
            if "cpu" in q:
                return "cores"
            return "raw"

        def transform_value(value: float, unit: str) -> float:
            # Convert a raw sample (assumed bytes or cores) into the requested unit.
            value = float(value)
            if unit == "MiB":
                return value / (1024 ** 2)
            if unit == "GiB":
                return value / (1024 ** 3)
            if unit == "millicores":
                return value * 1000
            return value

        try:
            # Fail fast on an unknown query type instead of issuing a query first.
            if data_type not in ("snapshot", "range"):
                raise ValueError("Invalid data_type. Must be 'snapshot' or 'range'.")

            # Set up cluster access: non-hub clusters need a dedicated kubeconfig.
            kubeconfig_file = None
            if cluster and cluster != "default":
                kubeconfig_file = get_kubeconfig_file(cluster)
                if not validate_kubeconfig_file(kubeconfig_file):
                    kubeconfig_file = setup_cluster_access(cluster)
                    if not kubeconfig_file:
                        raise FileNotFoundError(f"KUBECONFIG for cluster '{cluster}' does not exist.")

            pc = prom_connect(kubeconfig=kubeconfig_file)
            effective_unit = infer_unit(unit, ql)

            # Query data.
            if data_type == "range":
                result = pc.custom_query_range(
                    query=ql,
                    start_time=parse_datetime(start),
                    end_time=parse_datetime(end),
                    step=step,
                )
            else:
                result = pc.custom_query(query=ql)

            if not result:
                return {
                    "data": [],
                    "type": data_type,
                    "unit": effective_unit
                }

            # Format result for Recharts.
            recharts_data = []
            if data_type == "snapshot":
                df = MetricSnapshotDataFrame(result)
                recharts_data = [
                    {
                        "name": row.get(group_by, "unknown"),
                        "value": transform_value(row["value"], effective_unit),
                    }
                    for _, row in df.iterrows()
                ]
            else:  # data_type == "range"
                df = MetricRangeDataFrame(result)
                df["value"] = df["value"].astype(float)
                df["name"] = df.index

                # Keep only the columns needed for grouping, de-duplicated in order.
                columns_to_keep = list(dict.fromkeys(["name", "namespace", "pod", "value", group_by]))
                df = df[[col for col in columns_to_keep if col in df.columns]].copy()

                # One entry per timestamp; each grouped label becomes a series key.
                for ts, group in df.groupby("name"):
                    name = ts.isoformat() if isinstance(ts, pandas.Timestamp) else ts
                    entry = {"name": name}
                    for _, row in group.iterrows():
                        entry[row.get(group_by, "unknown")] = transform_value(row["value"], effective_unit)
                    recharts_data.append(entry)

            return {
                "data": recharts_data,
                "type": data_type,
                "unit": effective_unit
            }

        except Exception as e:
            # Report the failure under a conventional 'error' key (the original
            # used the typo key "not get the data", which clients cannot rely on).
            return {"error": str(e)}
  • TypeScript implementation of the 'prometheus' tool handler function. Connects to Thanos-Querier via discovered route, executes PromQL query using axios, transforms units, and returns formatted data.
    /**
     * Query Prometheus (Thanos-Querier) on a target cluster and return the
     * result as a CallToolResult with Recharts-friendly JSON text.
     *
     * Fix: the range branch previously hard-coded a bytes → MiB division while
     * still reporting `effectiveUnit` in the response; it now converts values
     * with `transformValue(..., effectiveUnit)` exactly like the snapshot branch.
     */
    export async function prometheus({
      ql,
      data_type = "snapshot",
      group_by = "pod",
      unit = "auto",
      cluster = "default",
      start,
      end,
      step = "5m",
    }: {
      ql: string;
      data_type: "snapshot" | "range";
      group_by: string;
      unit: "auto" | "bytes" | "MiB" | "GiB" | "cores" | "millicores";
      cluster?: string;
      start?: string;
      end?: string;
      step?: string;
    }): Promise<CallToolResult> {

      let responseData: any[] = [];

      try {
        const { url, token } = await getPrometheusURL(cluster);

        const headers = { Authorization: token };
        const effectiveUnit = inferUnit(unit, ql);

        // Route certificates are often self-signed in-cluster.
        const httpsAgent = new https.Agent({ rejectUnauthorized: false });

        if (data_type === "range") {
          const response = await axios.default.get(`${url}/api/v1/query_range`, {
            headers,
            params: {
              query: ql,
              start,
              end,
              step,
            },
            httpsAgent,
            proxy: false,
          });
          responseData = response.data.data.result.map((series: any) => ({
            metric: series.metric,
            values: series.values.map(([timestamp, rawValue]: [number, string]) => [
              dayjs.unix(timestamp).toISOString(),
              // Convert per the advertised unit instead of assuming bytes → MiB.
              transformValue(rawValue, effectiveUnit),
            ]),
          }));
        } else {
          const response = await axios.default.get(`${url}/api/v1/query`, {
            headers,
            params: { query: ql },
            httpsAgent,
            proxy: false,
          });

          responseData = response.data.data.result.map(
            (entry: { metric: { [x: string]: any; }; value: (string | number)[]; }) => (
              {
                [group_by]: entry.metric[group_by] || "value",
                value: transformValue(entry.value[1], effectiveUnit),
              }
            ));
        }

        if (responseData.length === 0) {
          return {
            content: [{
              type: "text",
              text: JSON.stringify({
                data: [],
                type: data_type,
                unit: effectiveUnit,
              }),
            }],
          };
        }

        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(
                {
                  data: responseData,
                  type: data_type,
                  unit: effectiveUnit,
                }),
            }
          ],
        };
      } catch (err: any) {
        console.error(responseData)
        console.error(err)
        return {
          content: [{
            type: "text",
            text: `Failed to query Prometheus: ${err.message || String(err)}`,
          }],
        };
      }
    }
  • src/index.ts:44-48 (registration)
    Explicit registration of the 'prometheus' tool on the TypeScript MCP server instance, linking description, Zod schema, and handler function.
    // Register the 'prometheus' tool on the MCP server instance: tool name,
    // human-readable description, Zod argument schema, and an async handler
    // that forwards the validated args to the implementation.
    server.tool(
      "prometheus",
      prometheusDesc,
      prometheusArgs,
      async (args, extra) => prometheus(args) // ensure connectCluster matches (args, extra) => ...
  • Zod schema (prometheusArgs) defining input parameters for the 'prometheus' tool in TypeScript, matching the Python version.
    /**
     * Zod input schema for the 'prometheus' tool.
     *
     * Fixes: `cluster` previously chained `.default("default").optional()`;
     * with `.optional()` outermost, an omitted value parses to `undefined`
     * and the default never applies — `.default()` alone already makes the
     * field optional on input. `step` now carries its documented "5m" default,
     * matching the Python schema and the handler's destructuring default.
     */
    export const prometheusArgs = {
      ql: z.string().describe(
        "The PromQL query string to run against the Prometheus server."
      ),

      data_type: z
        .enum(["snapshot", "range"])
        .describe("Type of query: 'snapshot' (instant) or 'range' (time-series).")
        .default("snapshot"),

      group_by: z
        .string()
        .describe(
          "Label to group results by, such as 'pod' or 'namespace'. If not specified, behavior depends on the query (e.g., 'sum', 'avg')."
        )
        .default("pod"),

      unit: z
        .enum(["auto", "bytes", "MiB", "GiB", "cores", "millicores"])
        .describe(
          "Desired output unit. Use 'auto' to infer from the query content (e.g., memory → MiB, CPU → cores)."
        )
        .default("auto"),

      cluster: z
        .string()
        .describe(
          "Target cluster name in a multi-cluster environment. Defaults to the hub cluster if not provided."
        )
        .default("default"),

      start: z
        .string()
        .describe(
          "(range only) Start time in ISO 8601 format, e.g., '2025-06-06T00:00:00Z'."
        )
        .optional(),

      end: z
        .string()
        .describe(
          "(range only) End time in ISO 8601 format. Defaults to current time if not provided."
        )
        .optional(),

      step: z
        .string()
        .describe(
          "(range only) Resolution step (e.g., '30s', '5m', '1h'). Choose appropriately to keep the sample count under 200."
        )
        .default("5m"),
    };
  • Helper function to resolve Prometheus (Thanos-Querier) URL and auth token from Kubernetes route, with caching.
    export async function getPrometheusURL(cluster?: string): Promise<{ url: string; token: string }> {
      const cacheKey = cluster || "default";
    
      if (prometheusCache.has(cacheKey)) {
        return prometheusCache.get(cacheKey)!;
      }
    
      const kubeConfigFile = await getKubeconfigFile(cluster)
    
      const kc = new KubeConfig();
      if (kubeConfigFile) {
        kc.loadFromFile(kubeConfigFile);
      } else {
        kc.loadFromDefault();
      }
    
      const customApi = kc.makeApiClient(CustomObjectsApi);
      const res = await customApi.getNamespacedCustomObject({
        group: "route.openshift.io",
        version: "v1",
        namespace: "openshift-monitoring",
        plural: "routes",
        name: "thanos-querier"
      });
    
      const host = (res as any)?.spec?.host;
      if (!host) {
        throw new Error(`Failed to retrieve Prometheus route from cluster ${cluster}.`);
      }
    
      const user = kc.getCurrentUser();
      const token = user?.token;
      if (!token) {
        throw new Error(`No token found in KUBECONFIG for cluster ${cluster}.`);
      }
    
      const result = {
        url: `https://${host}`,
        token: `Bearer ${token}`,
      };
    
      prometheusCache.set(cacheKey, result);
      return result;
    }
Install Server

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yanmxa/multicluster-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server