Lenses MCP Server

by stereosky

create_topic_with_schema

Create Kafka topics with configurable formats and schemas for structured data streaming. Define partitions, replication, and data formats like AVRO or JSON.

Instructions

Creates a new Kafka topic with optional format and schema configuration.

Args:
  • environment: The environment name.
  • name: Topic name.
  • partitions: Number of partitions (default: 1).
  • replication: Replication factor (default: 1).
  • configs: Topic configurations.
  • key_format: Key format (AVRO, JSON, CSV, XML, INT, LONG, STRING, BYTES, etc.).
  • key_schema: Key schema (required for AVRO, JSON, CSV, XML).
  • value_format: Value format.
  • value_schema: Value schema.

Returns: Creation result.
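
As an illustration of the arguments listed above, the following sketch assembles a set of arguments for a topic with string keys and AVRO values. The environment name, topic name, and record schema are hypothetical. It also assumes that the schema is supplied as a JSON-encoded string, which matches the string type of value_schema in the handler signature but is not otherwise confirmed by this page.

```python
import json

# Hypothetical Avro record schema for the message values (illustrative only).
value_schema = {
    "type": "record",
    "name": "Payment",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
}

# Arguments as they might be passed to create_topic_with_schema.
# "dev" and "payments" are placeholder names, not values from the source.
arguments = {
    "environment": "dev",
    "name": "payments",
    "partitions": 3,
    "replication": 1,
    "key_format": "STRING",
    "value_format": "AVRO",
    # Assumption: the schema travels as a JSON string, since the
    # parameter is typed as a string in the handler.
    "value_schema": json.dumps(value_schema),
}
```

No key_schema is given here because, per the argument list, a schema is only called for with the AVRO, JSON, CSV, and XML formats.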

Input Schema

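Name          Required  Description                                                         Default
environment   Yes       The environment name.                                               -
name          Yes       Topic name.                                                         -
partitions    No        Number of partitions.                                               1
replication   No        Replication factor.                                                 1
configs       No        Topic configurations.                                               None
key_format    No        Key format (AVRO, JSON, CSV, XML, INT, LONG, STRING, BYTES, etc.).  None
key_schema    No        Key schema (required for AVRO, JSON, CSV, XML).                     None
value_format  No        Value format.                                                       None
value_schema  No        Value schema.                                                       None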
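
The input schema marks both schema fields as optional, while the instructions say a key schema is required for AVRO, JSON, CSV, and XML. A caller may therefore want to check arguments before invoking the tool. The helper below is a hypothetical sketch, not part of the server. It assumes the same rule applies to value_schema, which the source does not state explicitly.

```python
from typing import List, Optional

# Formats for which the instructions say a schema is required.
SCHEMA_REQUIRED_FORMATS = {"AVRO", "JSON", "CSV", "XML"}


def check_format_args(side: str, fmt: Optional[str], schema: Optional[str]) -> List[str]:
    """Return a list of problems for one side ("key" or "value").

    Hypothetical pre-flight check; the tool itself does not perform it.
    """
    problems = []
    if fmt in SCHEMA_REQUIRED_FORMATS and not schema:
        problems.append(f"{side}_schema is required when {side}_format is {fmt}")
    if schema and not fmt:
        # The handler only attaches a schema when the matching format is set.
        problems.append(f"{side}_schema is ignored unless {side}_format is set")
    return problems
```

An empty list means the combination looks consistent, for example `check_format_args("value", "STRING", None)`.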

Implementation Reference

  • The handler function, decorated with @mcp.tool(), that creates a Kafka topic with optional format and schema settings by sending an HTTP POST request to the Lenses API.
    @mcp.tool()
    async def create_topic_with_schema(
        environment: str,
        name: str,
        partitions: int = 1,
        replication: int = 1,
        configs: Optional[Dict[str, str]] = None,
        key_format: Optional[str] = None,
        key_schema: Optional[str] = None,
        value_format: Optional[str] = None,
        value_schema: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Creates a new Kafka topic with optional format and schema configuration.
        
        Args:
            environment: The environment name.
            name: Topic name.
            partitions: Number of partitions (default: 1).
            replication: Replication factor (default: 1).
            configs: Topic configurations.
            key_format: Key format (AVRO, JSON, CSV, XML, INT, LONG, STRING, BYTES, etc.).
            key_schema: Key schema (required for AVRO, JSON, CSV, XML).
            value_format: Value format.
            value_schema: Value schema.
        
        Returns:
            Creation result.
        """
        payload = {
            "name": name,
            "partitions": partitions,
            "replication": replication
        }
        
        # Always send a configs object; fall back to an empty one.
        payload["configs"] = configs or {}
        
        if key_format or value_format:
            format_config = {}
            if key_format:
                format_config["key"] = {"format": key_format}
                if key_schema:
                    format_config["key"]["schema"] = key_schema
            if value_format:
                format_config["value"] = {"format": value_format}
                if value_schema:
                    format_config["value"]["schema"] = value_schema
            payload["format"] = format_config
        
        endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/kafka/topic"
        try:
            return await api_client._make_request("POST", endpoint, payload)
        except Exception as e:
            # Chain the original exception so its traceback is preserved.
            raise ToolError(f"Topic creation failed: {e}") from e
  • Invocation of register_topics(mcp), which defines and registers the create_topic_with_schema tool, among others.
    register_topics(mcp)
  • Function that contains the definition of all topic tools including create_topic_with_schema, which are registered when this function is called.
    def register_topics(mcp: FastMCP):
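
The payload-building part of the handler above can be isolated as a pure function, which makes the shape of the request body easy to inspect without a Lenses instance. This is a sketch that mirrors the handler's logic. The name build_topic_payload is hypothetical and does not appear in the source.

```python
from typing import Any, Dict, Optional


def build_topic_payload(
    name: str,
    partitions: int = 1,
    replication: int = 1,
    configs: Optional[Dict[str, str]] = None,
    key_format: Optional[str] = None,
    key_schema: Optional[str] = None,
    value_format: Optional[str] = None,
    value_schema: Optional[str] = None,
) -> Dict[str, Any]:
    """Build the JSON body in the same way as the handler (hypothetical helper)."""
    payload: Dict[str, Any] = {
        "name": name,
        "partitions": partitions,
        "replication": replication,
        "configs": configs or {},
    }
    format_config: Dict[str, Dict[str, str]] = {}
    for side, fmt, schema in (
        ("key", key_format, key_schema),
        ("value", value_format, value_schema),
    ):
        if fmt:
            format_config[side] = {"format": fmt}
            # A schema is only attached when its format is present.
            if schema:
                format_config[side]["schema"] = schema
    if format_config:
        payload["format"] = format_config
    return payload
```

Note that, as in the handler, a schema passed without its matching format is dropped and the "format" key is omitted entirely when neither format is given.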

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stereosky/lenses-mcp'
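
The same request can be issued from Python with the standard library. This is a minimal sketch: it assumes the endpoint returns JSON, which this page does not state, and the function names are hypothetical.

```python
import json
import urllib.request

URL = "https://glama.ai/api/mcp/v1/servers/stereosky/lenses-mcp"


def build_request(url: str = URL) -> urllib.request.Request:
    """Prepare a GET request equivalent to the curl command."""
    return urllib.request.Request(
        url, method="GET", headers={"Accept": "application/json"}
    )


def fetch_server_info(url: str = URL, timeout: float = 10.0):
    """Send the request and decode the body, assuming it is JSON."""
    with urllib.request.urlopen(build_request(url), timeout=timeout) as resp:
        return json.load(resp)
```

Calling `fetch_server_info()` performs the network request; `build_request()` alone only constructs it.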