Skip to main content
Glama
stereosky

Lenses MCP Server

by stereosky

create_sql_processor

Create SQL processors to query and transform Kafka data streams across clusters using Lenses.io, with configurable deployment options.

Instructions

Creates a new SQL processor.

Args: environment: The environment name. name: The name of the SQL processor. sql: The SQL query/statement for the processor. deployment: Deployment configuration including details like mode, runners, cluster, namespace, etc. If there are no available deployment targets (Kubernetes or Connect clusters), use 'in process' mode: {{mode: "IN_PROC"}} sql_processor_id: Optional processor ID. If not provided, will be auto-generated. description: Optional description of the processor. tags: Optional list of tags for the processor.

Returns: The created SQL processor object with its ID.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
environmentYes
nameYes
sqlYes
deploymentNo
sql_processor_idNo
descriptionNo
tagsNo

Implementation Reference

  • The core handler function for the 'create_sql_processor' MCP tool. It constructs a payload from the input parameters and makes a POST request to the Lenses API to create a new SQL processor.
    @mcp.tool()
    async def create_sql_processor(
        environment: str,
        name: str,
        sql: str,
        deployment: Dict[str, Any] = None,
        sql_processor_id: Optional[str] = None,
        description: Optional[str] = None,
        tags: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Creates a new SQL processor.
        
        Args:
            environment: The environment name.
            name: The name of the SQL processor.
            sql: The SQL query/statement for the processor.
            deployment: Deployment configuration including details like mode, runners, cluster, namespace, etc.
                If there are no available deployment targets (Kubernetes or Connect clusters), use 'in process' mode: {{mode: "IN_PROC"}}
            sql_processor_id: Optional processor ID. If not provided, will be auto-generated.
            description: Optional description of the processor.
            tags: Optional list of tags for the processor.
        
        Returns:
            The created SQL processor object with its ID.
        """
        payload = {
            "name": name,
            "sql": sql
        }
        
        if sql_processor_id:
            payload["processorId"] = sql_processor_id
        if description:
            payload["description"] = description
        if deployment:
            payload["deployment"] = deployment
        if tags:
            payload["tags"] = tags
        
        endpoint = f"/api/v1/environments/{environment}/proxy/api/v2/streams"
    
        try:
            return await api_client._make_request("POST", endpoint, payload)
        except Exception as e:
            raise ToolError(f"SQL processor creation failed: {e}")
  • Top-level call to register all SQL processor tools, including 'create_sql_processor', on the MCP server instance.
    register_sql_processors(mcp)
  • Function that defines and registers the SQL processor tools using @mcp.tool() decorators, including the 'create_sql_processor' tool.
    def register_sql_processors(mcp: FastMCP):
  • Helper prompt generator for guiding the creation of SQL processors using the create_sql_processor tool.
    @mcp.prompt()
    def generate_create_sql_processor_prompt(name: str, sql: str, environment: str) -> str:
        """Create a SQL processor with the specified name and SQL query"""
        return f"""
            Please create a SQL processor named '{name}' in the '{environment}' environment
            with the following SQL query:
            
            {sql}
            
            The processor should be configured with appropriate deployment settings.
            Here is an example 'deployment' for Community Edition, which uses a local 'in process' mode: {{mode: "IN_PROC"}}
            It should be used when there are no available deployment targets (Kubernetes or Connect clusters) in the environment.
            Here is an example 'deployment' for Kubenetes: {{mode: "KUBERNETES", details: {{runners: 1, cluster: "incluster", namespace: "ai-agent"}}}}
            The settings can be determined for 'cluster' and 'namespace' with the get_deployment_targets tool call.
            """

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stereosky/lenses-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server