list_kafka_connectors
Retrieve and filter Kafka connectors by environment, cluster, or class name to monitor and manage data integration pipelines.
Instructions
Retrieves a list of all Kafka connectors.
Args: environment: The environment name. cluster: Optional list of cluster names to filter by. class_name: Optional list of connector class names to filter by.
Returns: A dictionary containing a list of all connectors with their details.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| environment | Yes | ||
| cluster | No | ||
| class_name | No |
Implementation Reference
- The async handler function for the list_kafka_connectors MCP tool. It accepts environment and optional filters for cluster and class_name, builds a query string for the API endpoint, and performs a GET request via api_client to retrieve the list of Kafka connectors.@mcp.tool() async def list_kafka_connectors( environment: str, cluster: Optional[List[str]] = None, class_name: Optional[List[str]] = None ) -> Dict[str, Any]: """ Retrieves a list of all Kafka connectors. Args: environment: The environment name. cluster: Optional list of cluster names to filter by. class_name: Optional list of connector class names to filter by. Returns: A dictionary containing a list of all connectors with their details. """ params = {} if cluster: params["cluster"] = cluster if class_name: params["className"] = class_name # Build query string query_params = [] for key, value in params.items(): if isinstance(value, list): for item in value: query_params.append(f"{key}={item}") else: query_params.append(f"{key}={value}") query_string = "&".join(query_params) if query_params else "" endpoint = f"/api/v1/environments/{environment}/proxy/api/kafka-connect/connectors" if query_string: endpoint += f"?{query_string}" return await api_client._make_request("GET", endpoint)
- src/lenses_mcp/server.py:29-29 (registration)Invocation of register_kafka_connectors(mcp) in the main MCP server setup, which defines and registers the list_kafka_connectors tool using @mcp.tool() decorator.register_kafka_connectors(mcp)
- src/lenses_mcp/server.py:13-13 (registration)Import of the register_kafka_connectors function used to register the Kafka connectors tools, including list_kafka_connectors.from tools.kafka_connectors import register_kafka_connectors