query
Execute SQL SELECT queries to retrieve data from PostgreSQL databases. This read-only tool returns query results with rows, columns, and metadata for database analysis.
Instructions
Execute a SQL query against the PostgreSQL database.
This tool is READ-ONLY by default. Use the 'execute' tool for write operations.
Args:
sql: SQL query to execute (SELECT statements only)
Returns:
Query results with rows, columns, and metadata
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| sql | Yes |
Implementation Reference
- postgres_mcp/server.py:48-72 (handler)The handler function for the 'query' MCP tool. It executes read-only SQL queries via PostgresClient.execute_query, returning results as a dictionary with rows, count, columns, and truncation status. The @mcp.tool() decorator registers it as the 'query' tool.@mcp.tool() @handle_db_error def query(sql: str) -> dict: """Execute a SQL query against the PostgreSQL database. This tool is READ-ONLY by default. Use the 'execute' tool for write operations. Args: sql: SQL query to execute (SELECT statements only) Returns: Query results with rows, columns, and metadata """ client = get_client() settings = get_settings() result = client.execute_query(sql, allow_write=False, max_rows=settings.max_rows) return { "rows": result["rows"], "row_count": result["row_count"], "columns": result["columns"], "truncated": result.get("truncated", False), }
- Core helper method in PostgresClient that performs the actual query execution, validation, and result fetching. Called by the 'query' tool handler.def execute_query( self, query: str, params: Optional[tuple] = None, allow_write: bool = False, max_rows: Optional[int] = None, ) -> dict[str, Any]: """Execute a SQL query. Args: query: SQL query string params: Optional query parameters allow_write: Whether to allow write operations max_rows: Maximum rows to return (None uses settings default) Returns: Dict with results, row_count, columns """ # Validate query validated_query = validate_query(query, allow_write=allow_write) max_rows = max_rows or self.settings.max_rows with self.get_connection() as conn: cursor = conn.cursor() try: cursor.execute(validated_query, params) # Check if it's a SELECT query is_select = validated_query.strip().upper().startswith("SELECT") if is_select: rows = cursor.fetchmany(max_rows + 1) truncated = len(rows) > max_rows if truncated: rows = rows[:max_rows] columns = [desc[0] for desc in cursor.description] if cursor.description else [] return { "success": True, "rows": [dict(row) for row in rows], "row_count": len(rows), "columns": columns, "truncated": truncated, } else: conn.commit() return { "success": True, "rows": [], "row_count": cursor.rowcount, "columns": [], "message": f"{cursor.rowcount} rows affected", } except psycopg2.Error as e: conn.rollback() raise PostgresClientError(f"Query failed: {e}") from e finally: cursor.close()