mcp-confluent

Official
by confluentinc

search-topics-by-tag

Filter and list Kafka topics by a specific tag using the Schema Registry REST API, enabling easy topic discovery and management within Confluent Kafka or Confluent Cloud environments.

Instructions

List all topics in the Kafka cluster with the specified tag.

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| baseUrl | No | The base URL of the Schema Registry REST API. | "" |
| limit | No | The maximum number of topics to return (at most 500). | 100 |
| offset | No | The offset to start the search from. Used for pagination. | 0 |
| topicTag | No | The tag we wish to search for | |
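
The limit and offset parameters together support paging through results. The following is a minimal sketch of how a caller might advance the offset between calls. The helper name `nextOffset` is hypothetical and not part of mcp-confluent, and the stopping rule (a page shorter than `limit` means no more results) is an assumption, since the response shape is not documented here.

```typescript
// Hypothetical helper, not part of mcp-confluent: returns the offset to use
// for the next call, or null when the last page appears to have been reached.
// Assumption: a page with fewer than `limit` items means there is nothing left.
function nextOffset(offset: number, limit: number, returned: number): number | null {
  if (limit <= 0) {
    throw new RangeError("limit must be positive");
  }
  return returned < limit ? null : offset + returned;
}

// Simulate three pages of sizes 100, 100, and 40 with limit = 100,
// recording the offset that would be sent with each call.
const simulatedPageSizes: number[] = [100, 100, 40];
const offsetsUsed: number[] = [];
let currentOffset: number | null = 0;
for (const size of simulatedPageSizes) {
  if (currentOffset === null) break;
  offsetsUsed.push(currentOffset);
  currentOffset = nextOffset(currentOffset, 100, size);
}
```

In a real client, each loop iteration would call the tool with the current offset and pass the number of returned topics to `nextOffset`.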

Input Schema (JSON Schema)

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "properties": {
    "baseUrl": {
      "default": "",
      "description": "The base URL of the Schema Registry REST API.",
      "format": "uri",
      "type": "string"
    },
    "limit": {
      "default": 100,
      "description": "The maximum number of topics to return.",
      "maximum": 500,
      "type": "number"
    },
    "offset": {
      "default": 0,
      "description": "The offset to start the search from. Used for pagination.",
      "type": "number"
    },
    "topicTag": {
      "description": "The tag we wish to search for",
      "type": "string"
    }
  },
  "type": "object"
}
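
To make the schema's constraints concrete, here is a dependency-free sketch that applies the same defaults and limits by hand. The names `NormalizedArgs` and `normalizeArgs` are hypothetical and do not come from mcp-confluent, which validates with Zod instead. The sketch also does not check the "uri" format of baseUrl.

```typescript
// Hypothetical types and helper for illustration only.
interface NormalizedArgs {
  baseUrl: string;
  topicTag?: string;
  limit: number;
  offset: number;
}

const ALLOWED_KEYS: string[] = ["baseUrl", "topicTag", "limit", "offset"];

function normalizeArgs(input: Record<string, unknown>): NormalizedArgs {
  // "additionalProperties": false means unknown keys are rejected.
  for (const key of Object.keys(input)) {
    if (ALLOWED_KEYS.indexOf(key) === -1) {
      throw new Error(`Unknown property: ${key}`);
    }
  }

  // Apply the defaults declared in the schema.
  const baseUrl = input.baseUrl === undefined ? "" : input.baseUrl;
  const limit = input.limit === undefined ? 100 : input.limit;
  const offset = input.offset === undefined ? 0 : input.offset;
  const topicTag = input.topicTag;

  if (typeof baseUrl !== "string") throw new Error("baseUrl must be a string");
  if (typeof limit !== "number") throw new Error("limit must be a number");
  if (limit > 500) throw new Error("limit must be at most 500");
  if (typeof offset !== "number") throw new Error("offset must be a number");
  if (topicTag !== undefined && typeof topicTag !== "string") {
    throw new Error("topicTag must be a string");
  }

  const result: NormalizedArgs = { baseUrl, limit, offset };
  if (topicTag !== undefined) {
    result.topicTag = topicTag;
  }
  return result;
}

// Only the tag is supplied, so limit, offset, and baseUrl take their defaults.
const exampleArgs = normalizeArgs({ topicTag: "pii" });
```

The schema sets no minimum for limit or offset, so this sketch does not reject negative values either.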

Implementation Reference

  • SearchTopicsByTagHandler class implementing the tool's core execution logic via the handle() method, which queries the Confluent Schema Registry Catalog API for topics matching the given tag.
    export class SearchTopicsByTagHandler extends BaseToolHandler {
      async handle(
        clientManager: ClientManager,
        toolArguments: Record<string, unknown>,
      ): Promise<CallToolResult> {
        const { topicTag, limit, offset, baseUrl } =
          searchTopicsByTagArguments.parse(toolArguments);

        if (baseUrl !== undefined && baseUrl !== "") {
          clientManager.setConfluentCloudSchemaRegistryEndpoint(baseUrl);
        }

        const pathBasedClient = wrapAsPathBasedClient(
          clientManager.getConfluentCloudSchemaRegistryRestClient(),
        );

        const { data: response, error } = await pathBasedClient[
          "/catalog/v1/search/basic?types=kafka_topic&tag={topicTag}&limit={limit}&offset={offset}"
        ].GET({
          params: {
            path: {
              topicTag: topicTag,
              limit: limit,
              offset: offset,
            },
          },
        });

        if (error) {
          return this.createResponse(
            `Failed to search for topics by tag: ${JSON.stringify(error)}`,
            true,
          );
        }
        return this.createResponse(`${JSON.stringify(response)}`);
      }

      getToolConfig(): ToolConfig {
        return {
          name: ToolName.SEARCH_TOPICS_BY_TAG,
          description:
            "List all topics in the Kafka cluster with the specified tag.",
          inputSchema: searchTopicsByTagArguments.shape,
        };
      }

      getRequiredEnvVars(): EnvVar[] {
        return ["SCHEMA_REGISTRY_API_KEY", "SCHEMA_REGISTRY_API_SECRET"];
      }

      isConfluentCloudOnly(): boolean {
        return true;
      }
    }
  • Zod input schema defining parameters for the tool: baseUrl (optional), topicTag (optional), limit (default 100, max 500), offset (default 0).
    const searchTopicsByTagArguments = z.object({
      baseUrl: z
        .string()
        .describe("The base URL of the Schema Registry REST API.")
        .url()
        .default(() => env.SCHEMA_REGISTRY_ENDPOINT ?? "")
        .optional(),
      topicTag: z.string().optional().describe("The tag we wish to search for"),
      limit: z
        .number()
        .max(500)
        .describe("The maximum number of topics to return.")
        .default(100),
      offset: z
        .number()
        .describe("The offset to start the search from. Used for pagination.")
        .default(0),
    });
  • Maps ToolName.SEARCH_TOPICS_BY_TAG to a new instance of SearchTopicsByTagHandler in the ToolFactory's handlers Map, enabling tool lookup and configuration retrieval.
    [ToolName.SEARCH_TOPICS_BY_TAG, new SearchTopicsByTagHandler()],
  • Defines the exact string name 'search-topics-by-tag' for the tool in the ToolName enum, used for registration and identification.
    SEARCH_TOPICS_BY_TAG = "search-topics-by-tag",
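
The handler above fills topicTag, limit, and offset into a path template that targets the Catalog API's /catalog/v1/search/basic endpoint. As a rough illustration of the resulting request path, the sketch below builds the same query string with explicit URL encoding. The function name `buildSearchPath` is hypothetical and is not how mcp-confluent constructs the request.

```typescript
// Hypothetical helper for illustration; mcp-confluent itself relies on a
// path-based REST client rather than building the string by hand.
function buildSearchPath(topicTag: string, limit: number = 100, offset: number = 0): string {
  const pairs: Array<[string, string]> = [
    ["types", "kafka_topic"], // restricts the search to Kafka topics
    ["tag", topicTag],
    ["limit", String(limit)],
    ["offset", String(offset)],
  ];
  // Encode each key and value so tags with spaces or "&" stay intact.
  const query = pairs
    .map(([key, value]) => `${encodeURIComponent(key)}=${encodeURIComponent(value)}`)
    .join("&");
  return `/catalog/v1/search/basic?${query}`;
}

const defaultPath = buildSearchPath("pii");
// defaultPath === "/catalog/v1/search/basic?types=kafka_topic&tag=pii&limit=100&offset=0"
```

The returned path would be appended to the Schema Registry base URL, with authentication handled separately by the client.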

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/confluentinc/mcp-confluent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.