read_rows
Query and retrieve data from CockroachDB tables using filters, column selection, sorting, and row limits to access specific database information.
Instructions
Read rows from a table.
Args:
table: Table name (schema.table or just table).
id_value: Primary key value for single row lookup.
id_column: Name of the ID column (default: 'id').
where: WHERE clause (without 'WHERE').
columns: List of columns to return (default: all).
order_by: ORDER BY clause (without 'ORDER BY').
limit: Maximum rows to return.
Returns:
Query results.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| table | Yes | ||
| id_value | No | ||
| id_column | No | id | |
| where | No | ||
| columns | No | ||
| order_by | No | ||
| limit | No |
Implementation Reference
- src/cockroachdb_mcp/server.py:226-254 (handler)MCP tool handler for 'read_rows'. Registers the tool with FastMCP and delegates execution to the crud module's read_rows function.@mcp.tool() async def read_rows( table: str, id_value: str | int | None = None, id_column: str = "id", where: str | None = None, columns: list[str] | None = None, order_by: str | None = None, limit: int | None = None, ) -> dict[str, Any]: """Read rows from a table. Args: table: Table name (schema.table or just table). id_value: Primary key value for single row lookup. id_column: Name of the ID column (default: 'id'). where: WHERE clause (without 'WHERE'). columns: List of columns to return (default: all). order_by: ORDER BY clause (without 'ORDER BY'). limit: Maximum rows to return. Returns: Query results. """ try: return await crud.read_rows(table, id_value, id_column, where, columns, order_by, limit) except Exception as e: return {"status": "error", "error": str(e)}
- Core implementation of read_rows. Validates inputs, constructs a dynamic SELECT query with optional WHERE, ORDER BY, LIMIT clauses, executes it using the connection manager, and returns formatted results.async def read_rows( table: str, id_value: str | int | None = None, id_column: str = "id", where: str | None = None, columns: list[str] | None = None, order_by: str | None = None, limit: int | None = None, ) -> dict[str, Any]: """Read rows from a table. Args: table: Table name (schema.table or just table). id_value: Primary key value for single row lookup. id_column: Name of the ID column (default: 'id'). where: WHERE clause (without 'WHERE'). columns: List of columns to return (default: all). order_by: ORDER BY clause (without 'ORDER BY'). limit: Maximum rows to return. Returns: Query results. """ # Validate table name valid, error = _validate_table_name(table) if not valid: return {"status": "error", "error": error} schema, table_name = _parse_table_name(table) # Build column list if columns: # Validate column names for col in columns: if not re.match(r"^[\w]+$", col): return {"status": "error", "error": f"Invalid column name: {col}"} col_list = ", ".join(columns) else: col_list = "*" # Build query query = f"SELECT {col_list} FROM {schema}.{table_name}" # Add WHERE clause params: list[Any] = [] if id_value is not None: query += f" WHERE {id_column} = %s" params.append(id_value) elif where: query += f" WHERE {where}" # Add ORDER BY if order_by: query += f" ORDER BY {order_by}" # Add LIMIT effective_limit = limit if limit is not None else settings.max_rows query += f" LIMIT {effective_limit}" conn = await connection_manager.ensure_connected() try: async with conn.cursor() as cur: if params: await cur.execute(query, tuple(params)) else: await cur.execute(query) rows = await cur.fetchall() columns_returned = [desc.name for desc in cur.description] if cur.description else [] return { "status": "success", "table": f"{schema}.{table_name}", "columns": columns_returned, "rows": rows, "row_count": len(rows), "limit": effective_limit, } except Exception as e: return {"status": "error", "error": str(e)}
- Helper function to validate the table name format (allows schema.table or table).def _validate_table_name(table: str) -> tuple[bool, str | None]: """Validate table name format. Args: table: Table name to validate. Returns: Tuple of (is_valid, error_message). """ # Allow schema.table format if not re.match(r"^[\w]+\.[\w]+$|^[\w]+$", table): return False, "Invalid table name format" return True, None
- Helper function to parse table name into schema and table parts, defaulting schema to 'public'.def _parse_table_name(table: str) -> tuple[str, str]: """Parse table name into schema and table. Args: table: Table name (schema.table or just table). Returns: Tuple of (schema, table_name). """ if "." in table: schema, table_name = table.rsplit(".", 1) else: schema = "public" table_name = table return schema, table_name