list_kafka_topics
List the Kafka topics visible to the mcp-timeplus server, returning each topic's name and partition count for use in Timeplus streaming data operations.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Input Schema (JSON Schema)
{
"properties": {},
"title": "list_kafka_topicsArguments",
"type": "object"
}
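Because the schema declares no properties, a call passes an empty arguments object. The following is a minimal sketch of the request an MCP client might send, assuming the standard MCP `tools/call` JSON-RPC method; the `id` value is arbitrary and chosen only for illustration.

```python
import json

# Build an MCP "tools/call" request for a tool that takes no arguments.
# The id is an arbitrary illustrative value, not something the server requires.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {"name": "list_kafka_topics", "arguments": {}},
}

# Serialize the request the way a client would before sending it.
payload = json.dumps(request)
print(payload)
```

How the payload is transported (stdio, HTTP, or otherwise) depends on the client and is not shown here.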
Implementation Reference
- mcp_timeplus/mcp_server.py:171-180 (handler) The core handler function for the 'list_kafka_topics' tool, decorated with @mcp.tool(). It lists all Kafka topics using the AdminClient from confluent_kafka, constructs a list of dictionaries with topic names and partition counts, and returns it.

      @mcp.tool()
      def list_kafka_topics():
          logger.info("Listing all topics in the Kafka cluster")
          admin_client = AdminClient(json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG']))
          topics = admin_client.list_topics(timeout=10).topics
          topics_array = []
          for topic, detail in topics.items():
              topic_info = {"topic": topic, "partitions": len(detail.partitions)}
              topics_array.append(topic_info)
          return topics_array
- mcp_timeplus/__init__.py:1-11 (registration) Registers the list_kafka_topics tool by importing it from mcp_server.py into the package __init__.py, making it available for MCP server initialization.

      from .mcp_server import (
          create_timeplus_client,
          list_databases,
          list_tables,
          run_sql,
          list_kafka_topics,
          explore_kafka_topic,
          create_kafka_stream,
          generate_sql,
          connect_to_apache_iceberg,
      )
- mcp_timeplus/__init__.py:13-23 (registration) Explicitly lists 'list_kafka_topics' in __all__, ensuring it is exported when the package is imported, facilitating tool registration in the MCP server.

      __all__ = [
          "list_databases",
          "list_tables",
          "run_sql",
          "create_timeplus_client",
          "list_kafka_topics",
          "explore_kafka_topic",
          "create_kafka_stream",
          "generate_sql",
          "connect_to_apache_iceberg",
      ]
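The handler reads its Kafka settings as a JSON string from `TIMEPLUS_KAFKA_CONFIG` and returns one dictionary per topic. The sketch below illustrates both steps without a live broker. The helper names `parse_kafka_config` and `summarize_topics`, the broker address, and the topic names are all hypothetical; the `SimpleNamespace` objects stand in for the topic metadata that confluent_kafka would return.

```python
import json
from types import SimpleNamespace


def parse_kafka_config(raw):
    """Parse a JSON string like the one the handler reads from TIMEPLUS_KAFKA_CONFIG."""
    config = json.loads(raw)
    if not isinstance(config, dict):
        raise ValueError("TIMEPLUS_KAFKA_CONFIG must be a JSON object")
    return config


def summarize_topics(topics):
    """Mirror the handler's loop: one dict per topic with its partition count."""
    return [
        {"topic": name, "partitions": len(detail.partitions)}
        for name, detail in topics.items()
    ]


# Hypothetical config value; the broker address is an assumption.
example_config = parse_kafka_config('{"bootstrap.servers": "localhost:9092"}')

# Stand-ins for topic metadata: each exposes a `partitions` mapping,
# which is the only attribute the handler's loop relies on.
fake_topics = {
    "orders": SimpleNamespace(partitions={0: None, 1: None, 2: None}),
    "clicks": SimpleNamespace(partitions={0: None}),
}

result = summarize_topics(fake_topics)
print(result)
```

With a real cluster, the mapping passed to `summarize_topics` would come from `admin_client.list_topics(timeout=10).topics`, as in the handler above.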