create_kafka_stream
Create a Timeplus external stream for an Apache Kafka topic, plus a materialized view over it, so the topic's data can be analyzed and processed in Timeplus through the MCP server integration.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| topic | Yes | | |
Input Schema (JSON Schema)
{
  "properties": {
    "topic": {
      "title": "Topic",
      "type": "string"
    }
  },
  "required": [
    "topic"
  ],
  "title": "create_kafka_streamArguments",
  "type": "object"
}
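As a rough illustration of how a client might call this tool, the sketch below builds an MCP `tools/call` JSON-RPC request whose `arguments` satisfy the schema above. The helper name `build_tool_call`, the request id, and the topic name `orders` are assumptions made for the example; none of them come from the Timeplus server.

```python
import json

# Required argument names, copied from the "required" list in the schema above.
REQUIRED_ARGS = ("topic",)


def build_tool_call(arguments: dict, request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 `tools/call` request for create_kafka_stream.

    Hypothetical helper for illustration; not part of mcp_timeplus.
    """
    for name in REQUIRED_ARGS:
        if name not in arguments:
            raise ValueError(f"missing required argument: {name}")
        if not isinstance(arguments[name], str):
            raise TypeError(f"argument {name!r} must be a string")
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "create_kafka_stream", "arguments": arguments},
    }


# "orders" is a placeholder topic name.
request = build_tool_call({"topic": "orders"})
print(json.dumps(request, indent=2))
```

In practice an MCP client library would normally assemble this request itself; the sketch only makes the argument shape concrete.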
Implementation Reference
- mcp_timeplus/mcp_server.py:206-220 (handler) The core implementation of the 'create_kafka_stream' tool, decorated with @mcp.tool(). Creates a Timeplus external stream from the specified Kafka topic and a materialized view for it.

      def create_kafka_stream(topic: str):
          logger.info(f"Creating Kafka externalstream for topic {topic}")
          conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG'])
          ext_stream=f"ext_stream_{topic}"
          sql=f"""CREATE EXTERNAL STREAM {ext_stream} (raw string)
          SETTINGS type='kafka',brokers='{conf['bootstrap.servers']}',topic='{topic}',security_protocol='{conf['security.protocol']}',sasl_mechanism='{conf['sasl.mechanism']}',username='{conf['sasl.username']}',password='{conf['sasl.password']}',skip_ssl_cert_check=true
          """
          run_sql(sql)
          logger.info("External Stream created")

          sql=f"CREATE MATERIALIZED VIEW {topic} AS SELECT raw from {ext_stream}"
          run_sql(sql)
          logger.info("MATERIALIZED VIEW created")

          return f"Materialized the Kafka data as {topic}"

- mcp_timeplus/__init__.py:1-23 (registration) Package __init__.py re-exports the create_kafka_stream tool function along with others, making it available at the package level.

      from .mcp_server import (
          create_timeplus_client,
          list_databases,
          list_tables,
          run_sql,
          list_kafka_topics,
          explore_kafka_topic,
          create_kafka_stream,
          generate_sql,
          connect_to_apache_iceberg,
      )

      __all__ = [
          "list_databases",
          "list_tables",
          "run_sql",
          "create_timeplus_client",
          "list_kafka_topics",
          "explore_kafka_topic",
          "create_kafka_stream",
          "generate_sql",
          "connect_to_apache_iceberg",
      ]
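
The handler listed above reads its Kafka connection settings from the `TIMEPLUS_KAFKA_CONFIG` environment variable and looks up five keys in it. The sketch below shows one plausible shape for that JSON value and reproduces the two SQL statements the handler would build, without connecting to Timeplus or Kafka. The helper `build_statements`, the broker address, the credentials, and the protocol and mechanism values are placeholders assumed for illustration; only the key names and the SQL text come from the handler.

```python
import json
import os

# Placeholder values; only the five key names are taken from the handler.
os.environ["TIMEPLUS_KAFKA_CONFIG"] = json.dumps({
    "bootstrap.servers": "broker.example.com:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "demo-user",
    "sasl.password": "demo-password",
})


def build_statements(topic: str) -> tuple:
    """Return (external stream SQL, materialized view SQL) for a topic.

    Hypothetical helper that mirrors the handler's string building.
    It only formats text and never executes SQL.
    """
    conf = json.loads(os.environ["TIMEPLUS_KAFKA_CONFIG"])
    ext_stream = f"ext_stream_{topic}"
    create_stream = (
        f"CREATE EXTERNAL STREAM {ext_stream} (raw string) "
        f"SETTINGS type='kafka',"
        f"brokers='{conf['bootstrap.servers']}',"
        f"topic='{topic}',"
        f"security_protocol='{conf['security.protocol']}',"
        f"sasl_mechanism='{conf['sasl.mechanism']}',"
        f"username='{conf['sasl.username']}',"
        f"password='{conf['sasl.password']}',"
        f"skip_ssl_cert_check=true"
    )
    create_view = (
        f"CREATE MATERIALIZED VIEW {topic} AS SELECT raw from {ext_stream}"
    )
    return create_stream, create_view


# "orders" is a placeholder topic name.
stream_sql, view_sql = build_statements("orders")
print(stream_sql)
print(view_sql)
```

Because the topic name is interpolated directly into both statements, the object names follow from it: a topic called `orders` yields an external stream named `ext_stream_orders` and a materialized view named `orders`.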