list-flink-statements
Retrieve and manage a sorted, filtered, and paginated list of Flink SQL statements using REST API, enabling precise control over statement queries and data.
Instructions
Retrieve a sorted, filtered, paginated list of all statements.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| baseUrl | No | The base URL of the Flink REST API. | |
| computePoolId | No | Filter the results by exact match for compute_pool. | |
| environmentId | No | The unique identifier for the environment. | |
| labelSelector | No | A comma-separated label selector to filter the statements. | |
| organizationId | No | The unique identifier for the organization. | |
| pageSize | No | A pagination size for collection requests. | |
| pageToken | No | An opaque pagination token for collection requests. |
Input Schema (JSON Schema)
{
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": false,
"properties": {
"baseUrl": {
"default": "",
"description": "The base URL of the Flink REST API.",
"format": "uri",
"type": "string"
},
"computePoolId": {
"default": "",
"description": "Filter the results by exact match for compute_pool.",
"type": "string"
},
"environmentId": {
"description": "The unique identifier for the environment.",
"type": "string"
},
"labelSelector": {
"description": "A comma-separated label selector to filter the statements.",
"type": "string"
},
"organizationId": {
"description": "The unique identifier for the organization.",
"type": "string"
},
"pageSize": {
"default": 10,
"description": "A pagination size for collection requests.",
"maximum": 100,
"minimum": 0,
"type": "integer"
},
"pageToken": {
"description": "An opaque pagination token for collection requests.",
"maxLength": 255,
"type": "string"
}
},
"type": "object"
}
Implementation Reference
- The ListFlinkStatementsHandler class, extending BaseToolHandler, contains the core execution logic in its `handle` method, which parses arguments, makes API calls to list Flink statements, and returns the result.export class ListFlinkStatementsHandler extends BaseToolHandler { async handle( clientManager: ClientManager, toolArguments: Record<string, unknown> | undefined, ): Promise<CallToolResult> { const { pageSize, computePoolId, environmentId, labelSelector, organizationId, pageToken, baseUrl, } = listFlinkStatementsArguments.parse(toolArguments); const organization_id = getEnsuredParam( "FLINK_ORG_ID", "Organization ID is required", organizationId, ); const environment_id = getEnsuredParam( "FLINK_ENV_ID", "Environment ID is required", environmentId, ); if (baseUrl !== undefined && baseUrl !== "") { clientManager.setConfluentCloudFlinkEndpoint(baseUrl); } const pathBasedClient = wrapAsPathBasedClient( clientManager.getConfluentCloudFlinkRestClient(), ); const { data: response, error } = await pathBasedClient[ "/sql/v1/organizations/{organization_id}/environments/{environment_id}/statements" ].GET({ params: { path: { organization_id: organization_id, environment_id: environment_id, }, query: { "spec.compute_pool_id": computePoolId, page_size: pageSize, page_token: pageToken, label_selector: labelSelector, }, }, }); if (error) { return this.createResponse( `Failed to list Flink SQL statements: ${JSON.stringify(error)}`, true, ); } return this.createResponse(`${JSON.stringify(response)}`); } getToolConfig(): ToolConfig { return { name: ToolName.LIST_FLINK_STATEMENTS, description: "Retrieve a sorted, filtered, paginated list of all statements.", inputSchema: listFlinkStatementsArguments.shape, }; } getRequiredEnvVars(): EnvVar[] { return ["FLINK_API_KEY", "FLINK_API_SECRET"]; } isConfluentCloudOnly(): boolean { return true; } }
- Zod schema defining the input parameters for the list-flink-statements tool, including baseUrl, organizationId, environmentId, computePoolId, pageSize, pageToken, and labelSelector.const listFlinkStatementsArguments = z.object({ baseUrl: z .string() .describe("The base URL of the Flink REST API.") .url() .default(() => env.FLINK_REST_ENDPOINT ?? "") .optional(), organizationId: z .string() .optional() .describe("The unique identifier for the organization."), environmentId: z .string() .optional() .describe("The unique identifier for the environment."), computePoolId: z .string() .optional() .default(() => env.FLINK_COMPUTE_POOL_ID ?? "") .describe("Filter the results by exact match for compute_pool."), pageSize: z .number() .int() .nonnegative() .max(100) .default(10) .describe("A pagination size for collection requests."), pageToken: z .string() .max(255) .optional() .describe("An opaque pagination token for collection requests."), labelSelector: z .string() .optional() .describe("A comma-separated label selector to filter the statements."), });
- src/confluent/tools/tool-factory.ts:47-47 (registration)Registration of the ListFlinkStatementsHandler in the ToolFactory's handlers map using the tool name constant.[ToolName.LIST_FLINK_STATEMENTS, new ListFlinkStatementsHandler()],
- src/confluent/tools/tool-factory.ts:16-16 (registration)Import of the ListFlinkStatementsHandler in the ToolFactory.import { ListFlinkStatementsHandler } from "@src/confluent/tools/handlers/flink/list-flink-statements-handler.js";
- src/confluent/tools/tool-name.ts:7-7 (registration)Definition of the tool name constant LIST_FLINK_STATEMENTS in the ToolName enum.LIST_FLINK_STATEMENTS = "list-flink-statements",