explore_kafka_topic
Analyze and retrieve messages from a specified Apache Kafka topic using Timeplus integration. Supports custom message count for efficient data exploration and streaming analysis.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| message_count | No | ||
| topic | Yes |
Implementation Reference
- mcp_timeplus/mcp_server.py:182-203 (handler)The main handler function decorated with @mcp.tool() that consumes a specified number of messages from the given Kafka topic using confluent_kafka Consumer and returns them as a list of JSON-parsed dictionaries.@mcp.tool() def explore_kafka_topic(topic: str, message_count: int = 1): logger.info(f"Consuming topic {topic}") conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG']) conf['group.id'] = f"mcp-{time.time()}" client = Consumer(conf) client.subscribe([topic]) messages = [] for i in range(message_count): logger.info(f"Consuming message {i+1}") message = client.poll() if message is None: logger.info("No message received") continue if message.error(): logger.error(f"Error consuming message: {message.error()}") continue else: logger.info(f"Received message {i+1}") messages.append(json.loads(message.value())) client.close() return messages