Skip to main content
Glama

mcp-confluent

Official
by confluentinc
get-topic-config.ts•3.29 kB
import { ClientManager } from "@src/confluent/client-manager.js"; import { getEnsuredParam } from "@src/confluent/helpers.js"; import { CallToolResult } from "@src/confluent/schema.js"; import { BaseToolHandler, ToolConfig, } from "@src/confluent/tools/base-tools.js"; import { ToolName } from "@src/confluent/tools/tool-name.js"; import { EnvVar } from "@src/env-schema.js"; import env from "@src/env.js"; import { wrapAsPathBasedClient } from "openapi-fetch"; import { z } from "zod"; const getTopicConfigArguments = z.object({ baseUrl: z .string() .describe("The base URL of the Confluent Cloud Kafka REST API.") .url() .default(() => env.KAFKA_REST_ENDPOINT ?? "") .optional(), clusterId: z .string() .optional() .describe("The unique identifier for the Kafka cluster."), topicName: z .string() .describe("Name of the topic to get configuration for") .nonempty(), }); /** * Handler for retrieving Kafka topic configurations through Confluent Cloud REST API. * This implementation uses Confluent's REST API endpoints to fetch topic configuration * details that aren't directly accessible through the native Kafka admin client API. */ export class GetTopicConfigHandler extends BaseToolHandler { async handle( clientManager: ClientManager, toolArguments: Record<string, unknown>, ): Promise<CallToolResult> { const { clusterId, topicName, baseUrl } = getTopicConfigArguments.parse(toolArguments); const kafka_cluster_id = getEnsuredParam( "KAFKA_CLUSTER_ID", "Kafka Cluster ID is required", clusterId, ); if (baseUrl !== undefined && baseUrl !== "") { clientManager.setConfluentCloudKafkaRestEndpoint(baseUrl); } const pathBasedClient = wrapAsPathBasedClient( clientManager.getConfluentCloudKafkaRestClient(), ); // First, get topic details const { data: topicData, error: topicError } = await pathBasedClient[ `/kafka/v3/clusters/${kafka_cluster_id}/topics/${topicName}` ].GET(); if (topicError) { return this.createResponse( `Failed to retrieve topic details: ${JSON.stringify(topicError)}`, true, ); } // Then, get topic configurations const { data: configData, error: configError } = await pathBasedClient[ `/kafka/v3/clusters/${kafka_cluster_id}/topics/${topicName}/configs` ].GET(); if (configError) { return this.createResponse( `Failed to retrieve topic configuration: ${JSON.stringify(configError)}`, true, ); } // Combine topic details and configuration into a single response const response = { topicDetails: topicData, topicConfig: configData, }; return this.createResponse( `Topic configuration for '${topicName}':\n${JSON.stringify(response, null, 2)}`, ); } getToolConfig(): ToolConfig { return { name: ToolName.GET_TOPIC_CONFIG, description: "Retrieve configuration details for a specific Kafka topic.", inputSchema: getTopicConfigArguments.shape, }; } getRequiredEnvVars(): EnvVar[] { return ["KAFKA_API_KEY", "KAFKA_API_SECRET", "BOOTSTRAP_SERVERS"]; } isConfluentCloudOnly(): boolean { return true; } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/confluentinc/mcp-confluent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server