Glama

explore_kafka_topic

Retrieves messages from a specified Apache Kafka topic through the Timeplus integration. The optional message_count argument sets how many messages to consume (default 1), which keeps exploratory reads of a stream small.

Input Schema

Name           Required  Description  Default
message_count  No        -            1
topic          Yes       -            -

Input Schema (JSON Schema)

{
  "properties": {
    "message_count": {
      "default": 1,
      "title": "Message Count",
      "type": "integer"
    },
    "topic": {
      "title": "Topic",
      "type": "string"
    }
  },
  "required": [
    "topic"
  ],
  "title": "explore_kafka_topicArguments",
  "type": "object"
}
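A client calling this tool has to send a topic and may omit message_count. The sketch below shows one way to check arguments against the schema and wrap them in an MCP tools/call request. The helper names normalize_arguments and build_tool_call and the topic name "orders" are hypothetical, introduced only for illustration; this is not a full JSON Schema validator.

```python
# Schema copied from the listing above.
SCHEMA = {
    "properties": {
        "message_count": {"default": 1, "title": "Message Count", "type": "integer"},
        "topic": {"title": "Topic", "type": "string"},
    },
    "required": ["topic"],
    "title": "explore_kafka_topicArguments",
    "type": "object",
}

# Only the two JSON Schema types used by this tool are mapped.
_PY_TYPES = {"integer": int, "string": str}


def normalize_arguments(args: dict) -> dict:
    """Check required fields and types, then fill in schema defaults."""
    for name in SCHEMA["required"]:
        if name not in args:
            raise ValueError(f"missing required argument: {name}")
    result = dict(args)
    for name, spec in SCHEMA["properties"].items():
        if name in result:
            value = result[name]
            expected = _PY_TYPES[spec["type"]]
            # bool is a subclass of int in Python, so reject it explicitly.
            if isinstance(value, bool) or not isinstance(value, expected):
                raise TypeError(f"{name} must be of type {spec['type']}")
        elif "default" in spec:
            result[name] = spec["default"]
    return result


def build_tool_call(args: dict, request_id: int = 1) -> dict:
    """Wrap validated arguments in a JSON-RPC 2.0 tools/call request body."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "explore_kafka_topic",
            "arguments": normalize_arguments(args),
        },
    }


# "orders" is a made-up topic name.
request_body = build_tool_call({"topic": "orders"})
```

Because message_count is filled from the schema default, the resulting arguments contain both fields even when the caller passes only the topic.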

Implementation Reference

  • The main handler function decorated with @mcp.tool() that consumes a specified number of messages from the given Kafka topic using confluent_kafka Consumer and returns them as a list of JSON-parsed dictionaries.
    @mcp.tool()
    def explore_kafka_topic(topic: str, message_count: int = 1):
        logger.info(f"Consuming topic {topic}")
        conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG'])
        conf['group.id'] = f"mcp-{time.time()}"
        client = Consumer(conf)
        client.subscribe([topic])
        messages = []
        for i in range(message_count):
            logger.info(f"Consuming message {i+1}")
            message = client.poll()
            if message is None:
                logger.info("No message received")
                continue
            if message.error():
                logger.error(f"Error consuming message: {message.error()}")
                continue
            else:
                logger.info(f"Received message {i+1}")
                messages.append(json.loads(message.value()))
        client.close()
        return messages
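Two properties of the handler above are worth noting when adapting it. It calls poll() without a timeout, which in confluent_kafka waits until a message or event arrives, and every None or error result uses up one loop iteration, so the call can return fewer than message_count messages. The following is a minimal sketch of a bounded variant, not the server's actual code: collect_messages, poll_timeout, max_wait, and the "_raw" fallback key are assumptions made for illustration. FakeMessage and FakeConsumer are stand-ins that mimic only the poll(), error(), and value() calls used here, so the sketch runs without a broker.

```python
import json
import time


def collect_messages(consumer, message_count, poll_timeout=1.0, max_wait=10.0):
    """Collect up to message_count messages, giving up after max_wait seconds."""
    messages = []
    deadline = time.monotonic() + max_wait
    while len(messages) < message_count:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # overall time budget is spent
        msg = consumer.poll(min(poll_timeout, remaining))
        if msg is None or msg.error():
            continue  # nothing usable yet; does not count against message_count
        payload = msg.value()
        if payload is None:
            continue  # message without a value
        try:
            messages.append(json.loads(payload))
        except ValueError:
            # Not valid JSON: keep the raw text instead of failing the whole call.
            messages.append({"_raw": payload.decode("utf-8", errors="replace")})
    return messages


class FakeMessage:
    """Stand-in exposing the two message methods used above."""

    def __init__(self, value, error=None):
        self._value = value
        self._error = error

    def value(self):
        return self._value

    def error(self):
        return self._error


class FakeConsumer:
    """Stand-in consumer that replays a fixed list of poll() results."""

    def __init__(self, results):
        self._results = list(results)

    def poll(self, timeout):
        if self._results:
            return self._results.pop(0)
        time.sleep(timeout)  # emulate waiting on an empty topic
        return None


demo = FakeConsumer([
    FakeMessage(b'{"a": 1}'),
    None,                                # an empty poll
    FakeMessage(b"", error="boom"),      # an error event
    FakeMessage(b'{"b": 2}'),
])
result = collect_messages(demo, 2, poll_timeout=0.01, max_wait=1.0)
```

With a real confluent_kafka Consumer in place of FakeConsumer, the same function would be called between subscribe() and close(), and a try/finally around it would ensure close() still runs if decoding or polling raises.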

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/timeplus-io/mcp-timeplus'
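The same request can be issued from Python with the standard library. This is a minimal sketch: build_server_request and fetch_server are hypothetical helper names, and it assumes the endpoint returns a JSON body, which the page does not state.

```python
import json
import urllib.request

API_BASE = "https://glama.ai/api/mcp/v1/servers"


def build_server_request(owner: str, name: str) -> urllib.request.Request:
    """Build the GET request for one server entry without sending it."""
    url = f"{API_BASE}/{owner}/{name}"
    return urllib.request.Request(
        url, method="GET", headers={"Accept": "application/json"}
    )


def fetch_server(owner: str, name: str, timeout: float = 10.0):
    """Send the request and decode the body, assuming it is JSON."""
    request = build_server_request(owner, name)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


# Same target as the curl command above; nothing is sent until fetch_server is called.
request = build_server_request("timeplus-io", "mcp-timeplus")
```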

If you have feedback or need assistance with the MCP directory API, please join our Discord server.