get_execution_plan
Analyze SQL query performance by retrieving execution plans with runtime statistics from Amazon Redshift databases to identify optimization opportunities.
Instructions
Get actual execution plan with runtime statistics for a SQL query
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| sql | Yes | The SQL query to analyze | |
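
The schema above amounts to a single required string argument. As a minimal sketch of checking arguments against it with only the standard library (the `validate_arguments` helper and the `INPUT_SCHEMA` constant are hypothetical names for illustration, not part of the server):

```python
from typing import Any

# Schema as described in the table above: one required string property.
INPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "sql": {"type": "string", "description": "The SQL query to analyze"}
    },
    "required": ["sql"],
}


def validate_arguments(args: dict[str, Any], schema: dict[str, Any] = INPUT_SCHEMA) -> None:
    """Hypothetical helper: raise ValueError on a missing or non-string argument."""
    # Every key listed under "required" must be present.
    for key in schema["required"]:
        if key not in args:
            raise ValueError(f"missing required argument: {key}")
    # Only the "string" type is checked, since that is all this schema uses.
    for key, spec in schema["properties"].items():
        if key in args and spec["type"] == "string" and not isinstance(args[key], str):
            raise ValueError(f"argument {key!r} must be a string")


# Example arguments for a call to get_execution_plan.
validate_arguments({"sql": "SELECT 1"})
```

This covers only the shape used here; a general-purpose JSON Schema validator would be the better choice for anything more involved.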
Implementation Reference
- Dispatch handler for the 'get_execution_plan' tool within the call_tool function. It extracts the 'sql' argument, raises ValueError if it is missing or empty (note: the error message names 'get_query_plan' rather than 'get_execution_plan'), and prefixes the SQL with 'EXPLAIN ' to obtain the execution plan. In Redshift, EXPLAIN returns the planner's plan without running the query.

      elif name == "get_execution_plan":
          sql = args.get("sql")
          if not sql:
              raise ValueError("sql parameter is required when calling get_query_plan tool")
          sql = f"EXPLAIN {sql}"
- Input schema definition for the tool, specifying an object with a required 'sql' property of type string.

      inputSchema={
          "type": "object",
          "properties": {
              "sql": {
                  "type": "string",
                  "description": "The SQL query to analyze"
              }
          },
          "required": ["sql"]
      }
- src/redshift_mcp_server/server.py:157-170 (registration) Registration of the 'get_execution_plan' tool in the @server.list_tools() callback, defining its metadata and input schema.

      Tool(
          name="get_execution_plan",
          description="Get actual execution plan with runtime statistics for a SQL query",
          inputSchema={
              "type": "object",
              "properties": {
                  "sql": {
                      "type": "string",
                      "description": "The SQL query to analyze"
                  }
              },
              "required": ["sql"]
          }
      )
- Shared execution helper used by all tools (including get_execution_plan). It opens a Redshift connection from the config, executes the prepared SQL, handles special cases, and formats the query results (column names plus rows as CSV) into TextContent for the MCP response, with error handling and connection cleanup.

      try:
          conn = redshift_connector.connect(
              host=config['host'],
              port=int(config['port']),
              user=config['user'],
              password=config['password'],
              database=config['database'],
          )
          conn.autocommit = True
          with conn.cursor() as cursor:
              cursor.execute(sql)
              if name == "analyze_table":
                  return [TextContent(type="text", text=f"Successfully analyzed table {schema}.{table}")]
              if cursor.description is None:
                  return [TextContent(type="text", text=f"Successfully execute sql {sql}")]
              columns = [desc[0] for desc in cursor.description]
              rows = cursor.fetchall()
              result = [",".join(map(str, row)) for row in rows]
              return [TextContent(type="text", text="\n".join([",".join(columns)] + result))]
      except Exception as e:
          return [TextContent(type="text", text=f"Error executing query: {str(e)}")]
      finally:
          conn.close()
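
The dispatch step and the result formatting described above can be tried without a database. The following is a sketch only: `build_explain_sql` and `format_result` are hypothetical names, and the formatting uses the standard `csv` module rather than the server's plain `",".join`, so values containing commas or quotes are escaped. That is a swapped-in technique, not what the server does.

```python
import csv
import io
from typing import Any, Iterable, Sequence


def build_explain_sql(args: dict[str, Any]) -> str:
    """Mirror the dispatch step: require 'sql' and prefix it with EXPLAIN."""
    sql = args.get("sql")
    if not sql:
        raise ValueError("sql parameter is required when calling get_execution_plan tool")
    return f"EXPLAIN {sql}"


def format_result(columns: Sequence[str], rows: Iterable[Sequence[Any]]) -> str:
    """Render a header line plus data rows as CSV text.

    Assumption: proper CSV quoting is wanted. csv.writer quotes any value
    that contains a comma, a quote character, or a newline.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(columns)
    writer.writerows(rows)
    # Drop the trailing newline so the text ends with the last row.
    return buffer.getvalue().rstrip("\n")


# Usage: prepare the statement, then format rows as a cursor might return them.
statement = build_explain_sql({"sql": "SELECT 1"})
text = format_result(["a", "b"], [(1, "x,y")])
```

One further observation on the helper as shown: if `redshift_connector.connect` raises, `conn` would be unbound when the `finally` clause runs, unless it is assigned earlier in the function. Initializing `conn = None` before the `try` and closing only when it is set is one common way to guard against that.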