update_row
Modify existing database records by specifying primary key values and new column data to update rows in CockroachDB tables.
Instructions
Update an existing row by primary key.
Args:
table: Table name (schema.table or just table).
id_value: Primary key value.
data: Column names and new values.
id_column: Name of the ID column (default: 'id').
returning: Columns to return from updated row.
Returns:
Update result.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
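| table | Yes | Table name (schema.table or just table). | |
| id_value | Yes | Primary key value. | |
| data | Yes | Column names and new values. | |
| id_column | No | Name of the ID column. | id |
| returning | No | Columns to return from updated row. | |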
Implementation Reference
# src/cockroachdb_mcp/server.py:278-301 (handler)
# MCP tool handler for 'update_row': decorated with @mcp.tool(), handles input
# parameters, delegates execution to crud.update_row, and wraps errors.
@mcp.tool()
async def update_row(
    table: str,
    id_value: str | int,
    data: dict[str, Any],
    id_column: str = "id",
    returning: list[str] | None = None,
) -> dict[str, Any]:
    """Update an existing row by primary key.

    Args:
        table: Table name (schema.table or just table).
        id_value: Primary key value.
        data: Column names and new values.
        id_column: Name of the ID column (default: 'id').
        returning: Columns to return from updated row.

    Returns:
        Update result.
    """
    # NOTE(review): the docstring above is kept verbatim; it appears to be
    # what the tool listing shows as the tool's instructions -- confirm before
    # rewording it.
    try:
        outcome = await crud.update_row(table, id_value, data, id_column, returning)
    except Exception as exc:
        # Any failure from the CRUD layer is reported as an error payload
        # instead of propagating out of the tool call.
        return {"status": "error", "error": str(exc)}
    return outcome
# Core helper function implementing the update_row logic: validates inputs,
# constructs parameterized UPDATE SQL query, executes it, and formats the
# response with status, table info, and results.
async def update_row(
    table: str,
    id_value: str | int,
    data: dict[str, Any],
    id_column: str = "id",
    returning: list[str] | None = None,
) -> dict[str, Any]:
    """Update an existing row by primary key.

    Column values and the key value are sent as query parameters; table and
    column names are interpolated into the SQL text, so every identifier is
    validated before the statement is built.

    Args:
        table: Table name (schema.table or just table).
        id_value: Primary key value.
        data: Column names and new values. Must not be empty.
        id_column: Name of the ID column (default: 'id').
        returning: Columns to return from updated row.

    Returns:
        A dict with ``"status": "success"`` plus ``table``, ``action`` and
        ``id``, and either ``returning`` (the fetched row, when ``returning``
        was requested) or ``rows_affected``. On any validation or execution
        failure, ``{"status": "error", "error": <message>}``.
    """
    # Check read-only mode
    if settings.read_only:
        return {"status": "error", "error": "Server is in read-only mode"}

    # Validate table name
    valid, error = _validate_table_name(table)
    if not valid:
        return {"status": "error", "error": error}

    if not data:
        return {"status": "error", "error": "No data provided"}

    schema, table_name = _parse_table_name(table)

    # Validate every identifier that ends up in the SQL text. fullmatch is
    # used instead of match with "^...$" because "$" also matches just before
    # a trailing newline, which would let "name\n" through.
    for col in data:
        if not re.fullmatch(r"[\w]+", col):
            return {"status": "error", "error": f"Invalid column name: {col}"}

    # id_column is interpolated into the WHERE clause, so it needs the same
    # check as the other column names to prevent SQL injection.
    if not re.fullmatch(r"[\w]+", id_column):
        return {
            "status": "error",
            "error": f"Invalid column name in id_column: {id_column}",
        }

    if returning:
        for col in returning:
            if not re.fullmatch(r"[\w]+", col):
                return {
                    "status": "error",
                    "error": f"Invalid column name in returning: {col}",
                }

    # Build UPDATE query: one placeholder per updated column, then the key.
    set_clause = ", ".join(f"{col} = %s" for col in data)
    values = [*data.values(), id_value]
    query = f"UPDATE {schema}.{table_name} SET {set_clause} WHERE {id_column} = %s"

    # Add RETURNING clause
    if returning:
        query += f" RETURNING {', '.join(returning)}"

    conn = await connection_manager.ensure_connected()

    try:
        async with conn.cursor() as cur:
            await cur.execute(query, tuple(values))

            result: dict[str, Any] = {
                "status": "success",
                "table": f"{schema}.{table_name}",
                "action": "updated",
                "id": id_value,
            }
            if returning:
                # fetchone() yields None when no row matched the key.
                result["returning"] = await cur.fetchone()
            else:
                result["rows_affected"] = cur.rowcount
            return result
    except Exception as e:
        return {"status": "error", "error": str(e)}