list_kafka_topics
List Kafka topics available through the Timeplus integration for streaming data.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Implementation Reference
- mcp_timeplus/mcp_server.py:171-180 (handler) The handler function for the 'list_kafka_topics' tool. Uses confluent_kafka AdminClient to list all topics in the Kafka cluster and returns topic names with partition counts.

      @mcp.tool()
      def list_kafka_topics():
          logger.info("Listing all topics in the Kafka cluster")
          admin_client = AdminClient(json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG']))
          topics = admin_client.list_topics(timeout=10).topics
          topics_array = []
          for topic, detail in topics.items():
              topic_info = {"topic": topic, "partitions": len(detail.partitions)}
              topics_array.append(topic_info)
          return topics_array

- mcp_timeplus/mcp_server.py:171-171 (registration) The tool is registered as an MCP tool via the @mcp.tool() decorator on line 171.

      @mcp.tool()

- mcp_timeplus/mcp_server.py:171-171 (schema) The function has no parameters - this tool takes no input arguments. Its return type is a list of dicts with 'topic' and 'partitions' keys.

      @mcp.tool()

- mcp_timeplus/mcp_env.py:1-13 (helper) The TIMEPLUS_KAFKA_CONFIG environment variable (used by the handler) is not defined in mcp_env.py but is expected to be set externally. This config class handles other Timeplus env vars.

      """Environment configuration for the MCP Timeplus server.

      This module handles all environment variable configuration with sensible defaults and type conversion.
      """

      from dataclasses import dataclass
      import os
      from typing import Optional


      @dataclass
      class TimeplusConfig:

- mcp_timeplus/__init__.py:1-7 (helper) The function is re-exported from the package's __init__.py, making it part of the public API.

      from .mcp_server import (
          create_timeplus_client,
          list_databases,
          list_tables,
          run_sql,
          list_kafka_topics,
          explore_kafka_topic,
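Because the handler reads TIMEPLUS_KAFKA_CONFIG as a JSON string and returns a list of dicts with 'topic' and 'partitions' keys, a small sketch may help show both ends of that flow. The broker address, the helper names `load_kafka_config` and `summarize_topics`, and the stand-in topic metadata below are all assumptions made for illustration; they are not part of the mcp_timeplus package, and the stand-in replaces the real `AdminClient.list_topics(timeout=10).topics` call so the example runs without a Kafka cluster.

```python
import json
import os
from types import SimpleNamespace

# Hypothetical example value: the real broker address and any security
# settings depend on your cluster. setdefault keeps an existing value intact.
os.environ.setdefault(
    "TIMEPLUS_KAFKA_CONFIG",
    json.dumps({"bootstrap.servers": "localhost:9092"}),
)


def load_kafka_config() -> dict:
    """Parse TIMEPLUS_KAFKA_CONFIG with json.loads, as the handler does."""
    return json.loads(os.environ["TIMEPLUS_KAFKA_CONFIG"])


def summarize_topics(topics: dict) -> list:
    """Mirror the handler's loop: one dict per topic with its partition count."""
    return [
        {"topic": name, "partitions": len(detail.partitions)}
        for name, detail in topics.items()
    ]


# Stand-in for the topic metadata mapping (assumption: each value exposes a
# `partitions` mapping, which is all the handler's loop relies on).
fake_topics = {
    "orders": SimpleNamespace(partitions={0: None, 1: None, 2: None}),
    "clicks": SimpleNamespace(partitions={0: None}),
}

config = load_kafka_config()
result = summarize_topics(fake_topics)
print(result)
# [{'topic': 'orders', 'partitions': 3}, {'topic': 'clicks', 'partitions': 1}]
```

If the environment variable is missing, `os.environ[...]` raises `KeyError`, and malformed JSON raises `json.JSONDecodeError`; the handler shown above has the same behavior, so setting the variable before starting the server is worth checking first.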