Skip to main content
Glama

explore_kafka_topic

Analyze Kafka topic data in Timeplus to understand message structure and content. Specify a topic and message count to retrieve and examine streaming data patterns.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
topicYes
message_countNo

Implementation Reference

  • The @mcp.tool()-decorated handler function that implements the logic to consume and return a specified number of messages from a Kafka topic.
    @mcp.tool() def explore_kafka_topic(topic: str, message_count: int = 1): logger.info(f"Consuming topic {topic}") conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG']) conf['group.id'] = f"mcp-{time.time()}" client = Consumer(conf) client.subscribe([topic]) messages = [] for i in range(message_count): logger.info(f"Consuming message {i+1}") message = client.poll() if message is None: logger.info("No message received") continue if message.error(): logger.error(f"Error consuming message: {message.error()}") continue else: logger.info(f"Received message {i+1}") messages.append(json.loads(message.value())) client.close() return messages
  • The tool is exported via __init__.py for package-level access and registration.
    __all__ = [ "list_databases", "list_tables", "run_sql", "create_timeplus_client", "list_kafka_topics", "explore_kafka_topic", "create_kafka_stream", "generate_sql", "connect_to_apache_iceberg", ]
  • The FastMCP instance is created here, and tool decorators register tools on it.
    mcp = FastMCP(MCP_SERVER_NAME, dependencies=deps)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/timeplus-io/mcp-timeplus'

If you have feedback or need assistance with the MCP directory API, please join our Discord server