Skip to main content
Glama
stereosky

Lenses MCP Server

by stereosky

list_kafka_connectors

Retrieve and filter Kafka connectors by environment, cluster, or class name to monitor and manage data integration pipelines.

Instructions

Retrieves a list of all Kafka connectors.

Args: environment: The environment name. cluster: Optional list of cluster names to filter by. class_name: Optional list of connector class names to filter by.

Returns: A dictionary containing a list of all connectors with their details.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
environmentYes
clusterNo
class_nameNo

Implementation Reference

  • The async handler function for the list_kafka_connectors MCP tool. It accepts environment and optional filters for cluster and class_name, builds a query string for the API endpoint, and performs a GET request via api_client to retrieve the list of Kafka connectors.
    @mcp.tool()
    async def list_kafka_connectors(
        environment: str,
        cluster: Optional[List[str]] = None,
        class_name: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Retrieves a list of all Kafka connectors.
        
        Args:
            environment: The environment name.
            cluster: Optional list of cluster names to filter by.
            class_name: Optional list of connector class names to filter by.
        
        Returns:
            A dictionary containing a list of all connectors with their details.
        """
        params = {}
        if cluster:
            params["cluster"] = cluster
        if class_name:
            params["className"] = class_name
        
        # Build query string
        query_params = []
        for key, value in params.items():
            if isinstance(value, list):
                for item in value:
                    query_params.append(f"{key}={item}")
            else:
                query_params.append(f"{key}={value}")
        
        query_string = "&".join(query_params) if query_params else ""
        endpoint = f"/api/v1/environments/{environment}/proxy/api/kafka-connect/connectors"
        if query_string:
            endpoint += f"?{query_string}"
        
        return await api_client._make_request("GET", endpoint)
  • Invocation of register_kafka_connectors(mcp) in the main MCP server setup, which defines and registers the list_kafka_connectors tool using @mcp.tool() decorator.
    register_kafka_connectors(mcp)
  • Import of the register_kafka_connectors function used to register the Kafka connectors tools, including list_kafka_connectors.
    from tools.kafka_connectors import register_kafka_connectors

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stereosky/lenses-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server