explore_kafka_topic
Retrieve messages from a specified Kafka topic to inspect streaming data. Optionally limit the number of messages returned.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| topic | Yes | ||
| message_count | No |
Implementation Reference
- mcp_timeplus/mcp_server.py:182-203 (handler)The main handler function for the explore_kafka_topic tool. It consumes messages from a Kafka topic using the confluent_kafka Consumer, returns parsed JSON messages.
@mcp.tool() def explore_kafka_topic(topic: str, message_count: int = 1): logger.info(f"Consuming topic {topic}") conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG']) conf['group.id'] = f"mcp-{time.time()}" client = Consumer(conf) client.subscribe([topic]) messages = [] for i in range(message_count): logger.info(f"Consuming message {i+1}") message = client.poll() if message is None: logger.info("No message received") continue if message.error(): logger.error(f"Error consuming message: {message.error()}") continue else: logger.info(f"Received message {i+1}") messages.append(json.loads(message.value())) client.close() return messages - mcp_timeplus/mcp_server.py:182-183 (registration)The tool is registered as an MCP tool via the @mcp.tool() decorator on FastMCP instance in mcp_server.py
@mcp.tool() def explore_kafka_topic(topic: str, message_count: int = 1): - mcp_timeplus/mcp_server.py:17-17 (helper)Import of the confluent_kafka Consumer class used by explore_kafka_topic
from confluent_kafka import Consumer - mcp_timeplus/mcp_server.py:183-183 (schema)Input parameters: topic (str, required) and message_count (int, default 1) define the schema for this tool
def explore_kafka_topic(topic: str, message_count: int = 1):