create_kafka_stream
Create a Kafka stream by specifying a topic name to enable real-time data ingestion into Timeplus.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| topic | Yes | Name of the Kafka topic to ingest; used to name the external stream (`ext_stream_<topic>`) and the materialized view. | — |
Implementation Reference
- mcp_timeplus/mcp_server.py:205-220 (handler)The main handler for the 'create_kafka_stream' MCP tool. Decorated with @mcp.tool(), it takes a topic name, creates an external Kafka stream via SQL, then creates a materialized view over that stream, and returns a confirmation message.
@mcp.tool() def create_kafka_stream(topic: str): logger.info(f"Creating Kafka externalstream for topic {topic}") conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG']) ext_stream=f"ext_stream_{topic}" sql=f"""CREATE EXTERNAL STREAM {ext_stream} (raw string) SETTINGS type='kafka',brokers='{conf['bootstrap.servers']}',topic='{topic}',security_protocol='{conf['security.protocol']}',sasl_mechanism='{conf['sasl.mechanism']}',username='{conf['sasl.username']}',password='{conf['sasl.password']}',skip_ssl_cert_check=true """ run_sql(sql) logger.info("External Stream created") sql=f"CREATE MATERIALIZED VIEW {topic} AS SELECT raw from {ext_stream}" run_sql(sql) logger.info("MATERIALIZED VIEW created") return f"Materialized the Kafka data as {topic}" - mcp_timeplus/mcp_server.py:39-39 (registration)The MCP server instance (FastMCP) used to register the tool via @mcp.tool() decorator on the handler function.
# Module-level FastMCP server instance; tool handlers attach to it via the
# @mcp.tool() decorator (see create_kafka_stream above).
mcp = FastMCP(MCP_SERVER_NAME, dependencies=deps) - mcp_timeplus/__init__.py:13-23 (registration)The function is exported in the package's __all__ list for public access.
# Public API of the mcp_timeplus package; create_kafka_stream is exported here.
__all__ = [ "list_databases", "list_tables", "run_sql", "create_timeplus_client", "list_kafka_topics", "explore_kafka_topic", "create_kafka_stream", "generate_sql", "connect_to_apache_iceberg", ] - mcp_timeplus/mcp_server.py:141-175 (helper)The run_sql helper function called by create_kafka_stream to execute SQL queries against the Timeplus database.
def run_sql(query: str):
    """Run a query in a Timeplus database.

    Submits *query* to the shared QUERY_EXECUTOR and waits up to
    SELECT_QUERY_TIMEOUT_SECS for the result. Errors, timeouts, and
    unexpected exceptions are all returned as structured dicts
    ({"status": "error", "message": ...}) rather than raised, because MCP
    requires structured responses — string error payloads can cause
    serialization failures (BrokenResourceError).

    Returns:
        The query result from execute_query, or an error/timeout dict.
    """
    # NOTE(review): the query text is logged verbatim; DDL built by
    # create_kafka_stream embeds SASL credentials, so these reach the logs.
    logger.info(f"Executing query: {query}")
    try:
        future = QUERY_EXECUTOR.submit(execute_query, query)
        try:
            result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS)
            # Check if we received an error structure from execute_query
            if isinstance(result, dict) and "error" in result:
                logger.warning(f"Query failed: {result['error']}")
                # MCP requires structured responses; string error messages can
                # cause serialization issues leading to BrokenResourceError
                return {"status": "error", "message": f"Query failed: {result['error']}"}
            return result
        except concurrent.futures.TimeoutError:
            logger.warning(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}")
            future.cancel()
            # Return a properly structured response for timeout errors
            return {"status": "error", "message": f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds"}
    except Exception as e:
        # BUG FIX: this log line previously named "run_select_query" — a
        # copy-paste leftover; this function is run_sql.
        logger.error(f"Unexpected error in run_sql: {str(e)}")
        # Catch all other exceptions and return them in a structured format
        # to prevent MCP serialization failures
        return {"status": "error", "message": f"Unexpected error: {str(e)}"}


@mcp.prompt()
def generate_sql(requirements: str) -> str:
    """MCP prompt: ask the model to generate Timeplus SQL for *requirements*,
    following the guide in TEMPLATE."""
    return f"Please generate Timeplus SQL for the requirement:\n\n{requirements}\n\nMake sure following the guide {TEMPLATE}"


@mcp.tool()
def list_kafka_topics():
    # NOTE(review): this definition continues past the end of this excerpt;
    # the visible body is reproduced verbatim up to the cut.
    logger.info("Listing all topics in the Kafka cluster")
    admin_client = AdminClient(json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG']))
    topics = admin_client.list_topics(timeout=10).topics