mcp-confluent

Official
by confluentinc

list-flink-statements

Retrieve a sorted, filtered, and paginated list of Flink SQL statements through the Flink REST API. Results can be narrowed by compute pool and label selector, and paged with a page size and an opaque page token.

Instructions

Retrieve a sorted, filtered, paginated list of all statements.

Input Schema

Name            Required  Description                                                 Default
baseUrl         No        The base URL of the Flink REST API.                         ""
computePoolId   No        Filter the results by exact match for compute_pool.         ""
environmentId   No        The unique identifier for the environment.
labelSelector   No        A comma-separated label selector to filter the statements.
organizationId  No        The unique identifier for the organization.
pageSize        No        A pagination size for collection requests (0 to 100).       10
pageToken       No        An opaque pagination token for collection requests.

Input Schema (JSON Schema)

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "properties": {
    "baseUrl": {
      "default": "",
      "description": "The base URL of the Flink REST API.",
      "format": "uri",
      "type": "string"
    },
    "computePoolId": {
      "default": "",
      "description": "Filter the results by exact match for compute_pool.",
      "type": "string"
    },
    "environmentId": {
      "description": "The unique identifier for the environment.",
      "type": "string"
    },
    "labelSelector": {
      "description": "A comma-separated label selector to filter the statements.",
      "type": "string"
    },
    "organizationId": {
      "description": "The unique identifier for the organization.",
      "type": "string"
    },
    "pageSize": {
      "default": 10,
      "description": "A pagination size for collection requests.",
      "maximum": 100,
      "minimum": 0,
      "type": "integer"
    },
    "pageToken": {
      "description": "An opaque pagination token for collection requests.",
      "maxLength": 255,
      "type": "string"
    }
  },
  "type": "object"
}
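
The numeric and length constraints in the schema above can be checked before calling the tool. The following is a minimal, dependency-free sketch, not the server's own validation (which uses Zod). The `ListFlinkStatementsArgs` type, the `checkArgs` helper, and all example values are hypothetical and exist only for illustration.

```typescript
// Hypothetical argument type mirroring the properties in the JSON Schema.
interface ListFlinkStatementsArgs {
  baseUrl?: string;
  computePoolId?: string;
  environmentId?: string;
  labelSelector?: string;
  organizationId?: string;
  pageSize?: number;
  pageToken?: string;
}

// Returns a list of human-readable problems. An empty list means the
// arguments satisfy the pageSize and pageToken constraints in the schema.
function checkArgs(args: ListFlinkStatementsArgs): string[] {
  const problems: string[] = [];

  // pageSize: integer, minimum 0, maximum 100, default 10.
  const pageSize = args.pageSize ?? 10;
  if (!Number.isInteger(pageSize) || pageSize < 0 || pageSize > 100) {
    problems.push("pageSize must be an integer between 0 and 100");
  }

  // pageToken: at most 255 characters.
  if (args.pageToken !== undefined && args.pageToken.length > 255) {
    problems.push("pageToken must be at most 255 characters");
  }

  return problems;
}

// Placeholder values; real identifiers come from your own account.
const example: ListFlinkStatementsArgs = {
  organizationId: "org-example",
  environmentId: "env-example",
  pageSize: 25,
  labelSelector: "team=analytics",
};

console.log(checkArgs(example));
```

Every property is optional in the schema, so an empty argument object also passes this check; the handler shown in the implementation reference still requires an organization ID and an environment ID to be resolvable.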

Implementation Reference

  • The ListFlinkStatementsHandler class, extending BaseToolHandler, contains the core execution logic in its `handle` method, which parses arguments, makes API calls to list Flink statements, and returns the result.
    export class ListFlinkStatementsHandler extends BaseToolHandler {
      async handle(
        clientManager: ClientManager,
        toolArguments: Record<string, unknown> | undefined,
      ): Promise<CallToolResult> {
        // Validate the raw tool arguments against the Zod schema.
        const {
          pageSize,
          computePoolId,
          environmentId,
          labelSelector,
          organizationId,
          pageToken,
          baseUrl,
        } = listFlinkStatementsArguments.parse(toolArguments);

        // Both IDs are required; each call passes the related environment
        // variable name, an error message, and the argument value.
        const organization_id = getEnsuredParam(
          "FLINK_ORG_ID",
          "Organization ID is required",
          organizationId,
        );
        const environment_id = getEnsuredParam(
          "FLINK_ENV_ID",
          "Environment ID is required",
          environmentId,
        );

        // Override the Flink endpoint only when a non-empty base URL is given.
        if (baseUrl !== undefined && baseUrl !== "") {
          clientManager.setConfluentCloudFlinkEndpoint(baseUrl);
        }

        const pathBasedClient = wrapAsPathBasedClient(
          clientManager.getConfluentCloudFlinkRestClient(),
        );

        // Map the camelCase tool arguments onto the REST path and query parameters.
        const { data: response, error } = await pathBasedClient[
          "/sql/v1/organizations/{organization_id}/environments/{environment_id}/statements"
        ].GET({
          params: {
            path: {
              organization_id: organization_id,
              environment_id: environment_id,
            },
            query: {
              "spec.compute_pool_id": computePoolId,
              page_size: pageSize,
              page_token: pageToken,
              label_selector: labelSelector,
            },
          },
        });

        // Report API errors as an error response instead of throwing.
        if (error) {
          return this.createResponse(
            `Failed to list Flink SQL statements: ${JSON.stringify(error)}`,
            true,
          );
        }
        return this.createResponse(`${JSON.stringify(response)}`);
      }

      getToolConfig(): ToolConfig {
        return {
          name: ToolName.LIST_FLINK_STATEMENTS,
          description:
            "Retrieve a sorted, filtered, paginated list of all statements.",
          inputSchema: listFlinkStatementsArguments.shape,
        };
      }

      getRequiredEnvVars(): EnvVar[] {
        return ["FLINK_API_KEY", "FLINK_API_SECRET"];
      }

      isConfluentCloudOnly(): boolean {
        return true;
      }
    }
  • Zod schema defining the input parameters for the list-flink-statements tool, including baseUrl, organizationId, environmentId, computePoolId, pageSize, pageToken, and labelSelector.
    const listFlinkStatementsArguments = z.object({
      baseUrl: z
        .string()
        .describe("The base URL of the Flink REST API.")
        .url()
        .default(() => env.FLINK_REST_ENDPOINT ?? "")
        .optional(),
      organizationId: z
        .string()
        .optional()
        .describe("The unique identifier for the organization."),
      environmentId: z
        .string()
        .optional()
        .describe("The unique identifier for the environment."),
      computePoolId: z
        .string()
        .optional()
        .default(() => env.FLINK_COMPUTE_POOL_ID ?? "")
        .describe("Filter the results by exact match for compute_pool."),
      pageSize: z
        .number()
        .int()
        .nonnegative()
        .max(100)
        .default(10)
        .describe("A pagination size for collection requests."),
      pageToken: z
        .string()
        .max(255)
        .optional()
        .describe("An opaque pagination token for collection requests."),
      labelSelector: z
        .string()
        .optional()
        .describe("A comma-separated label selector to filter the statements."),
    });
  • Registration of the ListFlinkStatementsHandler in the ToolFactory's handlers map using the tool name constant.
    [ToolName.LIST_FLINK_STATEMENTS, new ListFlinkStatementsHandler()],
  • Import of the ListFlinkStatementsHandler in the ToolFactory.
    import { ListFlinkStatementsHandler } from "@src/confluent/tools/handlers/flink/list-flink-statements-handler.js";
  • Definition of the tool name constant LIST_FLINK_STATEMENTS in the ToolName enum.
    LIST_FLINK_STATEMENTS = "list-flink-statements",
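
To make the argument mapping in the handler easier to follow, here is a rough, self-contained sketch of how the camelCase tool arguments could translate into a request path and query string. It is an illustration only: the real handler delegates this to a typed REST client, and the `ListArgs` type and `buildStatementsPath` function are hypothetical names. Skipping empty values is a simplification of this sketch, not confirmed behavior of the server.

```typescript
// Hypothetical, already-resolved arguments (IDs are required at this point).
interface ListArgs {
  organizationId: string;
  environmentId: string;
  computePoolId?: string;
  pageSize?: number;
  pageToken?: string;
  labelSelector?: string;
}

// Builds the relative path plus query string for the list-statements call,
// using the same parameter names that appear in the handler.
function buildStatementsPath(args: ListArgs): string {
  const path =
    `/sql/v1/organizations/${encodeURIComponent(args.organizationId)}` +
    `/environments/${encodeURIComponent(args.environmentId)}/statements`;

  const query: Array<[string, string | number | undefined]> = [
    ["spec.compute_pool_id", args.computePoolId],
    ["page_size", args.pageSize],
    ["page_token", args.pageToken],
    ["label_selector", args.labelSelector],
  ];

  const parts: string[] = [];
  for (const [key, value] of query) {
    // Assumption of this sketch: unset or empty values are left out.
    if (value === undefined || value === "") continue;
    parts.push(`${encodeURIComponent(key)}=${encodeURIComponent(String(value))}`);
  }

  return parts.length > 0 ? `${path}?${parts.join("&")}` : path;
}

// Placeholder identifiers for illustration.
console.log(
  buildStatementsPath({
    organizationId: "org-1",
    environmentId: "env-1",
    pageSize: 10,
    labelSelector: "a=b",
  }),
);
```

A follow-up page would be requested by passing the page token from the previous response as `pageToken`, leaving the other arguments unchanged.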

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/confluentinc/mcp-confluent'
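
The same request can be issued from TypeScript. This is a minimal sketch that assumes a runtime with a global `fetch` (for example a recent Node.js release or a browser); the `serverUrl` and `fetchServerInfo` helpers are hypothetical, and the response is typed as `unknown` because its shape is not described here.

```typescript
// Base URL taken from the curl example above.
const MCP_API_BASE = "https://glama.ai/api/mcp/v1/servers";

// Builds the directory URL for a server identified by owner and name.
function serverUrl(owner: string, name: string): string {
  return `${MCP_API_BASE}/${encodeURIComponent(owner)}/${encodeURIComponent(name)}`;
}

// Fetches the server entry and returns the parsed JSON body.
// Throws if the HTTP status is not in the 2xx range.
async function fetchServerInfo(owner: string, name: string): Promise<unknown> {
  const response = await fetch(serverUrl(owner, name));
  if (!response.ok) {
    throw new Error(`Request failed with status ${response.status}`);
  }
  return response.json();
}

// Usage (requires network access):
// fetchServerInfo("confluentinc", "mcp-confluent").then((info) => console.log(info));
```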

If you have feedback or need assistance with the MCP directory API, please join our Discord server.