
mcp-confluent

Official
by confluentinc

create-connector

Deploy Kafka connectors by specifying configuration details such as environment, cluster, and connector class. Simplifies integration with Confluent Kafka and Confluent Cloud REST APIs.

Instructions

Create a new connector. Returns the new connector information if successful.

Input Schema

| Name | Required | Description | Default |
|---|---|---|---|
| baseUrl | No | The base URL of the Kafka Connect REST API. | |
| clusterId | No | The unique identifier for the Kafka cluster. | |
| connectorConfig | Yes | | |
| connectorName | Yes | The name of the connector to create. | |
| environmentId | No | The unique identifier for the environment this resource belongs to. | |
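
As an illustration of the input table, a call for a managed connector might pass arguments shaped like the following. This is only a sketch: the interface name `CreateConnectorArgs`, the IDs, the connector name, and the `topics` key are hypothetical placeholders. Only the field names and the `GcsSink` class name come from the schema.

```typescript
// Hypothetical type mirroring the input table; not exported by mcp-confluent.
interface CreateConnectorArgs {
  baseUrl?: string;
  clusterId?: string;
  environmentId?: string;
  connectorName: string;
  // Every value must be a string; "connector.class" is required.
  connectorConfig: { "connector.class": string } & Record<string, string>;
}

// Placeholder values for illustration only.
const exampleArgs: CreateConnectorArgs = {
  environmentId: "env-example",
  clusterId: "lkc-example",
  connectorName: "my-gcs-sink",
  connectorConfig: {
    "connector.class": "GcsSink",
    topics: "orders", // assumed connector-specific setting, not part of the schema
  },
};

console.log(JSON.stringify(exampleArgs, null, 2));
```

Note that all configuration values are strings, so numeric or boolean settings would need to be passed in string form.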

Input Schema (JSON Schema)

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "properties": {
    "baseUrl": {
      "default": "",
      "description": "The base URL of the Kafka Connect REST API.",
      "format": "uri",
      "type": "string"
    },
    "clusterId": {
      "description": "The unique identifier for the Kafka cluster.",
      "type": "string"
    },
    "connectorConfig": {
      "additionalProperties": {
        "type": "string"
      },
      "properties": {
        "confluent.connector.type": {
          "default": "MANAGED",
          "description": "Required for Custom Connector. The connector type",
          "type": "string"
        },
        "confluent.custom.connection.endpoints": {
          "description": "Optional for Custom Connector. Egress endpoint(s) for the connector",
          "type": "string"
        },
        "confluent.custom.plugin.id": {
          "description": "Required for Custom Connector. The custom plugin id of custom connector",
          "type": "string"
        },
        "confluent.custom.schema.registry.auto": {
          "default": "FALSE",
          "description": "Optional for Custom Connector. Automatically add required schema registry properties",
          "type": "string"
        },
        "connector.class": {
          "description": "Required for Managed Connector, Ignored for Custom Connector. The connector class name, e.g., BigQuerySink, GcsSink, etc.",
          "type": "string"
        }
      },
      "required": [
        "connector.class"
      ],
      "type": "object"
    },
    "connectorName": {
      "description": "The name of the connector to create.",
      "minLength": 1,
      "type": "string"
    },
    "environmentId": {
      "description": "The unique identifier for the environment this resource belongs to.",
      "type": "string"
    }
  },
  "required": [
    "connectorName",
    "connectorConfig"
  ],
  "type": "object"
}
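
To show what the JSON Schema constrains, here is a minimal, dependency-free sketch that checks arguments against its main rules: no unknown top-level properties, a non-empty `connectorName`, and a `connectorConfig` object whose values are all strings and which includes `connector.class`. The function name `checkCreateConnectorArgs` is hypothetical, and the sketch deliberately skips the `format: uri` check on `baseUrl`. It is not how the server validates input.

```typescript
// Top-level properties permitted by the schema ("additionalProperties": false).
const ALLOWED_KEYS = ["baseUrl", "clusterId", "connectorConfig", "connectorName", "environmentId"];

// Hypothetical helper: returns a list of problems; an empty list means the
// arguments passed the checks implemented here.
function checkCreateConnectorArgs(args: Record<string, unknown>): string[] {
  const problems: string[] = [];

  for (const key of Object.keys(args)) {
    if (ALLOWED_KEYS.indexOf(key) === -1) problems.push(`unexpected property: ${key}`);
  }

  // Optional fields must be strings when present.
  for (const key of ["baseUrl", "clusterId", "environmentId"]) {
    if (key in args && typeof args[key] !== "string") problems.push(`${key} must be a string`);
  }

  // connectorName is required and has minLength 1.
  const name = args["connectorName"];
  if (typeof name !== "string" || name.length < 1) {
    problems.push("connectorName must be a non-empty string");
  }

  // connectorConfig is required and must be a plain object.
  const config = args["connectorConfig"];
  if (typeof config !== "object" || config === null || Array.isArray(config)) {
    problems.push("connectorConfig must be an object");
    return problems;
  }
  const configRecord = config as Record<string, unknown>;
  if (!("connector.class" in configRecord)) {
    problems.push('connectorConfig is missing "connector.class"');
  }
  // Declared and additional properties alike are typed as strings.
  for (const key of Object.keys(configRecord)) {
    if (typeof configRecord[key] !== "string") {
      problems.push(`connectorConfig.${key} must be a string`);
    }
  }
  return problems;
}

// A numeric value and an empty name are both rejected.
console.log(checkCreateConnectorArgs({ connectorName: "", connectorConfig: { retries: 3 } }));
```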

Implementation Reference

  • The `handle` method of the `CreateConnectorHandler` class parses the arguments, validates the environment and cluster IDs, sends a POST request to the Confluent Cloud REST API to create the connector, and returns the result or an error.
    async handle(
      clientManager: ClientManager,
      toolArguments: Record<string, unknown> | undefined,
    ): Promise<CallToolResult> {
      const {
        clusterId,
        environmentId,
        connectorName,
        connectorConfig,
        baseUrl,
      } = createConnectorArguments.parse(toolArguments);
      const environment_id = getEnsuredParam(
        "KAFKA_ENV_ID",
        "Environment ID is required",
        environmentId,
      );
      const kafka_cluster_id = getEnsuredParam(
        "KAFKA_CLUSTER_ID",
        "Kafka Cluster ID is required",
        clusterId,
      );
      if (baseUrl !== undefined && baseUrl !== "") {
        clientManager.setConfluentCloudRestEndpoint(baseUrl);
      }
      const pathBasedClient = wrapAsPathBasedClient(
        clientManager.getConfluentCloudRestClient(),
      );
      const { data: response, error } = await pathBasedClient[
        "/connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors"
      ].POST({
        params: {
          path: {
            environment_id: environment_id,
            kafka_cluster_id: kafka_cluster_id,
          },
        },
        body: {
          name: connectorName,
          config: {
            name: connectorName,
            "kafka.api.key": getEnsuredParam(
              "KAFKA_API_KEY",
              "Kafka API Key is required to create the connector. Check if env vars are properly set",
            ),
            "kafka.api.secret": getEnsuredParam(
              "KAFKA_API_SECRET",
              "Kafka API Secret is required to create the connector. Check if env vars are properly set",
            ),
            ...connectorConfig,
          },
        },
      });
      if (error) {
        return this.createResponse(
          `Failed to create connector ${connectorName}: ${JSON.stringify(error)}`,
          true,
        );
      }
      return this.createResponse(
        `${connectorName} created: ${JSON.stringify(response)}`,
      );
    }
  • Zod schema defining the input parameters for the create-connector tool, including baseUrl, environmentId, clusterId, connectorName, and connectorConfig with specific fields for managed and custom connectors.
    const createConnectorArguments = z.object({
      baseUrl: z
        .string()
        .trim()
        .describe("The base URL of the Kafka Connect REST API.")
        .url()
        .default(() => env.CONFLUENT_CLOUD_REST_ENDPOINT ?? "")
        .optional(),
      environmentId: z
        .string()
        .optional()
        .describe(
          "The unique identifier for the environment this resource belongs to.",
        ),
      clusterId: z
        .string()
        .optional()
        .describe("The unique identifier for the Kafka cluster."),
      connectorName: z
        .string()
        .nonempty()
        .describe("The name of the connector to create."),
      connectorConfig: z
        .object({
          // Required fields
          "connector.class": z
            .string()
            .describe(
              "Required for Managed Connector, Ignored for Custom Connector. The connector class name, e.g., BigQuerySink, GcsSink, etc.",
            ),
          // Optional fields
          "confluent.connector.type": z
            .string()
            .default("MANAGED")
            .describe("Required for Custom Connector. The connector type")
            .optional(),
          "confluent.custom.plugin.id": z
            .string()
            .describe(
              "Required for Custom Connector. The custom plugin id of custom connector",
            )
            .optional(),
          "confluent.custom.connection.endpoints": z
            .string()
            .describe(
              "Optional for Custom Connector. Egress endpoint(s) for the connector",
            )
            .optional(),
          "confluent.custom.schema.registry.auto": z
            .string()
            .default("FALSE")
            .describe(
              "Optional for Custom Connector. Automatically add required schema registry properties",
            )
            .optional(),
        })
        // Allow additional string properties
        .catchall(z.string()),
    });
  • Registration of the CreateConnectorHandler instance in the ToolFactory's static handlers Map under the CREATE_CONNECTOR key.
    [ToolName.CREATE_CONNECTOR, new CreateConnectorHandler()],
  • Enum value defining the string name 'create-connector' for the tool.
    CREATE_CONNECTOR = "create-connector",
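
To make the handler's request construction concrete, the sketch below reproduces two details of `handle` without any network calls: how a required value might be resolved, and how the request body is merged. `resolveParam` is a hypothetical stand-in for `getEnsuredParam`, whose implementation is not shown on this page; the fallback to an environment variable is an assumption inferred from its arguments. `buildRequestBody` and `connectorsPath` are likewise illustrative names that mirror the object literal and path template passed to `POST`.

```typescript
type Env = Record<string, string | undefined>;

// Hypothetical stand-in for getEnsuredParam (assumed behavior): prefer the
// explicit argument, fall back to the named environment variable, else throw.
function resolveParam(env: Env, envVarName: string, errorMessage: string, param?: string): string {
  const value = param ?? env[envVarName];
  if (value === undefined) throw new Error(errorMessage);
  return value;
}

// Mirrors the body literal in handle: because connectorConfig is spread last,
// its keys override name, kafka.api.key, and kafka.api.secret on collision.
function buildRequestBody(
  env: Env,
  connectorName: string,
  connectorConfig: Record<string, string>,
): { name: string; config: Record<string, string> } {
  return {
    name: connectorName,
    config: {
      name: connectorName,
      "kafka.api.key": resolveParam(env, "KAFKA_API_KEY", "Kafka API Key is required"),
      "kafka.api.secret": resolveParam(env, "KAFKA_API_SECRET", "Kafka API Secret is required"),
      ...connectorConfig,
    },
  };
}

// Fills in the path template used by the handler.
function connectorsPath(environmentId: string, kafkaClusterId: string): string {
  return `/connect/v1/environments/${environmentId}/clusters/${kafkaClusterId}/connectors`;
}

// Placeholder values for illustration only.
const exampleEnv: Env = {
  KAFKA_API_KEY: "key-from-env",
  KAFKA_API_SECRET: "secret-from-env",
  KAFKA_ENV_ID: "env-example",
};
const exampleBody = buildRequestBody(exampleEnv, "my-sink", {
  "connector.class": "GcsSink",
  "kafka.api.key": "key-from-config",
});

console.log(connectorsPath(resolveParam(exampleEnv, "KAFKA_ENV_ID", "Environment ID is required"), "lkc-example"));
console.log(exampleBody.config["kafka.api.key"]); // the config-supplied key wins
```

One consequence of the spread order is worth noting: a `kafka.api.key`, `kafka.api.secret`, or `name` entry inside `connectorConfig` replaces the value the handler would otherwise supply.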

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/confluentinc/mcp-confluent'
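
The same request can be sketched in TypeScript. This assumes a runtime with a global `fetch` (for example Node.js 18 or later); the function names are hypothetical, and the response is left untyped because its schema is not described here.

```typescript
// Builds the directory URL for a server; the path layout follows the curl example.
function mcpServerUrl(owner: string, server: string): string {
  return `https://glama.ai/api/mcp/v1/servers/${encodeURIComponent(owner)}/${encodeURIComponent(server)}`;
}

// Fetches the server record as untyped JSON and fails on non-2xx responses.
async function fetchMcpServer(owner: string, server: string): Promise<unknown> {
  const response = await fetch(mcpServerUrl(owner, server));
  if (!response.ok) {
    throw new Error(`Request failed with status ${response.status}`);
  }
  return response.json();
}
```

Calling `fetchMcpServer("confluentinc", "mcp-confluent")` would request the same URL as the curl command.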

If you have feedback or need assistance with the MCP directory API, please join our Discord server.