create_kafka_stream

Creates a Timeplus external stream that reads from an Apache Kafka topic, plus a materialized view over that stream, so the topic's data can be analyzed and processed in real time through the MCP server integration.

Input Schema

Name  | Required | Description | Default
topic | Yes      |             |

Input Schema (JSON Schema)

{
  "properties": {
    "topic": {
      "title": "Topic",
      "type": "string"
    }
  },
  "required": [
    "topic"
  ],
  "title": "create_kafka_streamArguments",
  "type": "object"
}
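As a rough illustration of what this schema accepts, the sketch below checks an arguments object against it using only the standard library. The helper name `validate_arguments` is hypothetical and is not part of the MCP server; it covers only the `required` and `type` keywords that appear in the schema above, not JSON Schema in general.

```python
import json

# The schema as published above.
SCHEMA = json.loads("""
{
  "properties": {"topic": {"title": "Topic", "type": "string"}},
  "required": ["topic"],
  "title": "create_kafka_streamArguments",
  "type": "object"
}
""")

# Only the JSON types this schema actually uses are mapped here.
_JSON_TYPES = {"string": str, "object": dict}


def validate_arguments(args, schema=SCHEMA):
    """Return a list of problems; an empty list means the arguments conform."""
    if not isinstance(args, dict):
        return ["arguments must be a JSON object"]
    problems = []
    # Every name listed under "required" must be present.
    for name in schema.get("required", []):
        if name not in args:
            problems.append(f"missing required property: {name}")
    # Properties that are present must match their declared type.
    for name, spec in schema.get("properties", {}).items():
        if name in args:
            expected = _JSON_TYPES.get(spec.get("type"))
            if expected is not None and not isinstance(args[name], expected):
                problems.append(f"property {name} must be of type {spec['type']}")
    return problems
```

A call such as `validate_arguments({"topic": "orders"})` returns an empty list, while omitting `topic` or passing a non-string value yields a one-item list describing the problem.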

Implementation Reference

  • The core implementation of the 'create_kafka_stream' tool, decorated with @mcp.tool(). Creates a Timeplus external stream from the specified Kafka topic and a materialized view for it.
    def create_kafka_stream(topic: str):
        logger.info(f"Creating Kafka externalstream for topic {topic}")
        conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG'])
        ext_stream=f"ext_stream_{topic}"
        sql=f"""CREATE EXTERNAL STREAM {ext_stream} (raw string)
        SETTINGS type='kafka',brokers='{conf['bootstrap.servers']}',topic='{topic}',security_protocol='{conf['security.protocol']}',sasl_mechanism='{conf['sasl.mechanism']}',username='{conf['sasl.username']}',password='{conf['sasl.password']}',skip_ssl_cert_check=true
        """
        run_sql(sql)
        logger.info("External Stream created")
        sql=f"CREATE MATERIALIZED VIEW {topic} AS SELECT raw from {ext_stream}"
        run_sql(sql)
        logger.info("MATERIALIZED VIEW created")
        return f"Materialized the Kafka data as {topic}"
  • Package __init__.py re-exports the create_kafka_stream tool function along with others, making it available at the package level.
    from .mcp_server import (
        create_timeplus_client,
        list_databases,
        list_tables,
        run_sql,
        list_kafka_topics,
        explore_kafka_topic,
        create_kafka_stream,
        generate_sql,
        connect_to_apache_iceberg,
    )

    __all__ = [
        "list_databases",
        "list_tables",
        "run_sql",
        "create_timeplus_client",
        "list_kafka_topics",
        "explore_kafka_topic",
        "create_kafka_stream",
        "generate_sql",
        "connect_to_apache_iceberg",
    ]
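The tool interpolates the topic name and the values from `TIMEPLUS_KAFKA_CONFIG` directly into two SQL statements. The sketch below isolates that string-building step as a pure function so the generated SQL can be inspected without a Timeplus connection. The function name `build_statements`, the identifier check, and the sample configuration values are assumptions made for illustration; they are not part of the server's code. The check is deliberately strict and would reject valid Kafka topic names that contain dots or hyphens.

```python
import re

# Assumption: accept only plain identifiers, since the topic is reused
# verbatim as the stream and view name. Not present in the original tool.
_SAFE_TOPIC = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def build_statements(topic: str, conf: dict) -> tuple:
    """Return (create_stream_sql, create_view_sql) for the given topic."""
    if not _SAFE_TOPIC.fullmatch(topic):
        raise ValueError(f"unsupported topic name: {topic!r}")
    ext_stream = f"ext_stream_{topic}"
    # Same settings, in the same order, as the tool's CREATE EXTERNAL STREAM.
    settings = ",".join([
        "type='kafka'",
        f"brokers='{conf['bootstrap.servers']}'",
        f"topic='{topic}'",
        f"security_protocol='{conf['security.protocol']}'",
        f"sasl_mechanism='{conf['sasl.mechanism']}'",
        f"username='{conf['sasl.username']}'",
        f"password='{conf['sasl.password']}'",
        "skip_ssl_cert_check=true",
    ])
    create_stream = (
        f"CREATE EXTERNAL STREAM {ext_stream} (raw string) SETTINGS {settings}"
    )
    create_view = f"CREATE MATERIALIZED VIEW {topic} AS SELECT raw from {ext_stream}"
    return create_stream, create_view


# Hypothetical configuration, shaped like the JSON the tool reads from
# the TIMEPLUS_KAFKA_CONFIG environment variable.
sample_conf = {
    "bootstrap.servers": "broker.example.com:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "demo-user",
    "sasl.password": "demo-password",
}
stream_sql, view_sql = build_statements("orders", sample_conf)
```

Keeping the SQL construction separate from `run_sql` makes it possible to review or log the statements (with the password redacted) before anything is executed.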

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/timeplus-io/mcp-timeplus'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.