create_sql_processor
Create SQL processors to query and transform Kafka data streams across clusters using Lenses.io, with configurable deployment options.
Instructions
Creates a new SQL processor.
Args:
- environment: The environment name.
- name: The name of the SQL processor.
- sql: The SQL query/statement for the processor.
- deployment: Deployment configuration, including details such as mode, runners, cluster and namespace. If there are no available deployment targets (Kubernetes or Connect clusters), use 'in process' mode: {mode: "IN_PROC"}
- sql_processor_id: Optional processor ID. If not provided, it is auto-generated.
- description: Optional description of the processor.
- tags: Optional list of tags for the processor.
Returns: The created SQL processor object with its ID.
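As a rough illustration of the fallback rule described for `deployment`, the sketch below picks a deployment dictionary from a list of targets. The helper name `choose_deployment` and the shape of each target (a dict with `cluster` and `namespace` keys) are assumptions made for this example; they are not part of the tool or the documented output of `get_deployment_targets`.

```python
from typing import Any, Dict, List


def choose_deployment(targets: List[Dict[str, Any]], runners: int = 1) -> Dict[str, Any]:
    """Hypothetical helper: build a 'deployment' argument from known targets.

    The target shape ({"cluster": ..., "namespace": ...}) is assumed for
    illustration only.
    """
    if not targets:
        # No Kubernetes or Connect clusters available: use in-process mode.
        return {"mode": "IN_PROC"}
    first = targets[0]
    return {
        "mode": "KUBERNETES",
        "details": {
            "runners": runners,
            "cluster": first["cluster"],
            "namespace": first["namespace"],
        },
    }


# No targets: falls back to in-process mode.
print(choose_deployment([]))
# One assumed Kubernetes target: produces a KUBERNETES deployment.
print(choose_deployment([{"cluster": "incluster", "namespace": "ai-agent"}]))
```

The resulting dictionary would be passed as the `deployment` argument; omitting `deployment` entirely is also allowed, since the parameter is optional.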
Input Schema
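| Name | Required | Description | Default |
|---|---|---|---|
| environment | Yes | The environment name. | |
| name | Yes | The name of the SQL processor. | |
| sql | Yes | The SQL query/statement for the processor. | |
| deployment | No | Deployment configuration (mode, runners, cluster, namespace, etc.). | None |
| sql_processor_id | No | Processor ID; auto-generated if not provided. | None |
| description | No | Description of the processor. | None |
| tags | No | List of tags for the processor. | None |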
Implementation Reference
- The core handler function for the 'create_sql_processor' MCP tool. It constructs a payload from the input parameters and makes a POST request to the Lenses API to create a new SQL processor.

  ```python
  from typing import Any, Dict, List, Optional

  @mcp.tool()
  async def create_sql_processor(
      environment: str,
      name: str,
      sql: str,
      deployment: Optional[Dict[str, Any]] = None,
      sql_processor_id: Optional[str] = None,
      description: Optional[str] = None,
      tags: Optional[List[str]] = None
  ) -> Dict[str, Any]:
      """
      Creates a new SQL processor.

      Args:
          environment: The environment name.
          name: The name of the SQL processor.
          sql: The SQL query/statement for the processor.
          deployment: Deployment configuration including details like mode, runners, cluster, namespace, etc.
              If there are no available deployment targets (Kubernetes or Connect clusters),
              use 'in process' mode: {mode: "IN_PROC"}
          sql_processor_id: Optional processor ID. If not provided, will be auto-generated.
          description: Optional description of the processor.
          tags: Optional list of tags for the processor.

      Returns:
          The created SQL processor object with its ID.
      """
      # Required fields are always sent.
      payload = {
          "name": name,
          "sql": sql
      }

      # Optional fields are included only when they are set (truthy).
      if sql_processor_id:
          payload["processorId"] = sql_processor_id
      if description:
          payload["description"] = description
      if deployment:
          payload["deployment"] = deployment
      if tags:
          payload["tags"] = tags

      endpoint = f"/api/v1/environments/{environment}/proxy/api/v2/streams"

      try:
          return await api_client._make_request("POST", endpoint, payload)
      except Exception as e:
          # Chain the original exception so the root cause stays visible.
          raise ToolError(f"SQL processor creation failed: {e}") from e
  ```
- src/lenses_mcp/server.py:32-32 (registration): Top-level call to register all SQL processor tools, including 'create_sql_processor', on the MCP server instance: `register_sql_processors(mcp)`
- src/lenses_mcp/tools/sql_processors.py:10-10 (registration): Function that defines and registers the SQL processor tools using @mcp.tool() decorators, including the 'create_sql_processor' tool: `def register_sql_processors(mcp: FastMCP):`
- Helper prompt generator for guiding the creation of SQL processors using the create_sql_processor tool.

  ```python
  @mcp.prompt()
  def generate_create_sql_processor_prompt(name: str, sql: str, environment: str) -> str:
      """Create a SQL processor with the specified name and SQL query"""
      # Doubled braces are f-string escapes; they render as single braces in the prompt.
      return f"""
      Please create a SQL processor named '{name}' in the '{environment}' environment with the following SQL query:

      {sql}

      The processor should be configured with appropriate deployment settings.

      Here is an example 'deployment' for Community Edition, which uses a local 'in process' mode:
      {{mode: "IN_PROC"}}
      It should be used when there are no available deployment targets (Kubernetes or Connect clusters) in the environment.

      Here is an example 'deployment' for Kubernetes:
      {{mode: "KUBERNETES", details: {{runners: 1, cluster: "incluster", namespace: "ai-agent"}}}}
      The settings can be determined for 'cluster' and 'namespace' with the get_deployment_targets tool call.
      """
  ```
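To make the handler's request-building step easier to inspect without a running Lenses instance, the following sketch reproduces it as a pure function. The name `build_request` is hypothetical and is not part of the lenses_mcp package; the field names and endpoint path are taken from the handler listed above, and the SQL string is only a placeholder.

```python
from typing import Any, Dict, List, Optional, Tuple


def build_request(
    environment: str,
    name: str,
    sql: str,
    deployment: Optional[Dict[str, Any]] = None,
    sql_processor_id: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[List[str]] = None,
) -> Tuple[str, Dict[str, Any]]:
    """Hypothetical helper mirroring the handler's payload and endpoint logic."""
    payload: Dict[str, Any] = {"name": name, "sql": sql}
    optional_fields = {
        "processorId": sql_processor_id,
        "description": description,
        "deployment": deployment,
        "tags": tags,
    }
    # Like the handler, skip any optional field that is unset or empty.
    payload.update({key: value for key, value in optional_fields.items() if value})
    endpoint = f"/api/v1/environments/{environment}/proxy/api/v2/streams"
    return endpoint, payload


endpoint, payload = build_request(
    environment="dev",
    name="orders-filter",
    sql="INSERT INTO big_orders SELECT STREAM * FROM orders",  # placeholder query
    deployment={"mode": "IN_PROC"},
)
print(endpoint)
print(payload)
```

One consequence of the truthiness checks is that an empty `tags` list or an empty `description` string is dropped from the payload rather than sent as an empty value.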