create_topic
Create a new Kafka topic with configurable partitions, replication, and settings for data streaming in Apache Kafka environments.
Instructions
Creates a new Kafka topic with optional configuration.
Args: environment: The environment name. topic_name: Topic name. partitions: Number of partitions (default: 1). replication: Replication factor (default: 1). configs: Topic configurations.
Returns: Creation result.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| environment | Yes | ||
| topic_name | Yes | ||
| partitions | No | ||
| replication | No | ||
| configs | No |
Implementation Reference
- src/lenses_mcp/tools/topics.py:60-97 (handler)The @mcp.tool()-decorated handler function that implements the logic for creating a new Kafka topic by making a POST request to the API endpoint with the specified parameters.@mcp.tool() async def create_topic( environment: str, topic_name: str, partitions: int = 1, replication: int = 1, configs: Optional[Dict[str, str]] = None ) -> str: """ Creates a new Kafka topic with optional configuration. Args: environment: The environment name. topic_name: Topic name. partitions: Number of partitions (default: 1). replication: Replication factor (default: 1). configs: Topic configurations. Returns: Creation result. """ payload = { "topicName": topic_name, "partitions": partitions, "replication": replication } if configs: payload["configs"] = configs else: payload["configs"] = {} endpoint = f"/api/v1/environments/{environment}/proxy/api/topics" try: return await api_client._make_request("POST", endpoint, payload) except Exception as e: raise ToolError(f"Topic creation failed: {e}")
- src/lenses_mcp/server.py:33-33 (registration)The call to register_topics(mcp) which defines and registers the create_topic tool along with other topic-related tools.register_topics(mcp)