# create-connector
Deploy Kafka connectors by specifying configuration details such as environment, cluster, and connector class. Simplifies integration with Confluent Kafka and Confluent Cloud REST APIs.
## Instructions
Create a new connector. Returns the new connector information if successful.
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| baseUrl | No | The base URL of the Kafka Connect REST API. | |
| clusterId | No | The unique identifier for the Kafka cluster. | |
| connectorConfig | Yes | Connector configuration as string key-value pairs. `connector.class` is required for managed connectors; additional string properties are allowed. | |
| connectorName | Yes | The name of the connector to create. | |
| environmentId | No | The unique identifier for the environment this resource belongs to. | |
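As an illustration, a tool invocation for a managed connector might pass arguments like the following. All IDs are placeholders, and the config keys beyond `connector.class` (such as `kafka.topic`) are hypothetical connector-specific options, not part of this schema:

```json
{
  "connectorName": "my-datagen-source",
  "environmentId": "env-abc123",
  "clusterId": "lkc-xyz789",
  "connectorConfig": {
    "connector.class": "DatagenSource",
    "kafka.topic": "orders"
  }
}
```

Note that `kafka.api.key` and `kafka.api.secret` do not appear here: the handler injects them from environment variables before submitting the request.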
## Input Schema (JSON Schema)
```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "properties": {
    "baseUrl": {
      "default": "",
      "description": "The base URL of the Kafka Connect REST API.",
      "format": "uri",
      "type": "string"
    },
    "clusterId": {
      "description": "The unique identifier for the Kafka cluster.",
      "type": "string"
    },
    "connectorConfig": {
      "additionalProperties": {
        "type": "string"
      },
      "properties": {
        "confluent.connector.type": {
          "default": "MANAGED",
          "description": "Required for Custom Connector. The connector type",
          "type": "string"
        },
        "confluent.custom.connection.endpoints": {
          "description": "Optional for Custom Connector. Egress endpoint(s) for the connector",
          "type": "string"
        },
        "confluent.custom.plugin.id": {
          "description": "Required for Custom Connector. The custom plugin id of custom connector",
          "type": "string"
        },
        "confluent.custom.schema.registry.auto": {
          "default": "FALSE",
          "description": "Optional for Custom Connector. Automatically add required schema registry properties",
          "type": "string"
        },
        "connector.class": {
          "description": "Required for Managed Connector, Ignored for Custom Connector. The connector class name, e.g., BigQuerySink, GcsSink, etc.",
          "type": "string"
        }
      },
      "required": [
        "connector.class"
      ],
      "type": "object"
    },
    "connectorName": {
      "description": "The name of the connector to create.",
      "minLength": 1,
      "type": "string"
    },
    "environmentId": {
      "description": "The unique identifier for the environment this resource belongs to.",
      "type": "string"
    }
  },
  "required": [
    "connectorName",
    "connectorConfig"
  ],
  "type": "object"
}
```
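To make the schema's required-field rules concrete, here is a minimal hand-rolled check that mirrors them. This is an illustrative sketch, not code from this repository (the actual validation is done with Zod, shown in the Implementation Reference below); the `validateInput` function and its types are assumptions for this example:

```typescript
// Hand-rolled mirror of the JSON Schema's constraints, for illustration only.
type ConnectorConfig = Record<string, string>;

interface CreateConnectorInput {
  baseUrl?: string;
  clusterId?: string;
  environmentId?: string;
  connectorName: string;
  connectorConfig: ConnectorConfig;
}

// Returns a list of violations; an empty list means the input satisfies the
// schema's `required` and `minLength` constraints.
function validateInput(input: CreateConnectorInput): string[] {
  const errors: string[] = [];
  // connectorName has "minLength": 1
  if (input.connectorName.length < 1) {
    errors.push("connectorName must be non-empty");
  }
  // connectorConfig requires "connector.class" (for managed connectors)
  if (!("connector.class" in input.connectorConfig)) {
    errors.push("connectorConfig must include connector.class");
  }
  return errors;
}
```

Note that `environmentId` and `clusterId` are optional at the schema level; as the handler below shows, missing values fall back to the `KAFKA_ENV_ID` and `KAFKA_CLUSTER_ID` environment variables.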
## Implementation Reference
- The `handle` method in the `CreateConnectorHandler` class parses the arguments, validates the environment and cluster IDs, makes a POST request to the Confluent Cloud REST API to create the connector, and returns the result or error:

```typescript
async handle(
  clientManager: ClientManager,
  toolArguments: Record<string, unknown> | undefined,
): Promise<CallToolResult> {
  const { clusterId, environmentId, connectorName, connectorConfig, baseUrl } =
    createConnectorArguments.parse(toolArguments);
  const environment_id = getEnsuredParam(
    "KAFKA_ENV_ID",
    "Environment ID is required",
    environmentId,
  );
  const kafka_cluster_id = getEnsuredParam(
    "KAFKA_CLUSTER_ID",
    "Kafka Cluster ID is required",
    clusterId,
  );

  if (baseUrl !== undefined && baseUrl !== "") {
    clientManager.setConfluentCloudRestEndpoint(baseUrl);
  }

  const pathBasedClient = wrapAsPathBasedClient(
    clientManager.getConfluentCloudRestClient(),
  );
  const { data: response, error } = await pathBasedClient[
    "/connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors"
  ].POST({
    params: {
      path: {
        environment_id: environment_id,
        kafka_cluster_id: kafka_cluster_id,
      },
    },
    body: {
      name: connectorName,
      config: {
        name: connectorName,
        "kafka.api.key": getEnsuredParam(
          "KAFKA_API_KEY",
          "Kafka API Key is required to create the connector. Check if env vars are properly set",
        ),
        "kafka.api.secret": getEnsuredParam(
          "KAFKA_API_SECRET",
          "Kafka API Secret is required to create the connector. Check if env vars are properly set",
        ),
        ...connectorConfig,
      },
    },
  });
  if (error) {
    return this.createResponse(
      `Failed to create connector ${connectorName}: ${JSON.stringify(error)}`,
      true,
    );
  }
  return this.createResponse(
    `${connectorName} created: ${JSON.stringify(response)}`,
  );
}
```
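The endpoint path and request body shape above can be summarized as a plain request builder. This is an illustrative sketch, not repository code: the `buildCreateConnectorRequest` function and its parameter names are assumptions, while the URL path and the body layout (credentials injected first, user-supplied config spread last so it can override) come from the handler itself:

```typescript
interface ConnectRequest {
  url: string;
  body: { name: string; config: Record<string, string> };
}

// Builds the raw POST request the path-based client ultimately issues.
function buildCreateConnectorRequest(
  baseUrl: string,
  environmentId: string,
  clusterId: string,
  connectorName: string,
  connectorConfig: Record<string, string>,
  kafkaApiKey: string,
  kafkaApiSecret: string,
): ConnectRequest {
  return {
    url:
      `${baseUrl}/connect/v1/environments/${environmentId}` +
      `/clusters/${clusterId}/connectors`,
    body: {
      name: connectorName,
      config: {
        name: connectorName,
        // Credentials come from env vars in the real handler; the
        // user-supplied config is spread last, matching the handler.
        "kafka.api.key": kafkaApiKey,
        "kafka.api.secret": kafkaApiSecret,
        ...connectorConfig,
      },
    },
  };
}
```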
- The Zod schema defining the input parameters for the create-connector tool, including `baseUrl`, `environmentId`, `clusterId`, `connectorName`, and `connectorConfig` with specific fields for managed and custom connectors:

```typescript
const createConnectorArguments = z.object({
  baseUrl: z
    .string()
    .trim()
    .describe("The base URL of the Kafka Connect REST API.")
    .url()
    .default(() => env.CONFLUENT_CLOUD_REST_ENDPOINT ?? "")
    .optional(),
  environmentId: z
    .string()
    .optional()
    .describe(
      "The unique identifier for the environment this resource belongs to.",
    ),
  clusterId: z
    .string()
    .optional()
    .describe("The unique identifier for the Kafka cluster."),
  connectorName: z
    .string()
    .nonempty()
    .describe("The name of the connector to create."),
  connectorConfig: z
    .object({
      // Required fields
      "connector.class": z
        .string()
        .describe(
          "Required for Managed Connector, Ignored for Custom Connector. The connector class name, e.g., BigQuerySink, GcsSink, etc.",
        ),
      // Optional fields
      "confluent.connector.type": z
        .string()
        .default("MANAGED")
        .describe("Required for Custom Connector. The connector type")
        .optional(),
      "confluent.custom.plugin.id": z
        .string()
        .describe(
          "Required for Custom Connector. The custom plugin id of custom connector",
        )
        .optional(),
      "confluent.custom.connection.endpoints": z
        .string()
        .describe(
          "Optional for Custom Connector. Egress endpoint(s) for the connector",
        )
        .optional(),
      "confluent.custom.schema.registry.auto": z
        .string()
        .default("FALSE")
        .describe(
          "Optional for Custom Connector. Automatically add required schema registry properties",
        )
        .optional(),
    })
    // Allow additional string properties
    .catchall(z.string()),
});
```
- `src/confluent/tools/tool-factory.ts:53` (registration): the `CreateConnectorHandler` instance is registered in the `ToolFactory`'s static handlers map under the `CREATE_CONNECTOR` key:

```typescript
[ToolName.CREATE_CONNECTOR, new CreateConnectorHandler()],
```
- The enum value defining the string name `create-connector` for the tool:

```typescript
CREATE_CONNECTOR = "create-connector",
```