execute_sql
Execute SQL queries on Amazon Redshift databases to retrieve, analyze, or modify data directly from AI assistants.
Instructions
Execute a SQL Query on the Redshift cluster
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| sql | Yes | The SQL to Execute |
Implementation Reference
- The @server.call_tool() decorator registers this as the tool handler. It dispatches based on the tool name. For 'execute_sql', it retrieves the 'sql' parameter from args, validates it, connects to Redshift using config from env vars, executes the SQL, fetches results if query returns columns, formats as CSV-like text, and returns as TextContent. Handles errors and closes connection.@server.call_tool() async def call_tool(name: str, args: dict) -> list[TextContent]: """Execute SQL""" config=get_redshift_config() sql = '' if name == "execute_sql": sql = args.get("sql") if not sql: raise ValueError("sql parameter is required when calling execute_sql tool") elif name == "analyze_table": schema = args.get("schema") table = args.get("table") if not all([schema, table]): raise ValueError("'schema' and 'table' parameters are required when calling analyze_table tool") sql = f"ANALYZE {schema}.{table}" elif name == "get_execution_plan": sql = args.get("sql") if not sql: raise ValueError("sql parameter is required when calling get_query_plan tool") sql = f"EXPLAIN {sql}" try: conn = redshift_connector.connect( host=config['host'], port=int(config['port']), user=config['user'], password=config['password'], database=config['database'], ) conn.autocommit = True with conn.cursor() as cursor: cursor.execute(sql) if name == "analyze_table": return [TextContent(type="text", text=f"Successfully analyzed table {schema}.{table}")] if cursor.description is None: return [TextContent(type="text", text=f"Successfully execute sql {sql}")] columns = [desc[0] for desc in cursor.description] rows = cursor.fetchall() result = [",".join(map(str, row)) for row in rows] return [TextContent(type="text", text="\n".join([",".join(columns)] + result ))] except Exception as e: return [TextContent(type="text", text=f"Error executing query: {str(e)}")] finally: conn.close()
- Pydantic-like JSON schema for 'execute_sql' tool input, defining an object with a required 'sql' string property.inputSchema={ "type": "object", "properties": { "sql": { "type": "string", "description": "The SQL to Execute" } }, "required": ["sql"] }
- src/redshift_mcp_server/server.py:125-138 (registration)The Tool object registration for 'execute_sql' returned by @server.list_tools(), including name, description, and input schema.Tool( name="execute_sql", description="Execute a SQL Query on the Redshift cluster", inputSchema={ "type": "object", "properties": { "sql": { "type": "string", "description": "The SQL to Execute" } }, "required": ["sql"] } ),