create_topic_with_schema

Create Kafka topics with configurable formats and schemas for structured data streaming. Define partitions, replication, and data formats like AVRO or JSON.

Instructions

Creates a new Kafka topic with optional format and schema configuration.

Args:
    environment: The environment name.
    name: Topic name.
    partitions: Number of partitions (default: 1).
    replication: Replication factor (default: 1).
    configs: Topic configurations.
    key_format: Key format (AVRO, JSON, CSV, XML, INT, LONG, STRING, BYTES, etc.).
    key_schema: Key schema (required for AVRO, JSON, CSV, XML).
    value_format: Value format.
    value_schema: Value schema.

Returns: Creation result.
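As an illustration of how these arguments fit together, the sketch below assembles one possible set of arguments for a topic with a STRING key and an AVRO value. The environment name, topic name, and schema fields are hypothetical, and serializing the Avro schema as a JSON string is an assumption based on the schema parameters being string-typed; the exact schema text the server expects may differ.

```python
import json

# Hypothetical Avro schema for the record value; the field names are illustrative.
value_schema = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
}

# Arguments matching the parameters listed above.
arguments = {
    "environment": "dev",                      # assumed environment name
    "name": "orders",                          # assumed topic name
    "partitions": 3,
    "replication": 1,
    "configs": {"retention.ms": "86400000"},   # Kafka topic configs as strings
    "key_format": "STRING",                    # STRING keys need no key_schema
    "value_format": "AVRO",
    "value_schema": json.dumps(value_schema),  # assumption: schema sent as a JSON string
}

print(json.dumps(arguments, indent=2))
```

Only environment and name are required; every other entry can be left out to fall back on the defaults.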

Input Schema

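Name          Required  Description                                                    Default
environment   Yes       The environment name.
name          Yes       Topic name.
partitions    No        Number of partitions.                                          1
replication   No        Replication factor.                                            1
configs       No        Topic configurations.                                          None
key_format    No        Key format (AVRO, JSON, CSV, XML, INT, LONG, STRING, BYTES).   None
key_schema    No        Key schema (required for AVRO, JSON, CSV, XML).                None
value_format  No        Value format.                                                  None
value_schema  No        Value schema.                                                  None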

Implementation Reference

  • The handler function, decorated with @mcp.tool(). It builds the topic payload, including the optional key and value format and schema, and sends it to Lenses in an HTTP POST request.
    @mcp.tool()
    async def create_topic_with_schema(
        environment: str,
        name: str,
        partitions: int = 1,
        replication: int = 1,
        configs: Optional[Dict[str, str]] = None,
        key_format: Optional[str] = None,
        key_schema: Optional[str] = None,
        value_format: Optional[str] = None,
        value_schema: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Creates a new Kafka topic with optional format and schema configuration.

        Args:
            environment: The environment name.
            name: Topic name.
            partitions: Number of partitions (default: 1).
            replication: Replication factor (default: 1).
            configs: Topic configurations.
            key_format: Key format (AVRO, JSON, CSV, XML, INT, LONG, STRING, BYTES, etc.).
            key_schema: Key schema (required for AVRO, JSON, CSV, XML).
            value_format: Value format.
            value_schema: Value schema.

        Returns:
            Creation result.
        """
        payload = {
            "name": name,
            "partitions": partitions,
            "replication": replication
        }

        if configs:
            payload["configs"] = configs
        else:
            payload["configs"] = {}

        if key_format or value_format:
            format_config = {}
            if key_format:
                format_config["key"] = {"format": key_format}
                if key_schema:
                    format_config["key"]["schema"] = key_schema
            if value_format:
                format_config["value"] = {"format": value_format}
                if value_schema:
                    format_config["value"]["schema"] = value_schema
            payload["format"] = format_config

        endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/kafka/topic"

        try:
            return await api_client._make_request("POST", endpoint, payload)
        except Exception as e:
            raise ToolError(f"Topic creation failed: {e}")
  • Invocation of register_topics(mcp) which defines and registers the create_topic_with_schema tool among others.
    register_topics(mcp)
  • Function that contains the definition of all topic tools including create_topic_with_schema, which are registered when this function is called.
    def register_topics(mcp: FastMCP):
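
The request body that the handler assembles can be inspected without contacting Lenses. The sketch below mirrors the handler's payload logic in a standalone function; build_topic_payload is a hypothetical helper written for illustration and is not part of the server's code.

```python
from typing import Any, Dict, Optional


def build_topic_payload(
    name: str,
    partitions: int = 1,
    replication: int = 1,
    configs: Optional[Dict[str, str]] = None,
    key_format: Optional[str] = None,
    key_schema: Optional[str] = None,
    value_format: Optional[str] = None,
    value_schema: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper that reproduces the handler's payload construction."""
    payload: Dict[str, Any] = {
        "name": name,
        "partitions": partitions,
        "replication": replication,
        "configs": configs or {},  # the handler always sends a configs object
    }

    format_config: Dict[str, Dict[str, str]] = {}
    for side, fmt, schema in (
        ("key", key_format, key_schema),
        ("value", value_format, value_schema),
    ):
        if fmt:
            format_config[side] = {"format": fmt}
            # A schema is attached only when its format is also given.
            if schema:
                format_config[side]["schema"] = schema

    # The "format" entry is omitted entirely when neither format is set.
    if format_config:
        payload["format"] = format_config
    return payload


print(build_topic_payload("orders", partitions=3, value_format="STRING"))
print(build_topic_payload("orders", key_schema='{"type": "string"}'))
```

The second call shows a consequence of the nesting in the handler: a key_schema or value_schema passed without the matching format is silently left out of the request.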

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stereosky/lenses-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.