explore_kafka_topic
Analyze Kafka topic data in Timeplus to understand message structure and content. Specify a topic and message count to retrieve and examine streaming data patterns.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| topic | Yes | ||
| message_count | No |
Implementation Reference
- mcp_timeplus/mcp_server.py:182-203 (handler)The @mcp.tool()-decorated handler function that implements the logic to consume and return a specified number of messages from a Kafka topic.@mcp.tool() def explore_kafka_topic(topic: str, message_count: int = 1): logger.info(f"Consuming topic {topic}") conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG']) conf['group.id'] = f"mcp-{time.time()}" client = Consumer(conf) client.subscribe([topic]) messages = [] for i in range(message_count): logger.info(f"Consuming message {i+1}") message = client.poll() if message is None: logger.info("No message received") continue if message.error(): logger.error(f"Error consuming message: {message.error()}") continue else: logger.info(f"Received message {i+1}") messages.append(json.loads(message.value())) client.close() return messages
- mcp_timeplus/__init__.py:13-23 (registration)The tool is exported via __init__.py for package-level access and registration.__all__ = [ "list_databases", "list_tables", "run_sql", "create_timeplus_client", "list_kafka_topics", "explore_kafka_topic", "create_kafka_stream", "generate_sql", "connect_to_apache_iceberg", ]
- mcp_timeplus/mcp_server.py:39-39 (registration)The FastMCP instance is created here, and tool decorators register tools on it.mcp = FastMCP(MCP_SERVER_NAME, dependencies=deps)