create_kafka_stream
Create a streaming data pipeline from Apache Kafka topics into Timeplus for real-time analytics and processing.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| topic | Yes |
Implementation Reference
- mcp_timeplus/mcp_server.py:205-220 (handler)The main handler function for the 'create_kafka_stream' tool. It is decorated with @mcp.tool(), which registers it as an MCP tool. The function creates an external Kafka stream in Timeplus and a materialized view from it using configuration from environment variables.@mcp.tool() def create_kafka_stream(topic: str): logger.info(f"Creating Kafka externalstream for topic {topic}") conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG']) ext_stream=f"ext_stream_{topic}" sql=f"""CREATE EXTERNAL STREAM {ext_stream} (raw string) SETTINGS type='kafka',brokers='{conf['bootstrap.servers']}',topic='{topic}',security_protocol='{conf['security.protocol']}',sasl_mechanism='{conf['sasl.mechanism']}',username='{conf['sasl.username']}',password='{conf['sasl.password']}',skip_ssl_cert_check=true """ run_sql(sql) logger.info("External Stream created") sql=f"CREATE MATERIALIZED VIEW {topic} AS SELECT raw from {ext_stream}" run_sql(sql) logger.info("MATERIALIZED VIEW created") return f"Materialized the Kafka data as {topic}"