execute_sql
Run SQL queries on an Amazon Redshift cluster through the Redshift MCP Server to explore schemas, execute queries, and collect statistics.
Instructions
Execute a SQL Query on the Redshift cluster
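For orientation, here is a minimal sketch of the request a client might send to invoke this tool. It assumes the standard MCP JSON-RPC `tools/call` method; the request id and the SQL text are made up for illustration, and in practice an MCP client library would normally build this payload for you.

```python
import json

# Illustrative query only; any SQL string the cluster accepts could go here.
sql = "SELECT 1 AS one"

# Assumed shape of an MCP "tools/call" request for the execute_sql tool.
request = {
    "jsonrpc": "2.0",
    "id": 1,  # arbitrary request id chosen for this example
    "method": "tools/call",
    "params": {
        "name": "execute_sql",
        "arguments": {"sql": sql},
    },
}

payload = json.dumps(request)
print(payload)
```

The only tool-specific parts are the tool name and the single `sql` argument; everything else is protocol framing.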
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| sql | Yes | The SQL to Execute | |
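The table above corresponds to a small JSON Schema with one required string property. As a rough sketch, a caller could check arguments against it before sending them. The helper name `missing_required` is hypothetical and is not part of the server; it mirrors the server's own `if not sql` check, so an empty string counts as missing.

```python
# The input schema as registered for execute_sql.
INPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "sql": {"type": "string", "description": "The SQL to Execute"}
    },
    "required": ["sql"],
}


def missing_required(schema: dict, args: dict) -> list[str]:
    """Return required keys that are absent or falsy in args.

    Hypothetical helper for illustration; treats "" and None as missing.
    """
    return [key for key in schema.get("required", []) if not args.get(key)]


print(missing_required(INPUT_SCHEMA, {}))
print(missing_required(INPUT_SCHEMA, {"sql": "SELECT 1"}))
```

A full validator would also check types against `properties`; this sketch only covers the `required` list.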
Implementation Reference
- src/redshift_mcp_server/server.py:125-139 (registration): Registers the 'execute_sql' tool with the MCP server in the list_tools() function, including its description and input schema that requires a 'sql' parameter.

  ```python
  Tool(
      name="execute_sql",
      description="Execute a SQL Query on the Redshift cluster",
      inputSchema={
          "type": "object",
          "properties": {
              "sql": {
                  "type": "string",
                  "description": "The SQL to Execute"
              }
          },
          "required": ["sql"]
      }
  ),
  Tool(
  ```
- The call_tool handler dispatches based on tool name. For 'execute_sql', it retrieves and validates the 'sql' argument, connects to Redshift, executes the SQL query, handles results or updates, and returns formatted TextContent with query results or status messages.

  ```python
  async def call_tool(name: str, args: dict) -> list[TextContent]:
      """Execute SQL"""
      config = get_redshift_config()
      sql = ''
      if name == "execute_sql":
          sql = args.get("sql")
          if not sql:
              raise ValueError("sql parameter is required when calling execute_sql tool")
      elif name == "analyze_table":
          schema = args.get("schema")
          table = args.get("table")
          if not all([schema, table]):
              raise ValueError("'schema' and 'table' parameters are required when calling analyze_table tool")
          sql = f"ANALYZE {schema}.{table}"
      elif name == "get_execution_plan":
          sql = args.get("sql")
          if not sql:
              raise ValueError("sql parameter is required when calling get_query_plan tool")
          sql = f"EXPLAIN {sql}"

      try:
          conn = redshift_connector.connect(
              host=config['host'],
              port=int(config['port']),
              user=config['user'],
              password=config['password'],
              database=config['database'],
          )
          conn.autocommit = True

          with conn.cursor() as cursor:
              cursor.execute(sql)
              if name == "analyze_table":
                  return [TextContent(type="text", text=f"Successfully analyzed table {schema}.{table}")]
              if cursor.description is None:
                  return [TextContent(type="text", text=f"Successfully execute sql {sql}")]
              columns = [desc[0] for desc in cursor.description]
              rows = cursor.fetchall()
              result = [",".join(map(str, row)) for row in rows]
              return [TextContent(type="text", text="\n".join([",".join(columns)] + result))]
      except Exception as e:
          return [TextContent(type="text", text=f"Error executing query: {str(e)}")]
      finally:
          conn.close()
  ```
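To make the handler's execute-and-format step easier to follow, here is a simplified sketch of that path in isolation. It is not the server's code: `format_result` and `run_sql` are hypothetical names, and the connection is passed in as a factory (`connect`) so the logic can be exercised without a Redshift cluster. It also guards `conn.close()`, because in the handler as quoted a failed `connect()` leaves `conn` unbound, so the `finally` block would raise instead of returning the error text.

```python
from typing import Any, Callable, Sequence


def format_result(columns: Sequence[str], rows: Sequence[Sequence[Any]]) -> str:
    """Join a header line and data rows with commas, one row per line."""
    header = ",".join(columns)
    lines = [",".join(map(str, row)) for row in rows]
    return "\n".join([header] + lines)


def run_sql(connect: Callable[[], Any], sql: str) -> str:
    """Run one statement and return result text or an error message.

    `connect` is an assumed zero-argument factory returning a DB-API style
    connection whose cursor() works as a context manager.
    """
    conn = None
    try:
        conn = connect()
        conn.autocommit = True
        with conn.cursor() as cursor:
            cursor.execute(sql)
            # Statements that return no rows leave description as None.
            if cursor.description is None:
                return f"Successfully execute sql {sql}"
            columns = [desc[0] for desc in cursor.description]
            return format_result(columns, cursor.fetchall())
    except Exception as e:
        return f"Error executing query: {e}"
    finally:
        # Only close a connection that was actually opened.
        if conn is not None:
            conn.close()
```

Because values are joined with bare commas, a value that itself contains a comma or newline is not distinguishable from a column or row boundary in the output; Python's `csv` module would be one way to quote such values if that matters to the caller.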