Redshift MCP Server

by Moonlight-CL

execute_sql

Execute SQL queries on Amazon Redshift databases to retrieve, analyze, or modify data directly from AI assistants.

Instructions

Execute a SQL Query on the Redshift cluster
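
A client reaches this tool through the MCP `tools/call` method. The sketch below builds such a JSON-RPC request by hand to show the shape of the message. The helper name `build_execute_sql_request`, the request id, and the sample query are illustrative assumptions; in practice an MCP client library would normally construct and send this message for you.

```python
import json

def build_execute_sql_request(sql: str, request_id: int = 1) -> str:
    """Serialize an MCP tools/call request targeting the execute_sql tool.

    Hypothetical helper for illustration; not part of the server.
    """
    if not sql:
        raise ValueError("sql must be a non-empty string")
    request = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "execute_sql",          # tool name from this page
            "arguments": {"sql": sql},      # matches the input schema
        },
    }
    return json.dumps(request)

print(build_execute_sql_request("SELECT 1"))
```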

Input Schema

Name | Required | Description        | Default
sql  | Yes      | The SQL to Execute | (none)
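
The schema above can be checked before a request is sent. The following is a minimal hand-rolled sketch, not the server's own validation: `validate_execute_sql_args` is a hypothetical helper, and the empty-string check goes beyond the schema itself to mirror the handler's `if not sql` test.

```python
def validate_execute_sql_args(args: dict) -> str:
    """Return the SQL string if args fit the execute_sql input schema.

    Raises ValueError otherwise. Hypothetical helper for illustration.
    """
    if not isinstance(args, dict):
        raise ValueError("arguments must be an object")
    if "sql" not in args:
        raise ValueError("missing required property: sql")
    sql = args["sql"]
    if not isinstance(sql, str):
        raise ValueError("sql must be a string")
    # Not required by the schema; mirrors the handler rejecting empty SQL.
    if not sql.strip():
        raise ValueError("sql must not be empty")
    return sql

print(validate_execute_sql_args({"sql": "SELECT 1"}))
```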

Implementation Reference

  • The handler registered with @server.call_tool() dispatches on the tool name. For 'execute_sql', it reads the 'sql' argument and rejects a missing or empty value, connects to Redshift using configuration read from environment variables, and executes the statement. If the statement returns columns, the rows are fetched and formatted as comma-separated text with a header line; otherwise a success message is returned. The result is wrapped in TextContent. Errors raised while connecting or executing are returned as text, and the connection is closed in a finally block.
    @server.call_tool()
    async def call_tool(name: str, args: dict) -> list[TextContent]:
        """Execute SQL"""
        config=get_redshift_config()
        sql = ''
    
        if name == "execute_sql":
            sql = args.get("sql")
            if not sql:
                raise ValueError("sql parameter is required when calling execute_sql tool")
        elif name == "analyze_table":
            schema = args.get("schema")
            table = args.get("table")
            if not all([schema, table]):
                raise ValueError("'schema' and 'table' parameters are required when calling analyze_table tool")
            sql = f"ANALYZE {schema}.{table}"
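        elif name == "get_execution_plan":
            sql = args.get("sql")
            if not sql:
                raise ValueError("sql parameter is required when calling get_execution_plan tool")
            sql = f"EXPLAIN {sql}"
        else:
            # Reject unknown tool names instead of executing an empty statement
            raise ValueError(f"Unknown tool: {name}")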
    
        conn = None  # stays None if connect() raises, so finally can skip close()
        try:
            conn = redshift_connector.connect(
                host=config['host'],
                port=int(config['port']),
                user=config['user'],
                password=config['password'],
                database=config['database'],
            )
            conn.autocommit = True
    
            with conn.cursor() as cursor:
                cursor.execute(sql)
                if name == "analyze_table":
                    return [TextContent(type="text", text=f"Successfully analyzed table {schema}.{table}")]
    
                # Statements that produce no result set leave cursor.description as None
                if cursor.description is None:
                    return [TextContent(type="text", text=f"Successfully executed SQL: {sql}")]
    
                columns = [desc[0] for desc in cursor.description]
                rows = cursor.fetchall()
                # Values are joined as-is; fields containing commas are not escaped
                result = [",".join(map(str, row)) for row in rows]
                return [TextContent(type="text", text="\n".join([",".join(columns)] + result))]
        except Exception as e:
            return [TextContent(type="text", text=f"Error executing query: {e}")]
        finally:
            if conn is not None:
                conn.close()
  • JSON Schema for the 'execute_sql' tool input: an object with a single required string property, 'sql'.
    inputSchema={
        "type": "object",
        "properties": {
            "sql": {
                "type": "string",
                "description": "The SQL to Execute"
            }
        },
        "required": ["sql"]
    }
  • The Tool object for 'execute_sql', as returned by the handler registered with @server.list_tools(). It carries the tool's name, description, and input schema.
    Tool(
        name="execute_sql",
        description="Execute a SQL Query on the Redshift cluster",
        inputSchema={
            "type": "object",
            "properties": {
                "sql": {
                    "type": "string",
                    "description": "The SQL to Execute"
                }
            },
            "required": ["sql"]
        }
    ),
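
The handler above joins values with plain commas, so a field that itself contains a comma, quote, or newline becomes ambiguous in the output. As a hedged alternative, not the server's actual behavior, the result could be formatted with Python's standard `csv` module. `format_result` is a hypothetical helper name. One difference to be aware of: `csv.writer` writes `None` as an empty field, whereas the original `str()` join writes the text `None`.

```python
import csv
import io

def format_result(columns, rows):
    """Render a header line plus rows as CSV text with proper quoting.

    Hypothetical replacement for the plain ",".join formatting.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(columns)   # header line, as in the original output
    writer.writerows(rows)     # fields with commas or quotes get quoted
    return buffer.getvalue().rstrip("\n")

print(format_result(["id", "name"], [(1, "Doe, Jane"), (2, "Smith")]))
```

The header-then-rows layout matches what the handler returns today, so a client that parses the text with a CSV reader would see the same columns.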