mcp-timeplus

by timeplus-io

create_kafka_stream

Create a Kafka stream by specifying a topic name to enable real-time data ingestion into Timeplus.

Input Schema

Name     Required    Description    Default
topic    Yes         -              -
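
For illustration, an MCP client would invoke this tool with a JSON-RPC `tools/call` request that carries the single required argument. The sketch below only builds such a payload; the topic name `orders` and the request id are placeholder values, not taken from the source.

```python
import json


def build_tool_call(topic: str, request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 request that calls the create_kafka_stream tool."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,  # placeholder id; any unique value works
        "method": "tools/call",
        "params": {
            "name": "create_kafka_stream",
            "arguments": {"topic": topic},  # the only required input
        },
    }


# "orders" is a hypothetical topic name used only for this example.
print(json.dumps(build_tool_call("orders"), indent=2))
```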

Implementation Reference

  • The main handler for the 'create_kafka_stream' MCP tool. Decorated with @mcp.tool(), it takes a topic name, creates an external Kafka stream via SQL, then creates a materialized view over that stream, and returns a confirmation message.
    @mcp.tool()
    def create_kafka_stream(topic: str):
        logger.info(f"Creating Kafka externalstream for topic {topic}")
        conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG'])
        ext_stream=f"ext_stream_{topic}"
        sql=f"""CREATE EXTERNAL STREAM {ext_stream} (raw string)
        SETTINGS type='kafka',brokers='{conf['bootstrap.servers']}',topic='{topic}',security_protocol='{conf['security.protocol']}',sasl_mechanism='{conf['sasl.mechanism']}',username='{conf['sasl.username']}',password='{conf['sasl.password']}',skip_ssl_cert_check=true
        """
        run_sql(sql)
        logger.info("External Stream created")
    
        sql=f"CREATE MATERIALIZED VIEW {topic} AS SELECT raw from {ext_stream}"
        run_sql(sql)
        logger.info("MATERIALIZED VIEW created")
    
        return f"Materialized the Kafka data as {topic}"
  • The MCP server instance (FastMCP) used to register the tool via @mcp.tool() decorator on the handler function.
    mcp = FastMCP(MCP_SERVER_NAME, dependencies=deps)
  • The function is exported in the package's __all__ list for public access.
    __all__ = [
        "list_databases",
        "list_tables",
        "run_sql",
        "create_timeplus_client",
        "list_kafka_topics",
        "explore_kafka_topic",
        "create_kafka_stream",
        "generate_sql",
        "connect_to_apache_iceberg",
    ]
  • The run_sql helper function called by create_kafka_stream to execute SQL queries against the Timeplus database.
    def run_sql(query: str):
        """Run a query in a Timeplus database"""
        logger.info(f"Executing query: {query}")
        try:
            future = QUERY_EXECUTOR.submit(execute_query, query)
            try:
                result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS)
                # Check if we received an error structure from execute_query
                if isinstance(result, dict) and "error" in result:
                    logger.warning(f"Query failed: {result['error']}")
                    # MCP requires structured responses; string error messages can cause
                    # serialization issues leading to BrokenResourceError
                    return {"status": "error", "message": f"Query failed: {result['error']}"}
                return result
            except concurrent.futures.TimeoutError:
                logger.warning(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}")
                future.cancel()
                # Return a properly structured response for timeout errors
                return {"status": "error", "message": f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds"}
        except Exception as e:
            logger.error(f"Unexpected error in run_select_query: {str(e)}")
            # Catch all other exceptions and return them in a structured format
            # to prevent MCP serialization failures
            return {"status": "error", "message": f"Unexpected error: {str(e)}"}
    
    
    @mcp.prompt()
    def generate_sql(requirements: str) -> str:
        return f"Please generate Timeplus SQL for the requirement:\n\n{requirements}\n\nMake sure following the guide {TEMPLATE}"
    
    @mcp.tool()
    def list_kafka_topics():
        logger.info("Listing all topics in the Kafka cluster")
        admin_client = AdminClient(json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG']))
        topics = admin_client.list_topics(timeout=10).topics
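
The handler above interpolates the topic name directly into two SQL statements, and it returns its success message without inspecting what `run_sql` returned (which may be an error dictionary rather than an exception). A minimal sketch of how those two points could be handled follows. The helper names `build_statements` and `is_error` and the allow-list pattern are hypothetical additions, not part of mcp-timeplus, and the pattern is deliberately stricter than what Kafka itself permits in topic names.

```python
import re

# Assumption: a conservative allow-list, since the topic is used as a SQL identifier.
_SAFE_TOPIC = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_statements(topic: str, conf: dict):
    """Return the (external stream, materialized view) SQL for one topic."""
    if not _SAFE_TOPIC.match(topic):
        raise ValueError(f"Unsupported topic name: {topic!r}")
    ext_stream = f"ext_stream_{topic}"
    create_stream = (
        f"CREATE EXTERNAL STREAM {ext_stream} (raw string) "
        f"SETTINGS type='kafka',"
        f"brokers='{conf['bootstrap.servers']}',"
        f"topic='{topic}',"
        f"security_protocol='{conf['security.protocol']}',"
        f"sasl_mechanism='{conf['sasl.mechanism']}',"
        f"username='{conf['sasl.username']}',"
        f"password='{conf['sasl.password']}',"
        f"skip_ssl_cert_check=true"
    )
    create_view = (
        f"CREATE MATERIALIZED VIEW {topic} AS SELECT raw FROM {ext_stream}"
    )
    return create_stream, create_view


def is_error(result) -> bool:
    """True when a run_sql result is the structured error dict shown above."""
    return isinstance(result, dict) and result.get("status") == "error"
```

With helpers like these, a handler could call `run_sql` on each statement in turn and return the error dictionary as soon as `is_error` reports a failure, instead of always reporting success.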
Behavior: 1/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

Tool has no description.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 1/5

Is the description appropriately sized, front-loaded, and free of redundancy?

Tool has no description.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 1/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Tool has no description.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 1/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Tool has no description.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 1/5

Does the description clearly state what the tool does and how it differs from similar tools?

Tool has no description.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 1/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

Tool has no description.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
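
The handler shown in the Implementation Reference has no docstring, which appears to be why every dimension reports "Tool has no description." As a hedged sketch, assuming FastMCP takes a tool's description from the function's docstring, a docstring along the following lines would cover purpose, side effects, and the parameter. The wording is illustrative and derived only from the handler's visible behavior; the body is stubbed so the snippet runs on its own.

```python
def create_kafka_stream(topic: str) -> str:
    """Create an external Kafka stream and a materialized view for one topic.

    Side effects: creates the external stream `ext_stream_<topic>` and a
    materialized view named `<topic>` in Timeplus. Connection settings are
    read from the TIMEPLUS_KAFKA_CONFIG environment variable.

    Args:
        topic: Name of the Kafka topic to ingest, for example one returned
            by list_kafka_topics.
    """
    # Stub body: the real handler runs two SQL statements through run_sql.
    return f"Materialized the Kafka data as {topic}"
```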

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/timeplus-io/mcp-timeplus'
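
The same request can be made from Python with only the standard library. This is a minimal sketch that assumes the endpoint returns a JSON body; the helper names `server_url` and `fetch_server` are hypothetical and not part of any Glama client.

```python
import json
import urllib.request

API_BASE = "https://glama.ai/api/mcp/v1/servers"


def server_url(owner: str, name: str) -> str:
    """Build the directory API URL for one MCP server."""
    return f"{API_BASE}/{owner}/{name}"


def fetch_server(owner: str, name: str, timeout: float = 10.0):
    """GET the server record and decode it, assuming a JSON response body."""
    with urllib.request.urlopen(server_url(owner, name), timeout=timeout) as resp:
        return json.load(resp)
```

Calling `fetch_server("timeplus-io", "mcp-timeplus")` issues the same GET request as the curl command above.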

If you have feedback or need assistance with the MCP directory API, please join our Discord server.