CockroachDB MCP Server

by bpamiri

execute_query

Execute SQL queries on CockroachDB clusters to retrieve data, perform CRUD operations, and monitor database performance with configurable safety controls.

Instructions

Execute a SQL query.

Args:
    sql: SQL statement to execute.
    max_rows: Maximum rows to return (default: from config).

Returns:
    Query results with columns and rows.
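A caller has to tell apart the response shapes that the implementation reference on this page returns: an error dict, a `rows_affected` dict for statements without a result set, and a dict with `columns` and `rows` otherwise. The following is a minimal sketch of that branching; `summarize_result` is a hypothetical helper name and is not part of the server.

```python
from typing import Any


def summarize_result(result: dict[str, Any]) -> str:
    """Turn an execute_query response into a one-line summary (hypothetical helper)."""
    if result.get("status") == "error":
        # Validation failures also carry an "issues" list next to "error".
        issues = result.get("issues") or []
        detail = "; ".join(issues) if issues else result.get("error", "unknown error")
        return f"error: {detail}"
    if "rows_affected" in result:
        # Statements that return no result set (INSERT, UPDATE, DELETE, ...).
        return f"{result['rows_affected']} row(s) affected"
    # Row-returning statements: columns, rows, row_count, has_more, max_rows.
    suffix = " (truncated)" if result.get("has_more") else ""
    columns = ", ".join(result["columns"])
    return f"{result['row_count']} row(s), columns: {columns}{suffix}"
```

Checking `has_more` matters because the row limit is applied silently: a result with `has_more` set to true contains only the first `max_rows` rows.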

Input Schema

Name      Required  Description  Default
sql       Yes
max_rows  No
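The JSON Schema view of these inputs is not reproduced on this page. As a rough sketch, the schema below is inferred from the handler signature `execute_query(sql: str, max_rows: int | None = None)`; the dict contents and the `check_arguments` helper are assumptions for illustration, not the schema the server actually publishes.

```python
from typing import Any

# Assumed schema, inferred from the handler signature; not copied from the server.
EXECUTE_QUERY_SCHEMA: dict[str, Any] = {
    "type": "object",
    "properties": {
        "sql": {"type": "string"},
        "max_rows": {"anyOf": [{"type": "integer"}, {"type": "null"}], "default": None},
    },
    "required": ["sql"],
}


def check_arguments(args: dict[str, Any]) -> list[str]:
    """Return the problems found in a tool-call argument dict (empty if none)."""
    problems: list[str] = []
    for name in EXECUTE_QUERY_SCHEMA["required"]:
        if name not in args:
            problems.append(f"missing required argument: {name}")
    if "sql" in args and not isinstance(args["sql"], str):
        problems.append("sql must be a string")
    max_rows = args.get("max_rows")
    # bool is a subclass of int in Python, so reject it explicitly.
    if max_rows is not None and (isinstance(max_rows, bool) or not isinstance(max_rows, int)):
        problems.append("max_rows must be an integer or null")
    return problems
```

For example, `check_arguments({"sql": "SELECT 1"})` returns an empty list, while omitting `sql` yields a single "missing required argument" message.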

Implementation Reference

  • Registers the 'execute_query' tool with the @mcp.tool() decorator. This is the entry point the MCP server invokes; it delegates to the query module and converts any raised exception into an error response.
    @mcp.tool()
    async def execute_query(sql: str, max_rows: int | None = None) -> dict[str, Any]:
        """Execute a SQL query.
    
        Args:
            sql: SQL statement to execute.
            max_rows: Maximum rows to return (default: from config).
    
        Returns:
            Query results with columns and rows.
        """
        try:
            return await query.execute_query(sql, max_rows)
        except Exception as e:
            return {"status": "error", "error": str(e)}
  • Main handler logic for executing queries: validates the query first (blocked commands, read-only mode) and returns the validation issues on failure; otherwise it passes the query to the connection manager.
    async def execute_query(
        query: str,
        max_rows: int | None = None,
    ) -> dict[str, Any]:
        """Execute a SQL query.
    
        Args:
            query: SQL query to execute.
            max_rows: Maximum rows to return.
    
        Returns:
            Query results.
        """
        # Validate first
        validation = await validate_query(query)
        if not validation["is_valid"]:
            return {
                "status": "error",
                "error": "Query validation failed",
                "issues": validation["issues"],
            }
    
        return await connection_manager.execute_query(query, max_rows=max_rows)
  • Low-level helper that executes the SQL query on a psycopg AsyncConnection. It distinguishes statements that return rows from those that do not, applies the row limit, reports whether more rows were available, and formats the results.
    async def execute_query(
        self,
        query: str,
        params: tuple[Any, ...] | None = None,
        max_rows: int | None = None,
    ) -> dict[str, Any]:
        """Execute a query and return results.
    
        Args:
            query: SQL query to execute.
            params: Query parameters.
            max_rows: Maximum rows to return.
    
        Returns:
            Query results.
        """
        conn = await self.ensure_connected()
    
        effective_max_rows = max_rows if max_rows is not None else settings.max_rows
    
        try:
            async with conn.cursor() as cur:
                if params:
                    await cur.execute(query, params)
                else:
                    await cur.execute(query)
    
                # Check if query returns results
                if cur.description is None:
                    # Non-SELECT query (INSERT, UPDATE, DELETE, etc.)
                    return {
                        "status": "success",
                        "rows_affected": cur.rowcount,
                        "message": f"{cur.rowcount} row(s) affected",
                    }
    
                # Fetch results with limit
                rows = await cur.fetchmany(effective_max_rows)
                total_fetched = len(rows)
    
                # Check if there are more rows
                has_more = False
                if total_fetched == effective_max_rows:
                    extra = await cur.fetchone()
                    has_more = extra is not None
    
                # Get column names
                columns = [desc.name for desc in cur.description]
    
                return {
                    "status": "success",
                    "columns": columns,
                    "rows": rows,
                    "row_count": total_fetched,
                    "has_more": has_more,
                    "max_rows": effective_max_rows,
                }
        except Exception as e:
            return {"status": "error", "error": str(e)}
  • Input validation logic: rejects empty queries, checks for blocked commands and read-only mode compatibility, classifies the query type, and returns any issues found.
    async def validate_query(query: str) -> dict[str, Any]:
        """Validate a SQL query without executing it.
    
        Args:
            query: SQL query to validate.
    
        Returns:
            Validation result with is_valid and any issues.
        """
        issues: list[str] = []
    
        # Check for empty query
        if not query or not query.strip():
            return {
                "is_valid": False,
                "issues": ["Query is empty"],
                "query_type": None,
            }
    
        # Check for blocked commands
        is_blocked, blocked_cmd = _is_blocked_command(query)
        if is_blocked:
            issues.append(f"Blocked command: {blocked_cmd}")
    
        # Check read-only mode
        if settings.read_only and not _is_read_only_query(query):
            issues.append("Server is in read-only mode; only SELECT/SHOW/EXPLAIN allowed")
    
        # Determine query type
        query_upper = query.strip().upper()
        if query_upper.startswith("SELECT") or query_upper.startswith("WITH"):
            query_type = "SELECT"
        elif query_upper.startswith("INSERT"):
            query_type = "INSERT"
        elif query_upper.startswith("UPDATE"):
            query_type = "UPDATE"
        elif query_upper.startswith("DELETE"):
            query_type = "DELETE"
        elif query_upper.startswith("SHOW"):
            query_type = "SHOW"
        elif query_upper.startswith("EXPLAIN"):
            query_type = "EXPLAIN"
        else:
            query_type = "OTHER"
    
        return {
            "is_valid": len(issues) == 0,
            "issues": issues,
            "query_type": query_type,
            "is_read_only": _is_read_only_query(query),
        }
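The validation code above calls `_is_blocked_command` and `_is_read_only_query`, whose bodies are not shown on this page. The sketch below is one plausible way such checks could work, assuming simple prefix matching on the normalized statement. The function names, the `BLOCKED_COMMANDS` tuple, and the matching strategy are all hypothetical; the read-only prefixes are taken from the error message in `validate_query`.

```python
from __future__ import annotations

import re

# Hypothetical block list; the real server reads its own configuration.
BLOCKED_COMMANDS = ("DROP DATABASE", "TRUNCATE")
# Prefixes named in the read-only error message in validate_query.
READ_ONLY_PREFIXES = ("SELECT", "SHOW", "EXPLAIN")


def normalize(query: str) -> str:
    """Uppercase the query and collapse runs of whitespace to single spaces."""
    return re.sub(r"\s+", " ", query.strip()).upper()


def starts_with_keyword(normalized: str, keyword: str) -> bool:
    """True if the text begins with the keyword followed by a word boundary."""
    return re.match(re.escape(keyword) + r"\b", normalized) is not None


def is_blocked_command(query: str) -> tuple[bool, str | None]:
    """Return (True, command) if the query starts with a blocked command."""
    normalized = normalize(query)
    for command in BLOCKED_COMMANDS:
        if starts_with_keyword(normalized, command):
            return True, command
    return False, None


def is_read_only_query(query: str) -> bool:
    """Return True if the query starts with a read-only keyword."""
    normalized = normalize(query)
    return any(starts_with_keyword(normalized, p) for p in READ_ONLY_PREFIXES)
```

Prefix matching of this kind only inspects the start of the string, so it would not catch a second statement appended after a semicolon or a statement hidden behind a leading comment. A production check would likely need real SQL parsing or database-level permissions in addition.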

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bpamiri/cockroachdb-mcp'
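The same request can be made from Python with the standard library. This is a minimal sketch: the URL layout is taken from the curl command above, while the assumption that the endpoint returns a JSON object, and the helper names `server_url` and `fetch_server`, are illustrative only.

```python
import json
import urllib.request

API_BASE = "https://glama.ai/api/mcp/v1/servers"


def server_url(owner: str, name: str) -> str:
    """Build the directory API URL for one MCP server."""
    return f"{API_BASE}/{owner}/{name}"


def fetch_server(owner: str, name: str, timeout: float = 10.0) -> dict:
    """GET the server record. Assumes the endpoint returns a JSON object."""
    request = urllib.request.Request(server_url(owner, name), method="GET")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)
```

Calling `fetch_server("bpamiri", "cockroachdb-mcp")` performs the network request; `server_url` alone can be used to check the address without contacting the API.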

If you have feedback or need assistance with the MCP directory API, please join our Discord server.