explore_kafka_topic
Retrieve and inspect messages from a specified Apache Kafka topic through the Timeplus MCP integration. The number of messages to consume is configurable, making it suitable for quick data exploration and streaming analysis.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| message_count | No | Number of messages to consume from the topic (integer). | 1 |
| topic | Yes | Name of the Kafka topic to read from (string). | |
Input Schema (JSON Schema)
{
"properties": {
"message_count": {
"default": 1,
"title": "Message Count",
"type": "integer"
},
"topic": {
"title": "Topic",
"type": "string"
}
},
"required": [
"topic"
],
"title": "explore_kafka_topicArguments",
"type": "object"
}
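For illustration only, an arguments object that satisfies this schema (the topic name `web_events` is hypothetical):

```json
{
  "topic": "web_events",
  "message_count": 5
}
```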
Implementation Reference
- `mcp_timeplus/mcp_server.py:182-203` (handler): the main handler function, decorated with `@mcp.tool()`, that consumes a specified number of messages from the given Kafka topic using a `confluent_kafka` `Consumer` and returns them as a list of JSON-parsed dictionaries.

```python
@mcp.tool()
def explore_kafka_topic(topic: str, message_count: int = 1):
    logger.info(f"Consuming topic {topic}")
    conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG'])
    conf['group.id'] = f"mcp-{time.time()}"
    client = Consumer(conf)
    client.subscribe([topic])
    messages = []
    for i in range(message_count):
        logger.info(f"Consuming message {i+1}")
        message = client.poll()
        if message is None:
            logger.info("No message received")
            continue
        if message.error():
            logger.error(f"Error consuming message: {message.error()}")
            continue
        else:
            logger.info(f"Received message {i+1}")
            messages.append(json.loads(message.value()))
    client.close()
    return messages
```
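The handler expects librdkafka-style consumer settings supplied as a JSON string in the `TIMEPLUS_KAFKA_CONFIG` environment variable and overwrites `group.id` with a fresh value on every call. A minimal sketch of such a configuration, with a hypothetical broker address and credentials:

```json
{
  "bootstrap.servers": "broker-1.example.com:9092",
  "security.protocol": "SASL_SSL",
  "sasl.mechanisms": "PLAIN",
  "sasl.username": "demo-user",
  "sasl.password": "demo-password",
  "auto.offset.reset": "earliest"
}
```

Because each call creates a brand-new consumer group, there are no committed offsets to resume from, so `auto.offset.reset` determines where reading starts (librdkafka defaults to `latest` when it is not set).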