mcp-confluent

Official
by confluentinc

list-flink-statements

Retrieve a sorted, filtered, and paginated list of Flink SQL statements through the Confluent Cloud Flink REST API. Results can be narrowed by compute pool and label selector and fetched one page at a time.

Instructions

Retrieve a sorted, filtered, paginated list of all statements.

Input Schema

| Name | Required | Description | Default |
|---|---|---|---|
| baseUrl | No | The base URL of the Flink REST API. | |
| computePoolId | No | Filter the results by exact match for compute_pool. | |
| environmentId | No | The unique identifier for the environment. | |
| labelSelector | No | A comma-separated label selector to filter the statements. | |
| organizationId | No | The unique identifier for the organization. | |
| pageSize | No | A pagination size for collection requests. | |
| pageToken | No | An opaque pagination token for collection requests. | |
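
As a rough illustration of these parameters, the sketch below models the arguments as a TypeScript type and applies the limits declared by the tool's Zod schema under Implementation Reference: `pageSize` is a non-negative integer of at most 100 that defaults to 10, and `pageToken` is at most 255 characters long. The names `ListFlinkStatementsArgs` and `normalizeArgs` are hypothetical and are not part of mcp-confluent, and the IDs are placeholders.

```typescript
// Hypothetical type mirroring the input table; every field is optional.
interface ListFlinkStatementsArgs {
  baseUrl?: string;
  computePoolId?: string;
  environmentId?: string;
  labelSelector?: string;
  organizationId?: string;
  pageSize?: number;
  pageToken?: string;
}

// Hypothetical helper: applies the default page size and rejects values
// outside the limits declared by the tool's schema.
function normalizeArgs(args: ListFlinkStatementsArgs): ListFlinkStatementsArgs {
  const pageSize = args.pageSize ?? 10; // schema default
  if (!Number.isInteger(pageSize) || pageSize < 0 || pageSize > 100) {
    throw new RangeError("pageSize must be an integer between 0 and 100");
  }
  if (args.pageToken !== undefined && args.pageToken.length > 255) {
    throw new RangeError("pageToken must be at most 255 characters");
  }
  return { ...args, pageSize };
}

// Example arguments; the IDs are placeholders, not real resources.
const exampleArgs = normalizeArgs({
  environmentId: "env-example",
  computePoolId: "lfcp-example",
});
```

Omitting `pageSize` yields the default of 10, while a value such as 101 or 1.5 is rejected before any request would be sent.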

Implementation Reference

  • The ListFlinkStatementsHandler class, extending BaseToolHandler, contains the core execution logic in its `handle` method, which parses arguments, makes API calls to list Flink statements, and returns the result.
    export class ListFlinkStatementsHandler extends BaseToolHandler {
      async handle(
        clientManager: ClientManager,
        toolArguments: Record<string, unknown> | undefined,
      ): Promise<CallToolResult> {
        const {
          pageSize,
          computePoolId,
          environmentId,
          labelSelector,
          organizationId,
          pageToken,
          baseUrl,
        } = listFlinkStatementsArguments.parse(toolArguments);
        const organization_id = getEnsuredParam(
          "FLINK_ORG_ID",
          "Organization ID is required",
          organizationId,
        );
        const environment_id = getEnsuredParam(
          "FLINK_ENV_ID",
          "Environment ID is required",
          environmentId,
        );
    
        if (baseUrl !== undefined && baseUrl !== "") {
          clientManager.setConfluentCloudFlinkEndpoint(baseUrl);
        }
    
        const pathBasedClient = wrapAsPathBasedClient(
          clientManager.getConfluentCloudFlinkRestClient(),
        );
        const { data: response, error } = await pathBasedClient[
          "/sql/v1/organizations/{organization_id}/environments/{environment_id}/statements"
        ].GET({
          params: {
            path: {
              organization_id: organization_id,
              environment_id: environment_id,
            },
            query: {
              "spec.compute_pool_id": computePoolId,
              page_size: pageSize,
              page_token: pageToken,
              label_selector: labelSelector,
            },
          },
        });
        if (error) {
          return this.createResponse(
            `Failed to list Flink SQL statements: ${JSON.stringify(error)}`,
            true,
          );
        }
        return this.createResponse(`${JSON.stringify(response)}`);
      }
      getToolConfig(): ToolConfig {
        return {
          name: ToolName.LIST_FLINK_STATEMENTS,
          description:
            "Retrieve a sorted, filtered, paginated list of all statements.",
          inputSchema: listFlinkStatementsArguments.shape,
        };
      }
    
      getRequiredEnvVars(): EnvVar[] {
        return ["FLINK_API_KEY", "FLINK_API_SECRET"];
      }
    
      isConfluentCloudOnly(): boolean {
        return true;
      }
    }
  • Zod schema defining the input parameters for the list-flink-statements tool, including baseUrl, organizationId, environmentId, computePoolId, pageSize, pageToken, and labelSelector.
    const listFlinkStatementsArguments = z.object({
      baseUrl: z
        .string()
        .describe("The base URL of the Flink REST API.")
        .url()
        .default(() => env.FLINK_REST_ENDPOINT ?? "")
        .optional(),
      organizationId: z
        .string()
        .optional()
        .describe("The unique identifier for the organization."),
      environmentId: z
        .string()
        .optional()
        .describe("The unique identifier for the environment."),
      computePoolId: z
        .string()
        .optional()
        .default(() => env.FLINK_COMPUTE_POOL_ID ?? "")
        .describe("Filter the results by exact match for compute_pool."),
      pageSize: z
        .number()
        .int()
        .nonnegative()
        .max(100)
        .default(10)
        .describe("A pagination size for collection requests."),
      pageToken: z
        .string()
        .max(255)
        .optional()
        .describe("An opaque pagination token for collection requests."),
      labelSelector: z
        .string()
        .optional()
        .describe("A comma-separated label selector to filter the statements."),
    });
  • Registration of the ListFlinkStatementsHandler in the ToolFactory's handlers map using the tool name constant.
    [ToolName.LIST_FLINK_STATEMENTS, new ListFlinkStatementsHandler()],
  • Import of the ListFlinkStatementsHandler in the ToolFactory.
    import { ListFlinkStatementsHandler } from "@src/confluent/tools/handlers/flink/list-flink-statements-handler.js";
  • Definition of the tool name constant LIST_FLINK_STATEMENTS in the ToolName enum.
    LIST_FLINK_STATEMENTS = "list-flink-statements",
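
Because the tool accepts `pageSize` and `pageToken`, a caller that wants every statement has to request pages one after another. The loop below is a minimal sketch of that pattern and is not code from mcp-confluent: `Page`, `PageRequest`, `fetchPage`, and `listAll` are hypothetical names, and how the next token is read from a real response is an assumption left to the `fetchPage` callback. `toQuery` repeats the mapping from camelCase arguments to query parameters that the handler's GET call performs.

```typescript
// Hypothetical page shape: the items of one page plus the token for the
// next page, or undefined when no pages remain. Assumption: the caller
// extracts that token from the real API response.
interface Page<T> {
  items: T[];
  nextPageToken?: string;
}

// Hypothetical subset of the tool arguments that affect the query string.
interface PageRequest {
  computePoolId?: string;
  labelSelector?: string;
  pageSize?: number;
  pageToken?: string;
}

type Query = Record<string, string | number | undefined>;

// Mirrors the query object built in the handler's GET call.
function toQuery(req: PageRequest): Query {
  return {
    "spec.compute_pool_id": req.computePoolId,
    page_size: req.pageSize,
    page_token: req.pageToken,
    label_selector: req.labelSelector,
  };
}

// Requests pages until no further token is returned. maxPages is a safety
// cap so a misbehaving fetchPage cannot loop forever.
async function listAll<T>(
  fetchPage: (query: Query) => Promise<Page<T>>,
  req: PageRequest = {},
  maxPages = 100,
): Promise<T[]> {
  const all: T[] = [];
  let pageToken = req.pageToken;
  for (let i = 0; i < maxPages; i++) {
    const page = await fetchPage(toQuery({ ...req, pageToken }));
    all.push(...page.items);
    if (!page.nextPageToken) break;
    pageToken = page.nextPageToken;
  }
  return all;
}
```

A caller would supply a `fetchPage` that issues the actual request. Keeping the token handling in one place means the rest of the code never has to interpret the opaque `pageToken` value.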
