Skip to main content
Glama
bpamiri

CockroachDB MCP Server

by bpamiri

execute_query

Execute SQL queries on CockroachDB clusters to retrieve, analyze, or modify data with configurable row limits.

Instructions

Execute a SQL query.

Args: sql: SQL statement to execute. max_rows: Maximum rows to return (default: from config). Returns: Query results with columns and rows.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
sqlYes
max_rowsNo

Implementation Reference

  • MCP tool registration for 'execute_query' using @mcp.tool(). Thin wrapper that delegates to the query module's execute_query.
    @mcp.tool() async def execute_query(sql: str, max_rows: int | None = None) -> dict[str, Any]: """Execute a SQL query. Args: sql: SQL statement to execute. max_rows: Maximum rows to return (default: from config). Returns: Query results with columns and rows. """ try: return await query.execute_query(sql, max_rows) except Exception as e: return {"status": "error", "error": str(e)}
  • Primary handler logic for execute_query: validates the SQL query for safety and blocked commands, then delegates to connection_manager for execution.
    async def execute_query( query: str, max_rows: int | None = None, ) -> dict[str, Any]: """Execute a SQL query. Args: query: SQL query to execute. max_rows: Maximum rows to return. Returns: Query results. """ # Validate first validation = await validate_query(query) if not validation["is_valid"]: return { "status": "error", "error": "Query validation failed", "issues": validation["issues"], } return await connection_manager.execute_query(query, max_rows=max_rows)
  • Input validation helper for execute_query: checks for blocked commands, read-only mode compliance, and determines query type.
    async def validate_query(query: str) -> dict[str, Any]: """Validate a SQL query without executing it. Args: query: SQL query to validate. Returns: Validation result with is_valid and any issues. """ issues: list[str] = [] # Check for empty query if not query or not query.strip(): return { "is_valid": False, "issues": ["Query is empty"], "query_type": None, } # Check for blocked commands is_blocked, blocked_cmd = _is_blocked_command(query) if is_blocked: issues.append(f"Blocked command: {blocked_cmd}") # Check read-only mode if settings.read_only and not _is_read_only_query(query): issues.append("Server is in read-only mode; only SELECT/SHOW/EXPLAIN allowed") # Determine query type query_upper = query.strip().upper() if query_upper.startswith("SELECT") or query_upper.startswith("WITH"): query_type = "SELECT" elif query_upper.startswith("INSERT"): query_type = "INSERT" elif query_upper.startswith("UPDATE"): query_type = "UPDATE" elif query_upper.startswith("DELETE"): query_type = "DELETE" elif query_upper.startswith("SHOW"): query_type = "SHOW" elif query_upper.startswith("EXPLAIN"): query_type = "EXPLAIN" else: query_type = "OTHER" return { "is_valid": len(issues) == 0, "issues": issues, "query_type": query_type, "is_read_only": _is_read_only_query(query), }
  • Low-level query execution helper in ConnectionManager: handles actual database interaction, row fetching, and result formatting.
    async def execute_query( self, query: str, params: tuple[Any, ...] | None = None, max_rows: int | None = None, ) -> dict[str, Any]: """Execute a query and return results. Args: query: SQL query to execute. params: Query parameters. max_rows: Maximum rows to return. Returns: Query results. """ conn = await self.ensure_connected() effective_max_rows = max_rows if max_rows is not None else settings.max_rows try: async with conn.cursor() as cur: if params: await cur.execute(query, params) else: await cur.execute(query) # Check if query returns results if cur.description is None: # Non-SELECT query (INSERT, UPDATE, DELETE, etc.) return { "status": "success", "rows_affected": cur.rowcount, "message": f"{cur.rowcount} row(s) affected", } # Fetch results with limit rows = await cur.fetchmany(effective_max_rows) total_fetched = len(rows) # Check if there are more rows has_more = False if total_fetched == effective_max_rows: extra = await cur.fetchone() has_more = extra is not None # Get column names columns = [desc.name for desc in cur.description] return { "status": "success", "columns": columns, "rows": rows, "row_count": total_fetched, "has_more": has_more, "max_rows": effective_max_rows, } except Exception as e: return {"status": "error", "error": str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bpamiri/cockroachdb-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server