add-tags-to-topic
Assign existing tags to Kafka topics in Confluent Cloud, enabling organized and efficient topic management. Specify tag assignments directly through structured input.
Instructions
Assign existing tags to Kafka topics in Confluent Cloud.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| baseUrl | No | The base URL of the Schema Registry REST API. | |
| tagAssignments | Yes | Array of tag assignments to create | |
Input Schema (JSON Schema)
{
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": false,
"properties": {
"baseUrl": {
"default": "",
"description": "The base URL of the Schema Registry REST API.",
"format": "uri",
"type": "string"
},
"tagAssignments": {
"description": "Array of tag assignments to create",
"items": {
"additionalProperties": false,
"properties": {
"entityName": {
"description": "Qualified name of the entity. If not provided, you can obtain it from using the search-topics-by-name tool. example: \"lsrc-g2p81:lkc-xq8k7g:my-flights\"",
"type": "string"
},
"entityType": {
"default": "kafka_topic",
"type": "string"
},
"typeName": {
"description": "Name of the tag to assign",
"type": "string"
}
},
"required": [
"entityName",
"typeName"
],
"type": "object"
},
"minItems": 1,
"type": "array"
}
},
"required": [
"tagAssignments"
],
"type": "object"
}
Implementation Reference
- AddTagToTopicHandler class extending BaseToolHandler, containing the handle() method that executes the tool logic by assigning tags to topics using the Schema Registry API.
/**
 * Tool handler that assigns existing tags to Kafka topics by POSTing the
 * requested assignments to the `/catalog/v1/entity/tags` endpoint of the
 * Schema Registry REST client.
 */
export class AddTagToTopicHandler extends BaseToolHandler {
  /**
   * Validates the raw tool arguments, optionally overrides the Schema
   * Registry endpoint, and submits the tag assignments.
   *
   * @param clientManager - Supplies (and, when `baseUrl` is given, is
   *   reconfigured with) the Schema Registry REST client.
   * @param toolArguments - Untyped arguments; validated through
   *   `addTagToTopicArguments.parse`.
   * @returns A response carrying the serialized API payload on success, or
   *   the serialized error (with the second `createResponse` argument set to
   *   `true`) on failure.
   */
  async handle(
    clientManager: ClientManager,
    toolArguments: Record<string, unknown>,
  ): Promise<CallToolResult> {
    const { baseUrl: endpointOverride, tagAssignments: assignments } =
      addTagToTopicArguments.parse(toolArguments);

    // Only repoint the client when the caller supplied a non-empty URL.
    if (endpointOverride !== undefined && endpointOverride.length > 0) {
      clientManager.setConfluentCloudSchemaRegistryEndpoint(endpointOverride);
    }

    // Fetch the client after the optional override so the new endpoint is used.
    const restClient =
      clientManager.getConfluentCloudSchemaRegistryRestClient();
    const tagsEndpoint =
      wrapAsPathBasedClient(restClient)["/catalog/v1/entity/tags"];

    const { error: failure, data: payload } = await tagsEndpoint.POST({
      body: assignments,
    });

    if (!failure) {
      return this.createResponse(
        `Successfully assigned tag: ${JSON.stringify(payload)}`,
      );
    }

    return this.createResponse(
      `Failed to assign tag: ${JSON.stringify(failure)}`,
      true,
    );
  }

  /** Describes the tool: its registered name, description and input shape. */
  getToolConfig(): ToolConfig {
    const config: ToolConfig = {
      name: ToolName.ADD_TAGS_TO_TOPIC,
      description: "Assign existing tags to Kafka topics in Confluent Cloud.",
      inputSchema: addTagToTopicArguments.shape,
    };
    return config;
  }

  /** Lists the environment variables this tool declares as required. */
  getRequiredEnvVars(): EnvVar[] {
    const required: EnvVar[] = [
      "SCHEMA_REGISTRY_API_KEY",
      "SCHEMA_REGISTRY_API_SECRET",
    ];
    return required;
  }

  /** Always reports `true`: the tool is marked as Confluent Cloud only. */
  isConfluentCloudOnly(): boolean {
    return true;
  }
}
- Zod input schema definition for the add-tags-to-topic tool arguments.
/**
 * Input schema for the add-tags-to-topic tool.
 *
 * - `baseUrl`: optional URL string for the Schema Registry REST API.
 * - `tagAssignments`: non-empty array of `{ entityType, entityName, typeName }`
 *   objects, one per tag to attach.
 */
const addTagToTopicArguments = z.object({
  // Validated as a URL; the default factory reads env.SCHEMA_REGISTRY_ENDPOINT
  // and falls back to "" when that is unset.
  // NOTE(review): `.optional()` is applied after `.default()`; confirm whether
  // the default is actually substituted when `baseUrl` is omitted. The handler
  // guards against both `undefined` and "" before using the value.
  baseUrl: z
    .string()
    .describe("The base URL of the Schema Registry REST API.")
    .url()
    .default(() => env.SCHEMA_REGISTRY_ENDPOINT ?? "")
    .optional(),
  tagAssignments: z
    .array(
      z.object({
        // Defaults to "kafka_topic" when the caller does not specify a type.
        entityType: z.string().default("kafka_topic"),
        // Required; the description points callers at the topic search tool
        // for discovering the qualified name.
        entityName: z
          .string()
          .describe(
            `Qualified name of the entity. If not provided, you can obtain it from using the ${ToolName.SEARCH_TOPICS_BY_NAME} tool. example: "lsrc-g2p81:lkc-xq8k7g:my-flights"`,
          ),
        // Required; name of the tag being attached.
        typeName: z.string().describe("Name of the tag to assign"),
      }),
    )
    // At least one assignment must be supplied.
    .nonempty()
    .describe("Array of tag assignments to create"),
});
- src/confluent/tools/tool-factory.ts:58-58 (registration) Registration of the AddTagToTopicHandler in the ToolFactory's static handlers Map using the tool name: `[ToolName.ADD_TAGS_TO_TOPIC, new AddTagToTopicHandler()],`
- Enum definition for the tool name constant ADD_TAGS_TO_TOPIC: `ADD_TAGS_TO_TOPIC = "add-tags-to-topic",`