confluentinc

mcp-confluent

Official
by confluentinc

read-flink-statement

Retrieve and analyze Flink statements and their results by querying the Flink REST API. Specify the statement name, organization, and environment to access real-time data streams or sampled results.

Instructions

Make a request to read a statement and its results

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| baseUrl | No | The base URL of the Flink REST API. | |
| environmentId | No | The unique identifier for the environment. | |
| organizationId | No | The unique identifier for the organization. | |
| statementName | Yes | The user-provided name of the resource, unique within this environment. | |
| timeoutInMilliseconds | No | The function implements pagination: it continues fetching results with the next page token until there are no more results or the timeout is reached. Tables backed by Kafka topics can be thought of as never-ending streams, since data may be produced continuously in near real time, so if you wish to sample values from a stream, set a timeout. If you are reading a statement right after creating it, you may need to retry a few times until the statement is ready and receiving data. | 60000 |
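
The timeout behavior described for `timeoutInMilliseconds` amounts to a "collect pages until a deadline" loop. The following is an illustrative sketch only, not the server's actual code: `collectWithDeadline`, the `Page` shape, and the in-memory page map are hypothetical stand-ins for the Flink REST pagination.

```typescript
// Hypothetical sketch of the pagination-with-deadline pattern described above:
// keep following next-page tokens until pages run out or the deadline passes.
type Page = { data: number[]; nextToken?: string };

async function collectWithDeadline(
  fetchPage: (token?: string) => Promise<Page>,
  timeoutInMilliseconds?: number,
): Promise<number[]> {
  // undefined (or -1) means "read until the pages run out".
  const deadline =
    timeoutInMilliseconds === undefined || timeoutInMilliseconds === -1
      ? undefined
      : Date.now() + timeoutInMilliseconds;
  const hasTimedOut = () => deadline !== undefined && Date.now() >= deadline;

  let all: number[] = [];
  let token: string | undefined = undefined;
  do {
    const page = await fetchPage(token);
    all = all.concat(page.data);
    token = page.nextToken;
  } while (token && !hasTimedOut());
  return all;
}

// In-memory "API" serving three pages, standing in for the results endpoint.
const pages: Record<string, Page> = {
  start: { data: [1, 2], nextToken: "p2" },
  p2: { data: [3, 4], nextToken: "p3" },
  p3: { data: [5] },
};

collectWithDeadline((t) => Promise.resolve(pages[t ?? "start"]), 60000)
  .then((results) => console.log(results)); // → [ 1, 2, 3, 4, 5 ]
```

With an unbounded stream (every page carrying a next token), the deadline is the only thing that stops the loop, which is why sampling a Kafka-backed table calls for a finite `timeoutInMilliseconds`.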

Implementation Reference

  • ReadFlinkStatementHandler class with handle method implementing pagination-aware reading of Flink SQL statement results using Confluent Cloud Flink REST API.
    export class ReadFlinkStatementHandler extends BaseToolHandler {
      async handle(
        clientManager: ClientManager,
        toolArguments: Record<string, unknown> | undefined,
      ): Promise<CallToolResult> {
        const {
          timeoutInMilliseconds,
          statementName,
          environmentId,
          organizationId,
          baseUrl,
        } = readFlinkStatementArguments.parse(toolArguments);
        const organization_id = getEnsuredParam(
          "FLINK_ORG_ID",
          "Organization ID is required",
          organizationId,
        );
        const environment_id = getEnsuredParam(
          "FLINK_ENV_ID",
          "Environment ID is required",
          environmentId,
        );
    
        if (baseUrl !== undefined && baseUrl !== "") {
          clientManager.setConfluentCloudFlinkEndpoint(baseUrl);
        }
        const pathBasedClient = wrapAsPathBasedClient(
          clientManager.getConfluentCloudFlinkRestClient(),
        );
        let allResults: unknown[] = [];
        let nextToken: string | undefined = undefined;
        const timeout =
          timeoutInMilliseconds === -1 || timeoutInMilliseconds === undefined
            ? undefined
            : Date.now() + timeoutInMilliseconds;
    
    /**
     * Returns `true` if a timeout is defined and the current time has exceeded it,
     * `false` otherwise.
     */
        const hasTimedOut = () => timeout !== undefined && Date.now() >= timeout;
    
        do {
          const { data: response, error } = await pathBasedClient[
            "/sql/v1/organizations/{organization_id}/environments/{environment_id}/statements/{name}/results"
          ].GET({
            params: {
              path: {
                organization_id: organization_id,
                environment_id: environment_id,
                name: statementName,
              },
              // only include the page token if it's defined
              ...(nextToken ? { query: { page_token: nextToken } } : {}),
            },
          });
    
          if (error) {
            return this.createResponse(
              `Failed to read Flink SQL statement: ${JSON.stringify(error)}`,
              true,
            );
          }
    
          allResults = allResults.concat(response?.results.data || []);
          nextToken = response?.metadata.next?.split("page_token=")[1];
        } while (nextToken && !hasTimedOut());
    
        return this.createResponse(
          `Flink SQL Statement Results: ${JSON.stringify(allResults)}`,
        );
      }
      getToolConfig(): ToolConfig {
        return {
          name: ToolName.READ_FLINK_STATEMENT,
          description: "Make a request to read a statement and its results",
          inputSchema: readFlinkStatementArguments.shape,
        };
      }
    
      getRequiredEnvVars(): EnvVar[] {
        return ["FLINK_API_KEY", "FLINK_API_SECRET"];
      }
    
      isConfluentCloudOnly(): boolean {
        return true;
      }
    }
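
The handler above derives the next page token by splitting the `metadata.next` URL on `page_token=`. A minimal sketch of that parsing (the URL below is a made-up example, not a real Confluent endpoint):

```typescript
// Sketch of the token extraction used in the handler above: everything after
// "page_token=" in the API's `metadata.next` URL becomes the next page token.
function nextPageToken(nextUrl: string | undefined): string | undefined {
  return nextUrl?.split("page_token=")[1];
}

console.log(nextPageToken("https://example.invalid/results?page_token=abc123")); // "abc123"
console.log(nextPageToken(undefined)); // undefined
```

A split like this grabs everything after `page_token=`, including any trailing query parameters; `new URL(nextUrl).searchParams.get("page_token")` would be a stricter parse if that ever mattered.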
  • Zod input schema defining parameters for the read-flink-statement tool: baseUrl, organizationId, environmentId, statementName, timeoutInMilliseconds.
    const readFlinkStatementArguments = z.object({
      baseUrl: z
        .string()
        .describe("The base URL of the Flink REST API.")
        .url()
        .default(() => env.FLINK_REST_ENDPOINT ?? "")
        .optional(),
      organizationId: z
        .string()
        .trim()
        .optional()
        .describe("The unique identifier for the organization."),
      environmentId: z
        .string()
        .trim()
        .optional()
        .describe("The unique identifier for the environment."),
      statementName: z
        .string()
        .regex(
          new RegExp(
            "[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
          ),
        )
        .nonempty()
        .max(100)
        .describe(
          "The user provided name of the resource, unique within this environment.",
        ),
      timeoutInMilliseconds: z
        .number()
        .optional()
        .default(60000)
        .describe(
          "The function implements pagination. It will continue to fetch results using the next page token until either there are no more results or the timeout is reached. Tables backed by kafka topics can be thought of as never-ending streams as data could be continuously produced in near real-time. Therefore, if you wish to sample values from a stream, you may want to set a timeout. If you are reading a statement after creating it, you may need to retry a couple times to ensure that the statement is ready and receiving data.",
        ),
    });
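
The `statementName` pattern above accepts lowercase alphanumeric segments (with interior hyphens) separated by dots. Here is a hedged re-statement of that check in plain TypeScript; note that `^`/`$` anchors are added in this sketch so the whole string is tested, whereas the unanchored pattern in the schema would also accept any string merely containing a matching substring:

```typescript
// Assumed re-statement of the schema's statementName rules, with anchors added.
const statementNamePattern =
  /^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$/;

const isValidStatementName = (name: string): boolean =>
  name.length > 0 && name.length <= 100 && statementNamePattern.test(name);

console.log(isValidStatementName("my-statement-1"));      // true
console.log(isValidStatementName("orders.daily-rollup")); // true
console.log(isValidStatementName("-starts-with-hyphen")); // false
console.log(isValidStatementName("Uses-Uppercase"));      // false
```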
  • Registration of the ReadFlinkStatementHandler in the ToolFactory's static handlers Map using the tool name.
    [ToolName.READ_FLINK_STATEMENT, new ReadFlinkStatementHandler()],
  • Enum definition providing the exact string name 'read-flink-statement' for the tool.
    READ_FLINK_STATEMENT = "read-flink-statement",
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations, the description carries the full burden, yet it states only the basic action. It fails to disclose critical behavioral traits: whether the operation is read-only, potential side effects, authentication needs, rate limits, or error handling. The timeout parameter description hints at pagination and retries, but that information lives in the schema, not in the tool description itself.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, clear sentence with no wasted words, making it highly concise and front-loaded. It efficiently communicates the core action without unnecessary elaboration.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity (5 parameters, no annotations, no output schema), the description is incomplete. It lacks information on return values, error conditions, or operational context (e.g., how it relates to Flink statements). Without annotations or output schema, the description should provide more behavioral and result details to be fully helpful.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 100%, providing detailed parameter documentation. The tool description adds no additional meaning beyond the schema, such as explaining parameter interactions or usage examples. With high schema coverage, the baseline score of 3 is appropriate as the description doesn't compensate but also doesn't detract.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 3/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description states the action ('Make a request to read') and resource ('a statement and its results'), which clarifies the tool's purpose. However, it lacks specificity about what type of statement (e.g., Flink SQL statement) and doesn't differentiate from siblings like 'list-flink-statements' or 'read-connector', making it vague in context.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No explicit guidance is provided on when to use this tool versus alternatives. The description does not mention prerequisites (e.g., after creating a statement with 'create-flink-statement') or exclusions, leaving the agent without context for tool selection among siblings.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
