Skip to main content
Glama

mcp-confluent

Official
by confluentinc
update-tableflow-topic-handler.ts•4.65 kB
import { ClientManager } from "@src/confluent/client-manager.js"; import { CallToolResult } from "@src/confluent/schema.js"; import { BaseToolHandler, ToolConfig, } from "@src/confluent/tools/base-tools.js"; import { ToolName } from "@src/confluent/tools/tool-name.js"; import { EnvVar } from "@src/env-schema.js"; import env from "@src/env.js"; import { wrapAsPathBasedClient } from "openapi-fetch"; import { z } from "zod"; const updateTableflowTopicArguments = z.object({ baseUrl: z .string() .trim() .describe("The base url of the Tableflow REST API.") .url() .default(() => env.CONFLUENT_CLOUD_REST_ENDPOINT ?? "") .optional(), display_name: z .string() .describe("The name of the Kafka topic for which Tableflow is enabled."), tableflowTopicConfig: z.object({ // Required fields storage: z.object({ kind: z .enum(["ByobAws", "Managed"]) .default("ByobAws") .describe("The storage type either 'Managed' or 'ByobAws'."), bucket_name: z.string().describe("The bucket name."), provider_integration_id: z .string() .describe("The provider integration id."), }), environment: z.object({ id: z .string() .describe( "The unique identifier for the environment this resource belongs to.", ), }), kafka_cluster: z.object({ id: z.string().describe("ID of the referred resource"), environment: z .string() .describe("Environment of the referred resource, if env-scoped"), }), // Optional fields suspended: z .boolean() .optional() .default(false) .describe( "Indicates whether Tableflow should be suspended. The API allows setting it only to false i.e resume the Tableflow.", ), config: z.object({ retention_ms: z .string() .default("6048000000") // equivalent to 7 days .describe( "The maximum age, in milliseconds, of snapshots (for Iceberg) or versions(for Delta) to retain in the table for the Tableflow-enabled topic.", ), record_failure_strategy: z .string() .default("SUSPENDED") .describe( "The strategy to handle record failures in the Tableflow enabled topic during materialization.", ), }), table_formats: z .array(z.string()) .default(["ICEBERG"]) .describe( "The supported table formats for the Tableflow-enabled topic e.g ICEBERG, DELTA", ), }), }); export class UpdateTableFlowTopicHandler extends BaseToolHandler { async handle( clientManager: ClientManager, toolArguments: Record<string, unknown> | undefined, ): Promise<CallToolResult> { const { baseUrl, display_name, tableflowTopicConfig } = updateTableflowTopicArguments.parse(toolArguments); if (baseUrl !== undefined && baseUrl !== "") { clientManager.setConfluentCloudTableflowRestEndpoint(baseUrl); } const pathBasedClient = wrapAsPathBasedClient( clientManager.getConfluentCloudTableflowRestClient(), ); const { environment, ...restOfTableflowConfig } = tableflowTopicConfig; const { data: response, error } = await pathBasedClient[ `/tableflow/v1/tableflow-topics/${display_name}` ].PATCH({ params: { path: { display_name: display_name, }, }, body: { spec: { ...restOfTableflowConfig, environment: { id: environment.id }, // Only include id, as the general environment as requires readonly and resource_name // eslint-disable-next-line @typescript-eslint/no-explicit-any } as any, // Due to how OpenAPI specification is structured and how generators interpret it, we have to treat it as any, as // The most likely culprit for mismatch is the reuse of a single base schema for both input (requestBody) and output (responses) evironment. }, }); if (error) { return this.createResponse( `Failed to update Tableflow topic for ${display_name}: ${JSON.stringify(error)}`, true, ); } return this.createResponse( `Tableflow Topic ${display_name} updated: ${JSON.stringify(response)}`, ); } getToolConfig(): ToolConfig { return { name: ToolName.UPDATE_TABLEFLOW_TOPIC, description: `Make a request to update a tableflow topic.`, inputSchema: updateTableflowTopicArguments.shape, }; } getRequiredEnvVars(): EnvVar[] { return ["TABLEFLOW_API_KEY", "TABLEFLOW_API_SECRET"]; } isConfluentCloudOnly(): boolean { return true; } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/confluentinc/mcp-confluent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server