mcp-confluent

Official
by confluentinc

read-flink-statement

Retrieve and analyze Flink statements and their results by querying the Flink REST API. Specify the statement name, organization, and environment to access real-time data streams or sampled results.

Instructions

Make a request to read a statement and its results

Input Schema

| Name | Required | Description | Default |
|------|----------|-------------|---------|
| baseUrl | No | The base URL of the Flink REST API. | FLINK_REST_ENDPOINT environment variable, else empty string |
| environmentId | No | The unique identifier for the environment (falls back to the FLINK_ENV_ID environment variable). | |
| organizationId | No | The unique identifier for the organization (falls back to the FLINK_ORG_ID environment variable). | |
| statementName | Yes | The user-provided name of the resource, unique within this environment. | |
| timeoutInMilliseconds | No | Deadline for pagination: results are fetched page by page via the next-page token until no pages remain or this timeout elapses. Tables backed by Kafka topics are effectively unbounded streams, so set a timeout when sampling values from a stream. When reading a statement immediately after creating it, you may need to retry a few times until the statement is ready and receiving data. | 60000 |

Input Schema (JSON Schema)

{ "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": false, "properties": { "baseUrl": { "default": "", "description": "The base URL of the Flink REST API.", "format": "uri", "type": "string" }, "environmentId": { "description": "The unique identifier for the environment.", "type": "string" }, "organizationId": { "description": "The unique identifier for the organization.", "type": "string" }, "statementName": { "description": "The user provided name of the resource, unique within this environment.", "maxLength": 100, "minLength": 1, "pattern": "[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*", "type": "string" }, "timeoutInMilliseconds": { "default": 60000, "description": "The function implements pagination. It will continue to fetch results using the next page token until either there are no more results or the timeout is reached. Tables backed by kafka topics can be thought of as never-ending streams as data could be continuously produced in near real-time. Therefore, if you wish to sample values from a stream, you may want to set a timeout. If you are reading a statement after creating it, you may need to retry a couple times to ensure that the statement is ready and receiving data.", "type": "number" } }, "required": [ "statementName" ], "type": "object" }

Implementation Reference

  • ReadFlinkStatementHandler class with a handle method implementing pagination-aware reading of Flink SQL statement results via the Confluent Cloud Flink REST API (a standalone sketch of the pagination pattern follows this list).
    export class ReadFlinkStatementHandler extends BaseToolHandler {
      async handle(
        clientManager: ClientManager,
        toolArguments: Record<string, unknown> | undefined,
      ): Promise<CallToolResult> {
        const {
          timeoutInMilliseconds,
          statementName,
          environmentId,
          organizationId,
          baseUrl,
        } = readFlinkStatementArguments.parse(toolArguments);
        const organization_id = getEnsuredParam(
          "FLINK_ORG_ID",
          "Organization ID is required",
          organizationId,
        );
        const environment_id = getEnsuredParam(
          "FLINK_ENV_ID",
          "Environment ID is required",
          environmentId,
        );

        if (baseUrl !== undefined && baseUrl !== "") {
          clientManager.setConfluentCloudFlinkEndpoint(baseUrl);
        }

        const pathBasedClient = wrapAsPathBasedClient(
          clientManager.getConfluentCloudFlinkRestClient(),
        );

        let allResults: unknown[] = [];
        let nextToken: string | undefined = undefined;
        const timeout =
          timeoutInMilliseconds === -1 || timeoutInMilliseconds === undefined
            ? undefined
            : Date.now() + timeoutInMilliseconds;
        /**
         * A timeout period has elapsed if a timeout is defined and the current
         * time has exceeded it, `false` otherwise.
         */
        const hasTimedOut = () => timeout !== undefined && Date.now() >= timeout;

        do {
          const { data: response, error } = await pathBasedClient[
            "/sql/v1/organizations/{organization_id}/environments/{environment_id}/statements/{name}/results"
          ].GET({
            params: {
              path: {
                organization_id: organization_id,
                environment_id: environment_id,
                name: statementName,
              },
              // only include the page token if it's defined
              ...(nextToken ? { query: { page_token: nextToken } } : {}),
            },
          });
          if (error) {
            return this.createResponse(
              `Failed to read Flink SQL statement: ${JSON.stringify(error)}`,
              true,
            );
          }
          allResults = allResults.concat(response?.results.data || []);
          nextToken = response?.metadata.next?.split("page_token=")[1];
        } while (nextToken && !hasTimedOut());

        return this.createResponse(
          `Flink SQL Statement Results: ${JSON.stringify(allResults)}`,
        );
      }

      getToolConfig(): ToolConfig {
        return {
          name: ToolName.READ_FLINK_STATEMENT,
          description: "Make a request to read a statement and its results",
          inputSchema: readFlinkStatementArguments.shape,
        };
      }

      getRequiredEnvVars(): EnvVar[] {
        return ["FLINK_API_KEY", "FLINK_API_SECRET"];
      }

      isConfluentCloudOnly(): boolean {
        return true;
      }
    }
  • Zod input schema defining parameters for the read-flink-statement tool: baseUrl, organizationId, environmentId, statementName, timeoutInMilliseconds (a parse example follows this list).
    const readFlinkStatementArguments = z.object({
      baseUrl: z
        .string()
        .describe("The base URL of the Flink REST API.")
        .url()
        .default(() => env.FLINK_REST_ENDPOINT ?? "")
        .optional(),
      organizationId: z
        .string()
        .trim()
        .optional()
        .describe("The unique identifier for the organization."),
      environmentId: z
        .string()
        .trim()
        .optional()
        .describe("The unique identifier for the environment."),
      statementName: z
        .string()
        .regex(
          new RegExp(
            "[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
          ),
        )
        .nonempty()
        .max(100)
        .describe(
          "The user provided name of the resource, unique within this environment.",
        ),
      timeoutInMilliseconds: z
        .number()
        .optional()
        .default(60000)
        .describe(
          "The function implements pagination. It will continue to fetch results using the next page token until either there are no more results or the timeout is reached. Tables backed by kafka topics can be thought of as never-ending streams as data could be continuously produced in near real-time. Therefore, if you wish to sample values from a stream, you may want to set a timeout. If you are reading a statement after creating it, you may need to retry a couple times to ensure that the statement is ready and receiving data.",
        ),
    });
  • Registration of the ReadFlinkStatementHandler in the ToolFactory's static handlers Map using the tool name.
    [ToolName.READ_FLINK_STATEMENT, new ReadFlinkStatementHandler()],
  • Enum definition providing the exact string name 'read-flink-statement' for the tool (an end-to-end invocation sketch follows this list).
    READ_FLINK_STATEMENT = "read-flink-statement",
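
To make the control flow of ReadFlinkStatementHandler easier to study in isolation, here is a minimal standalone sketch of the same paginate-until-deadline pattern using plain fetch (Node 18+). The endpoint path, response shape, and page_token handling are taken from the handler above; HTTP basic authentication with FLINK_API_KEY and FLINK_API_SECRET is an assumption about how the underlying REST client authenticates.

    // Minimal sketch of paginate-until-deadline against the Flink results endpoint.
    async function readStatementResults(
      baseUrl: string,
      orgId: string,
      envId: string,
      statementName: string,
      timeoutInMilliseconds = 60_000,
    ): Promise<unknown[]> {
      // Assumption: basic auth built from the same env vars the handler requires.
      const auth = Buffer.from(
        `${process.env.FLINK_API_KEY}:${process.env.FLINK_API_SECRET}`,
      ).toString("base64");

      // A timeout of -1 means "no deadline", mirroring the handler's semantics.
      const deadline =
        timeoutInMilliseconds === -1 ? undefined : Date.now() + timeoutInMilliseconds;
      const results: unknown[] = [];
      let pageToken: string | undefined;

      do {
        const url = new URL(
          `/sql/v1/organizations/${orgId}/environments/${envId}/statements/${statementName}/results`,
          baseUrl,
        );
        if (pageToken) url.searchParams.set("page_token", pageToken);

        const res = await fetch(url, {
          headers: { Authorization: `Basic ${auth}` },
        });
        if (!res.ok) throw new Error(`Failed to read statement: ${res.status}`);

        const body = (await res.json()) as {
          results?: { data?: unknown[] };
          metadata?: { next?: string };
        };
        results.push(...(body.results?.data ?? []));
        // The next-page URL carries the token as a query parameter.
        pageToken = body.metadata?.next?.split("page_token=")[1];
      } while (pageToken && (deadline === undefined || Date.now() < deadline));

      return results;
    }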
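
The Zod schema applies defaults at parse time. A trimmed-down sketch, reproducing only two of the five fields, shows the timeoutInMilliseconds default being filled in:

    import { z } from "zod";

    // Trimmed-down subset of readFlinkStatementArguments (two of five fields).
    const args = z.object({
      statementName: z.string().nonempty().max(100),
      timeoutInMilliseconds: z.number().optional().default(60000),
    });

    // The default of 60000 is applied when the field is omitted.
    console.log(args.parse({ statementName: "orders-live-feed" }));
    // => { statementName: "orders-live-feed", timeoutInMilliseconds: 60000 }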
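
Putting it together, a client invokes the tool by the registered name 'read-flink-statement'. Below is a sketch using the TypeScript MCP SDK; the server launch command and its flags are assumptions about a typical setup and may differ from yours.

    import { Client } from "@modelcontextprotocol/sdk/client/index.js";
    import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

    // Assumed launch command for the mcp-confluent server; adjust to your setup.
    const transport = new StdioClientTransport({
      command: "npx",
      args: ["-y", "@confluentinc/mcp-confluent", "-e", ".env"],
    });

    const client = new Client(
      { name: "example-client", version: "1.0.0" },
      { capabilities: {} },
    );
    await client.connect(transport);

    // Invoke the tool by the exact name registered in the enum above.
    const result = await client.callTool({
      name: "read-flink-statement",
      arguments: { statementName: "orders-live-feed", timeoutInMilliseconds: 5000 },
    });
    console.log(JSON.stringify(result, null, 2));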
