Skip to main content
Glama

mcp-confluent

Official
by confluentinc
read-flink-statement-handler.ts•4.55 kB
import { ClientManager } from "@src/confluent/client-manager.js"; import { getEnsuredParam } from "@src/confluent/helpers.js"; import { CallToolResult } from "@src/confluent/schema.js"; import { BaseToolHandler, ToolConfig, } from "@src/confluent/tools/base-tools.js"; import { ToolName } from "@src/confluent/tools/tool-name.js"; import { EnvVar } from "@src/env-schema.js"; import env from "@src/env.js"; import { wrapAsPathBasedClient } from "openapi-fetch"; import { z } from "zod"; const readFlinkStatementArguments = z.object({ baseUrl: z .string() .describe("The base URL of the Flink REST API.") .url() .default(() => env.FLINK_REST_ENDPOINT ?? "") .optional(), organizationId: z .string() .trim() .optional() .describe("The unique identifier for the organization."), environmentId: z .string() .trim() .optional() .describe("The unique identifier for the environment."), statementName: z .string() .regex( new RegExp( "[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*", ), ) .nonempty() .max(100) .describe( "The user provided name of the resource, unique within this environment.", ), timeoutInMilliseconds: z .number() .optional() .default(60000) .describe( "The function implements pagination. It will continue to fetch results using the next page token until either there are no more results or the timeout is reached. Tables backed by kafka topics can be thought of as never-ending streams as data could be continuously produced in near real-time. Therefore, if you wish to sample values from a stream, you may want to set a timeout. 
If you are reading a statement after creating it, you may need to retry a couple times to ensure that the statement is ready and receiving data.", ), }); export class ReadFlinkStatementHandler extends BaseToolHandler { async handle( clientManager: ClientManager, toolArguments: Record<string, unknown> | undefined, ): Promise<CallToolResult> { const { timeoutInMilliseconds, statementName, environmentId, organizationId, baseUrl, } = readFlinkStatementArguments.parse(toolArguments); const organization_id = getEnsuredParam( "FLINK_ORG_ID", "Organization ID is required", organizationId, ); const environment_id = getEnsuredParam( "FLINK_ENV_ID", "Environment ID is required", environmentId, ); if (baseUrl !== undefined && baseUrl !== "") { clientManager.setConfluentCloudFlinkEndpoint(baseUrl); } const pathBasedClient = wrapAsPathBasedClient( clientManager.getConfluentCloudFlinkRestClient(), ); let allResults: unknown[] = []; let nextToken: string | undefined = undefined; const timeout = timeoutInMilliseconds === -1 || timeoutInMilliseconds === undefined ? undefined : Date.now() + timeoutInMilliseconds; /** * A timeout period has elapsed if a timeout is defined and the current time has exceeded it, * `false` otherwise. */ const hasTimedOut = () => timeout !== undefined && Date.now() >= timeout; do { const { data: response, error } = await pathBasedClient[ "/sql/v1/organizations/{organization_id}/environments/{environment_id}/statements/{name}/results" ].GET({ params: { path: { organization_id: organization_id, environment_id: environment_id, name: statementName, }, // only include the page token if it's defined ...(nextToken ? 
{ query: { page_token: nextToken } } : {}), }, }); if (error) { return this.createResponse( `Failed to read Flink SQL statement: ${JSON.stringify(error)}`, true, ); } allResults = allResults.concat(response?.results.data || []); nextToken = response?.metadata.next?.split("page_token=")[1]; } while (nextToken && !hasTimedOut()); return this.createResponse( `Flink SQL Statement Results: ${JSON.stringify(allResults)}`, ); } getToolConfig(): ToolConfig { return { name: ToolName.READ_FLINK_STATEMENT, description: "Make a request to read a statement and its results", inputSchema: readFlinkStatementArguments.shape, }; } getRequiredEnvVars(): EnvVar[] { return ["FLINK_API_KEY", "FLINK_API_SECRET"]; } isConfluentCloudOnly(): boolean { return true; } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/confluentinc/mcp-confluent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.