read_query
Execute SELECT queries to retrieve data from Snowflake databases. This tool processes SQL queries for data extraction and analysis.
Instructions
Execute a SELECT query.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | SELECT SQL query to execute |
Implementation Reference
- The handler function that implements the core logic for the 'read_query' tool: validates input, checks for write operations, executes the SQL query, and formats results as YAML and JSON.async def handle_read_query(arguments, db, write_detector, *_, exclude_json_results=False): if not arguments or "query" not in arguments: raise ValueError("Missing query argument") if write_detector.analyze_query(arguments["query"])["contains_write"]: raise ValueError("Calls to read_query should not contain write operations") data, data_id = await db.execute_query(arguments["query"]) output = { "type": "data", "data_id": data_id, "data": data, } yaml_output = to_yaml(output) json_output = to_json(output) results: list[ResponseType] = [types.TextContent(type="text", text=yaml_output)] if not exclude_json_results: results.append( types.EmbeddedResource( type="resource", resource=types.TextResourceContents( uri=f"data://{data_id}", text=json_output, mimeType="application/json" ), ) ) return results
- src/mcp_snowflake_server/server.py:435-444 (registration)The Tool object registration for 'read_query', specifying its name, description, input schema, and linking to the handler function.Tool( name="read_query", description="Execute a SELECT query.", input_schema={ "type": "object", "properties": {"query": {"type": "string", "description": "SELECT SQL query to execute"}}, "required": ["query"], }, handler=handle_read_query, ),
- The input schema definition for the 'read_query' tool, defining the required 'query' parameter.input_schema={ "type": "object", "properties": {"query": {"type": "string", "description": "SELECT SQL query to execute"}}, "required": ["query"], },