search-topics-by-tag
Filter and list Kafka topics by a specific tag using the Schema Registry REST API, enabling easy topic discovery and management within Confluent Kafka or Confluent Cloud environments.
Instructions
List all topics in the Kafka cluster with the specified tag.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| baseUrl | No | The base URL of the Schema Registry REST API. | |
| limit | No | The maximum number of topics to return (at most 500). | 100 |
| offset | No | The offset to start the search from. Used for pagination. | 0 |
| topicTag | No | The tag we wish to search for | |
Input Schema (JSON Schema)
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "properties": {
    "baseUrl": {
      "default": "",
      "description": "The base URL of the Schema Registry REST API.",
      "format": "uri",
      "type": "string"
    },
    "limit": {
      "default": 100,
      "description": "The maximum number of topics to return.",
      "maximum": 500,
      "type": "number"
    },
    "offset": {
      "default": 0,
      "description": "The offset to start the search from. Used for pagination.",
      "type": "number"
    },
    "topicTag": {
      "description": "The tag we wish to search for",
      "type": "string"
    }
  },
  "type": "object"
}
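As a rough illustration of what the schema above implies for callers, the following sketch applies the documented defaults and limits by hand. The names `SearchTopicsByTagArgs` and `normalizeArgs` are hypothetical and not part of the tool; the actual implementation uses a Zod schema, and this sketch does not check the `uri` format of `baseUrl`.

```typescript
// Hypothetical helper (not part of the tool): mirrors the JSON Schema
// constraints above without any validation library.
interface SearchTopicsByTagArgs {
  baseUrl: string;
  topicTag?: string;
  limit: number;
  offset: number;
}

function normalizeArgs(raw: Record<string, unknown>): SearchTopicsByTagArgs {
  // "additionalProperties": false -> reject unknown keys.
  const allowed = new Set(["baseUrl", "topicTag", "limit", "offset"]);
  for (const key of Object.keys(raw)) {
    if (!allowed.has(key)) throw new Error(`Unknown property: ${key}`);
  }

  // Apply the documented defaults when a value is omitted.
  const baseUrl = raw.baseUrl === undefined ? "" : raw.baseUrl;
  const limit = raw.limit === undefined ? 100 : raw.limit;
  const offset = raw.offset === undefined ? 0 : raw.offset;

  if (typeof baseUrl !== "string") throw new Error("baseUrl must be a string");
  if (typeof limit !== "number" || limit > 500) {
    throw new Error("limit must be a number no greater than 500");
  }
  if (typeof offset !== "number") throw new Error("offset must be a number");

  // topicTag is optional and has no default.
  let topicTag: string | undefined;
  const tag = raw.topicTag;
  if (tag !== undefined) {
    if (typeof tag !== "string") throw new Error("topicTag must be a string");
    topicTag = tag;
  }

  return { baseUrl, topicTag, limit, offset };
}

// Usage: only the tag is supplied, so limit and offset fall back to 100 and 0.
const args = normalizeArgs({ topicTag: "pii" });
console.log(args.limit, args.offset);
```

Calling `normalizeArgs({ limit: 501 })` throws, because the schema caps `limit` at 500.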
Implementation Reference
- SearchTopicsByTagHandler class implementing the tool's core execution logic via the handle() method, which queries the Confluent Schema Registry Catalog API for topics matching the given tag.

      export class SearchTopicsByTagHandler extends BaseToolHandler {
        async handle(
          clientManager: ClientManager,
          toolArguments: Record<string, unknown>,
        ): Promise<CallToolResult> {
          const { topicTag, limit, offset, baseUrl } =
            searchTopicsByTagArguments.parse(toolArguments);
          if (baseUrl !== undefined && baseUrl !== "") {
            clientManager.setConfluentCloudSchemaRegistryEndpoint(baseUrl);
          }
          const pathBasedClient = wrapAsPathBasedClient(
            clientManager.getConfluentCloudSchemaRegistryRestClient(),
          );
          const { data: response, error } = await pathBasedClient[
            "/catalog/v1/search/basic?types=kafka_topic&tag={topicTag}&limit={limit}&offset={offset}"
          ].GET({
            params: {
              path: {
                topicTag: topicTag,
                limit: limit,
                offset: offset,
              },
            },
          });
          if (error) {
            return this.createResponse(
              `Failed to search for topics by tag: ${JSON.stringify(error)}`,
              true,
            );
          }
          return this.createResponse(`${JSON.stringify(response)}`);
        }
        getToolConfig(): ToolConfig {
          return {
            name: ToolName.SEARCH_TOPICS_BY_TAG,
            description: "List all topics in the Kafka cluster with the specified tag.",
            inputSchema: searchTopicsByTagArguments.shape,
          };
        }
        getRequiredEnvVars(): EnvVar[] {
          return ["SCHEMA_REGISTRY_API_KEY", "SCHEMA_REGISTRY_API_SECRET"];
        }
        isConfluentCloudOnly(): boolean {
          return true;
        }
      }
- Zod input schema defining parameters for the tool: baseUrl (optional), topicTag (optional), limit (default 100, max 500), offset (default 0).

      const searchTopicsByTagArguments = z.object({
        baseUrl: z
          .string()
          .describe("The base URL of the Schema Registry REST API.")
          .url()
          .default(() => env.SCHEMA_REGISTRY_ENDPOINT ?? "")
          .optional(),
        topicTag: z.string().optional().describe("The tag we wish to search for"),
        limit: z
          .number()
          .max(500)
          .describe("The maximum number of topics to return.")
          .default(100),
        offset: z
          .number()
          .describe("The offset to start the search from. Used for pagination.")
          .default(0),
      });
- src/confluent/tools/tool-factory.ts:54-54 (registration): Maps ToolName.SEARCH_TOPICS_BY_TAG to a new instance of SearchTopicsByTagHandler in the ToolFactory's handlers Map, enabling tool lookup and configuration retrieval.

      [ToolName.SEARCH_TOPICS_BY_TAG, new SearchTopicsByTagHandler()],
- src/confluent/tools/tool-name.ts:15-15 (registration): Defines the exact string name 'search-topics-by-tag' for the tool in the ToolName enum, used for registration and identification.

      SEARCH_TOPICS_BY_TAG = "search-topics-by-tag",
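The handler above sends the tag, limit, and offset as query parameters to `/catalog/v1/search/basic`. The sketch below shows one way the equivalent request URL could be assembled with the standard `URL` and `URLSearchParams` APIs, and how `offset` advances across pages. The helpers `buildSearchUrl` and `pageUrls` and the host `schema-registry.example.com` are hypothetical; the tool itself goes through a typed path-based client, and authentication is omitted here.

```typescript
// Hypothetical helper: builds the catalog search URL the handler targets.
// An absolute path passed to `new URL` replaces any path already on baseUrl.
function buildSearchUrl(
  baseUrl: string,
  topicTag: string | undefined,
  limit: number = 100,
  offset: number = 0,
): string {
  const url = new URL("/catalog/v1/search/basic", baseUrl);
  url.searchParams.set("types", "kafka_topic");
  if (topicTag !== undefined) {
    url.searchParams.set("tag", topicTag);
  }
  url.searchParams.set("limit", String(limit));
  url.searchParams.set("offset", String(offset));
  return url.toString();
}

// Hypothetical helper: URLs for the first `pages` pages, stepping offset by limit.
function pageUrls(
  baseUrl: string,
  topicTag: string,
  limit: number,
  pages: number,
): string[] {
  const urls: string[] = [];
  for (let page = 0; page < pages; page++) {
    urls.push(buildSearchUrl(baseUrl, topicTag, limit, page * limit));
  }
  return urls;
}

// Usage with a placeholder endpoint.
console.log(buildSearchUrl("https://schema-registry.example.com", "pii"));
// prints "https://schema-registry.example.com/catalog/v1/search/basic?types=kafka_topic&tag=pii&limit=100&offset=0"
```

Using `URLSearchParams` rather than string interpolation means tag values containing reserved characters are percent-encoded automatically.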