create_topic_with_schema
Create Kafka topics with configurable formats and schemas for structured data streaming. Define partitions, replication, and data formats like AVRO or JSON.
Instructions
Creates a new Kafka topic with optional format and schema configuration.
Args:
- environment: The environment name.
- name: Topic name.
- partitions: Number of partitions (default: 1).
- replication: Replication factor (default: 1).
- configs: Topic configurations.
- key_format: Key format (AVRO, JSON, CSV, XML, INT, LONG, STRING, BYTES, etc.).
- key_schema: Key schema (required for AVRO, JSON, CSV, XML).
- value_format: Value format.
- value_schema: Value schema.
Returns: Creation result.
Input Schema
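| Name | Required | Description | Default |
|---|---|---|---|
| environment | Yes | The environment name. | |
| name | Yes | Topic name. | |
| partitions | No | Number of partitions. | 1 |
| replication | No | Replication factor. | 1 |
| configs | No | Topic configurations. | |
| key_format | No | Key format (AVRO, JSON, CSV, XML, INT, LONG, STRING, BYTES, etc.). | |
| key_schema | No | Key schema (required for AVRO, JSON, CSV, XML). | |
| value_format | No | Value format. | |
| value_schema | No | Value schema. | |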
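As an illustration of the inputs above, here is a minimal sketch of an arguments object for this tool. The environment name, topic name, Avro schema, and the `retention.ms` value are placeholders chosen for the example, not values taken from Lenses. The only facts relied on are that `environment` and `name` are required and that the schema parameters are strings.

```python
import json

# Hypothetical Avro record schema for the message value.
# The tool takes schemas as strings, so the dict is serialized to JSON text.
value_schema = json.dumps({
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
})

# Example arguments for create_topic_with_schema.
# "dev" and "orders" are placeholder names.
arguments = {
    "environment": "dev",
    "name": "orders",
    "partitions": 3,
    "replication": 1,
    "configs": {"retention.ms": "604800000"},  # Kafka topic config: 7 days in ms
    "key_format": "STRING",  # STRING is not listed as needing a key_schema
    "value_format": "AVRO",
    "value_schema": value_schema,
}

# Check that the two required inputs are present before calling the tool.
REQUIRED = ("environment", "name")
missing = [field for field in REQUIRED if field not in arguments]
print(missing)
```

Passing the schema as a JSON string rather than a nested object matches the `Optional[str]` type of `key_schema` and `value_schema` in the handler signature.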
Implementation Reference
- src/lenses_mcp/tools/topics.py:98-154 (handler): The handler function, decorated with @mcp.tool(), that implements the logic to create a Kafka topic with schema support via an HTTP API request to Lenses.

      @mcp.tool()
      async def create_topic_with_schema(
          environment: str,
          name: str,
          partitions: int = 1,
          replication: int = 1,
          configs: Optional[Dict[str, str]] = None,
          key_format: Optional[str] = None,
          key_schema: Optional[str] = None,
          value_format: Optional[str] = None,
          value_schema: Optional[str] = None
      ) -> Dict[str, Any]:
          """
          Creates a new Kafka topic with optional format and schema configuration.

          Args:
              environment: The environment name.
              name: Topic name.
              partitions: Number of partitions (default: 1).
              replication: Replication factor (default: 1).
              configs: Topic configurations.
              key_format: Key format (AVRO, JSON, CSV, XML, INT, LONG, STRING, BYTES, etc.).
              key_schema: Key schema (required for AVRO, JSON, CSV, XML).
              value_format: Value format.
              value_schema: Value schema.

          Returns:
              Creation result.
          """
          payload = {
              "name": name,
              "partitions": partitions,
              "replication": replication
          }

          if configs:
              payload["configs"] = configs
          else:
              payload["configs"] = {}

          if key_format or value_format:
              format_config = {}
              if key_format:
                  format_config["key"] = {"format": key_format}
                  if key_schema:
                      format_config["key"]["schema"] = key_schema
              if value_format:
                  format_config["value"] = {"format": value_format}
                  if value_schema:
                      format_config["value"]["schema"] = value_schema
              payload["format"] = format_config

          endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/kafka/topic"

          try:
              return await api_client._make_request("POST", endpoint, payload)
          except Exception as e:
              raise ToolError(f"Topic creation failed: {e}")
- src/lenses_mcp/server.py:33-33 (registration): Invocation of register_topics(mcp), which defines and registers the create_topic_with_schema tool among others: `register_topics(mcp)`
- src/lenses_mcp/tools/topics.py:10-10 (registration): Function that contains the definitions of all topic tools, including create_topic_with_schema, which are registered when this function is called: `def register_topics(mcp: FastMCP):`
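To see what request body the handler sends for a given set of arguments, the payload logic can be exercised in isolation. The sketch below is a hypothetical helper, `build_topic_payload`, which is not part of lenses_mcp. It mirrors the handler's payload construction without the HTTP call, and it assumes each schema check is nested under its format check (the only arrangement that avoids a KeyError when a schema is passed without a format).

```python
from typing import Any, Dict, Optional


def build_topic_payload(
    name: str,
    partitions: int = 1,
    replication: int = 1,
    configs: Optional[Dict[str, str]] = None,
    key_format: Optional[str] = None,
    key_schema: Optional[str] = None,
    value_format: Optional[str] = None,
    value_schema: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper mirroring the handler's payload construction."""
    payload: Dict[str, Any] = {
        "name": name,
        "partitions": partitions,
        "replication": replication,
        "configs": configs or {},  # "configs" is always sent, empty if not given
    }

    format_config: Dict[str, Dict[str, str]] = {}
    for side, fmt, schema in (
        ("key", key_format, key_schema),
        ("value", value_format, value_schema),
    ):
        if fmt:
            format_config[side] = {"format": fmt}
            # A schema is attached only when its format is also set.
            if schema:
                format_config[side]["schema"] = schema

    # The "format" section is included only if at least one format was given.
    if format_config:
        payload["format"] = format_config
    return payload


plain = build_topic_payload("logs")
typed = build_topic_payload(
    "orders", partitions=3, value_format="AVRO", value_schema='{"type": "string"}'
)
orphan = build_topic_payload("events", key_schema='{"type": "string"}')
```

Under that assumption, a schema supplied without its matching format is silently dropped (`orphan` has no `format` section), so callers should pass `key_format` with `key_schema` and `value_format` with `value_schema`.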