create-flink-statement
Submit Flink SQL statements to process data streams in Confluent Cloud. Each statement is created with a unique name plus catalog and database details, and runs within a specified environment and compute pool.
Instructions
Make a request to create a statement.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| baseUrl | No | The base URL of the Flink REST API. | |
| catalogName | Yes | The catalog name to be used for the statement. Typically the Confluent environment name. | |
| computePoolId | No | The ID associated with the compute pool in context. | |
| databaseName | Yes | The database name to be used for the statement. Typically the Kafka cluster name. | |
| environmentId | No | The unique identifier for the environment. | |
| organizationId | No | The unique identifier for the organization. | |
| statement | Yes | The raw Flink SQL statement text. CREATE TABLE statements are often unnecessary, since topics in Confluent Cloud are detected as existing tables; run SHOW and DESCRIBE on tables before creating new ones. | |
| statementName | Yes | The user-provided name of the resource, unique within this environment. | |
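
For illustration, a minimal set of arguments might look like the sketch below. All values are hypothetical; when computePoolId, environmentId, or organizationId are omitted, the handler falls back to the FLINK_COMPUTE_POOL_ID, FLINK_ENV_ID, and FLINK_ORG_ID environment variables (see the implementation reference below).

// Hypothetical arguments for create-flink-statement; all values illustrative.
const exampleArguments = {
  statementName: "show-tables-1", // must match the lowercase name pattern
  catalogName: "my-environment", // typically the Confluent environment name
  databaseName: "my-kafka-cluster", // typically the Kafka cluster name
  statement: "SHOW TABLES;", // inspect existing tables before creating new ones
};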
Input Schema (JSON Schema)
{
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": false,
"properties": {
"baseUrl": {
"default": "",
"description": "The base URL of the Flink REST API.",
"format": "uri",
"type": "string"
},
"catalogName": {
"default": "",
"description": "The catalog name to be used for the statement. Typically the confluent environment name.",
"minLength": 1,
"type": "string"
},
"computePoolId": {
"description": "The id associated with the compute pool in context.",
"type": "string"
},
"databaseName": {
"default": "",
"description": "The database name to be used for the statement. Typically the Kafka cluster name.",
"minLength": 1,
"type": "string"
},
"environmentId": {
"description": "The unique identifier for the environment.",
"type": "string"
},
"organizationId": {
"description": "The unique identifier for the organization.",
"type": "string"
},
"statement": {
"description": "The raw Flink SQL text statement. Create table statements may not be necessary as topics in confluent cloud will be detected as created schemas. Make sure to show and describe tables before creating new ones.",
"maxLength": 131072,
"minLength": 1,
"type": "string"
},
"statementName": {
"description": "The user provided name of the resource, unique within this environment.",
"maxLength": 100,
"minLength": 1,
"pattern": "[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
"type": "string"
}
},
"required": [
"statement",
"statementName",
"catalogName",
"databaseName"
],
"type": "object"
}
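
The statementName pattern accepts lowercase alphanumeric segments, optionally hyphenated inside and separated by dots. Here is a small sketch of how it behaves; note that the schema's regex is unanchored, so ^ and $ are added here to force a whole-string check:

// Sketch: whole-string check against the documented statementName pattern.
const statementNamePattern =
  /^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$/;
console.log(statementNamePattern.test("orders-rollup.v1")); // true
console.log(statementNamePattern.test("Orders_Rollup")); // false: uppercase, underscore
console.log(statementNamePattern.test("-orders")); // false: must start/end alphanumeric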
Implementation Reference
- CreateFlinkStatementHandler class: core implementation of the 'create-flink-statement' tool, including the handle() method that parses inputs, resolves required parameters, and POSTs to the Flink SQL API to create a statement (a plain-REST sketch of this call follows the list).

  export class CreateFlinkStatementHandler extends BaseToolHandler {
    async handle(
      clientManager: ClientManager,
      toolArguments: Record<string, unknown> | undefined,
    ): Promise<CallToolResult> {
      const {
        catalogName,
        databaseName,
        statement,
        statementName,
        computePoolId,
        environmentId,
        organizationId,
        baseUrl,
      } = createFlinkStatementArguments.parse(toolArguments);

      const organization_id = getEnsuredParam(
        "FLINK_ORG_ID",
        "Organization ID is required",
        organizationId,
      );
      const environment_id = getEnsuredParam(
        "FLINK_ENV_ID",
        "Environment ID is required",
        environmentId,
      );
      const compute_pool_id = getEnsuredParam(
        "FLINK_COMPUTE_POOL_ID",
        "Compute Pool ID is required",
        computePoolId,
      );

      if (baseUrl !== undefined && baseUrl !== "") {
        clientManager.setConfluentCloudFlinkEndpoint(baseUrl);
      }

      const pathBasedClient = wrapAsPathBasedClient(
        clientManager.getConfluentCloudFlinkRestClient(),
      );

      const { data: response, error } = await pathBasedClient[
        "/sql/v1/organizations/{organization_id}/environments/{environment_id}/statements"
      ].POST({
        params: {
          path: {
            environment_id: environment_id,
            organization_id: organization_id,
          },
        },
        body: {
          name: statementName,
          organization_id: organization_id,
          environment_id: environment_id,
          spec: {
            compute_pool_id: compute_pool_id,
            statement: statement,
            properties: {
              // only include the catalog and database properties if they are defined
              ...(catalogName && { "sql.current-catalog": catalogName }),
              ...(databaseName && {
                "sql.current-database": databaseName,
              }),
            },
          },
        },
      });

      if (error) {
        return this.createResponse(
          `Failed to create Flink SQL statements: ${JSON.stringify(error)}`,
          true,
        );
      }
      return this.createResponse(`${JSON.stringify(response)}`);
    }

    getToolConfig(): ToolConfig {
      return {
        name: ToolName.CREATE_FLINK_STATEMENT,
        description: "Make a request to create a statement.",
        inputSchema: createFlinkStatementArguments.shape,
      };
    }

    getRequiredEnvVars(): EnvVar[] {
      return ["FLINK_API_KEY", "FLINK_API_SECRET"];
    }

    isConfluentCloudOnly(): boolean {
      return true;
    }
  }
- Zod input schema validating the create-flink-statement parameters: baseUrl, organization/environment/compute-pool IDs, SQL statement, statement name, catalog, and database (a usage sketch follows the list).

  const createFlinkStatementArguments = z.object({
    baseUrl: z
      .string()
      .describe("The base URL of the Flink REST API.")
      .url()
      .default(() => env.FLINK_REST_ENDPOINT ?? "")
      .optional(),
    organizationId: z
      .string()
      .trim()
      .optional()
      .describe("The unique identifier for the organization."),
    environmentId: z
      .string()
      .trim()
      .optional()
      .describe("The unique identifier for the environment."),
    computePoolId: z
      .string()
      .trim()
      .optional()
      .describe("The id associated with the compute pool in context."),
    statement: z
      .string()
      .nonempty()
      .max(131072)
      .describe(
        "The raw Flink SQL text statement. Create table statements may not be necessary as topics in confluent cloud will be detected as created schemas. Make sure to show and describe tables before creating new ones.",
      ),
    statementName: z
      .string()
      .regex(
        new RegExp(
          "[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
        ),
      )
      .nonempty()
      .max(100)
      .describe(
        "The user provided name of the resource, unique within this environment.",
      ),
    catalogName: z
      .string()
      .trim()
      .nonempty()
      .default(() => env.FLINK_ENV_NAME ?? "")
      .describe(
        "The catalog name to be used for the statement. Typically the confluent environment name.",
      ),
    databaseName: z
      .string()
      .trim()
      .nonempty()
      .default(() => env.FLINK_DATABASE_NAME ?? "")
      .describe(
        "The database name to be used for the statement. Typically the Kafka cluster name.",
      ),
  });
- src/confluent/tools/tool-factory.ts:48 (registration): registers CreateFlinkStatementHandler in the ToolFactory static handlers Map using the tool name constant.

  [ToolName.CREATE_FLINK_STATEMENT, new CreateFlinkStatementHandler()],
- src/confluent/tools/tool-name.ts:8 (registration): ToolName enum constant defining the string name 'create-flink-statement' for the tool.

  CREATE_FLINK_STATEMENT = "create-flink-statement",
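
For orientation, the path-based client call in the handler corresponds to a plain REST request along the lines of the sketch below. The endpoint path and body shape are taken from the handler above; the regional host and the Basic-auth header derived from FLINK_API_KEY/FLINK_API_SECRET are assumptions, not taken from the repository.

// Sketch only: the handler's POST reconstructed as a raw fetch call (Node 18+).
// Host, auth scheme, and all literal values are assumptions/hypothetical.
const baseUrl =
  process.env.FLINK_REST_ENDPOINT ??
  "https://flink.us-east-1.aws.confluent.cloud"; // hypothetical regional host
const orgId = process.env.FLINK_ORG_ID ?? "";
const envId = process.env.FLINK_ENV_ID ?? "";

const response = await fetch(
  `${baseUrl}/sql/v1/organizations/${orgId}/environments/${envId}/statements`,
  {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      // Assumption: Basic auth built from the tool's required env vars.
      Authorization:
        "Basic " +
        Buffer.from(
          `${process.env.FLINK_API_KEY}:${process.env.FLINK_API_SECRET}`,
        ).toString("base64"),
    },
    body: JSON.stringify({
      name: "show-tables-1", // hypothetical statement name
      organization_id: orgId,
      environment_id: envId,
      spec: {
        compute_pool_id: process.env.FLINK_COMPUTE_POOL_ID,
        statement: "SHOW TABLES;",
        properties: {
          "sql.current-catalog": "my-environment", // hypothetical
          "sql.current-database": "my-kafka-cluster", // hypothetical
        },
      },
    }),
  },
);
console.log(response.status, await response.json());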
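
And a minimal usage sketch for the Zod schema, with hypothetical values; the omitted ID fields stay undefined at parse time and are resolved later by getEnsuredParam:

// Sketch: validating hypothetical tool arguments against the schema above.
const parsed = createFlinkStatementArguments.parse({
  statementName: "show-tables-1",
  catalogName: "my-environment",
  databaseName: "my-kafka-cluster",
  statement: "SHOW TABLES;",
});
// parsed.organizationId, parsed.environmentId, and parsed.computePoolId are
// undefined here; the handler resolves them from FLINK_ORG_ID, FLINK_ENV_ID,
// and FLINK_COMPUTE_POOL_ID.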