Skip to main content
Glama
timeplus-io

mcp-timeplus

by timeplus-io

explore_kafka_topic

Analyze Kafka topic data in Timeplus to understand message structure and content. Specify a topic and message count to retrieve and examine streaming data patterns.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
topicYes
message_countNo

Implementation Reference

  • The @mcp.tool()-decorated handler function that implements the logic to consume and return a specified number of messages from a Kafka topic.
    @mcp.tool()
    def explore_kafka_topic(topic: str, message_count: int = 1):
        logger.info(f"Consuming topic {topic}")
        conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG'])
        conf['group.id'] = f"mcp-{time.time()}"
        client = Consumer(conf)
        client.subscribe([topic])
        messages = []
        for i in range(message_count):
            logger.info(f"Consuming message {i+1}")
            message = client.poll()
            if message is None:
                logger.info("No message received")
                continue
            if message.error():
                logger.error(f"Error consuming message: {message.error()}")
                continue
            else:
                logger.info(f"Received message {i+1}")
                messages.append(json.loads(message.value()))
        client.close()
        return messages
  • The tool is exported via __init__.py for package-level access and registration.
    __all__ = [
        "list_databases",
        "list_tables",
        "run_sql",
        "create_timeplus_client",
        "list_kafka_topics",
        "explore_kafka_topic",
        "create_kafka_stream",
        "generate_sql",
        "connect_to_apache_iceberg",
    ]
  • The FastMCP instance is created here, and tool decorators register tools on it.
    mcp = FastMCP(MCP_SERVER_NAME, dependencies=deps)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/timeplus-io/mcp-timeplus'

If you have feedback or need assistance with the MCP directory API, please join our Discord server