upsert_row
Insert new rows or update existing ones in a CockroachDB table based on conflict detection, handling data modifications with a single operation.
Instructions
Insert or update a row (UPSERT).
Args:
table: Table name (schema.table or just table).
data: Column names and values.
conflict_columns: Columns to check for conflicts (usually primary key).
update_columns: Columns to update on conflict (default: all except conflict columns).
returning: Columns to return.
Returns:
Upsert result.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| table | Yes | ||
| data | Yes | ||
| conflict_columns | Yes | ||
| update_columns | No | ||
| returning | No |
Implementation Reference
- Core implementation of the upsert_row tool: validates inputs, constructs CockroachDB UPSERT SQL query (INSERT ON CONFLICT DO UPDATE), executes it, and returns success/error with optional returning columns.async def upsert_row( table: str, data: dict[str, Any], conflict_columns: list[str], update_columns: list[str] | None = None, returning: list[str] | None = None, ) -> dict[str, Any]: """Insert or update a row (UPSERT). Args: table: Table name (schema.table or just table). data: Column names and values. conflict_columns: Columns to check for conflicts (usually primary key). update_columns: Columns to update on conflict (default: all except conflict columns). returning: Columns to return. Returns: Upsert result. """ # Check read-only mode if settings.read_only: return {"status": "error", "error": "Server is in read-only mode"} # Validate table name valid, error = _validate_table_name(table) if not valid: return {"status": "error", "error": error} if not data: return {"status": "error", "error": "No data provided"} if not conflict_columns: return {"status": "error", "error": "No conflict columns specified"} schema, table_name = _parse_table_name(table) # Validate all column names all_columns = list(data.keys()) + conflict_columns + (update_columns or []) + (returning or []) for col in all_columns: if not re.match(r"^[\w]+$", col): return {"status": "error", "error": f"Invalid column name: {col}"} # Determine columns to update if update_columns is None: update_columns = [c for c in data.keys() if c not in conflict_columns] if not update_columns: return {"status": "error", "error": "No columns to update on conflict"} # Build UPSERT query columns = list(data.keys()) placeholders = ", ".join(["%s"] * len(columns)) col_list = ", ".join(columns) values = list(data.values()) conflict_list = ", ".join(conflict_columns) update_set = ", ".join([f"{col} = EXCLUDED.{col}" for col in update_columns]) query = f""" INSERT INTO {schema}.{table_name} ({col_list}) VALUES ({placeholders}) ON CONFLICT ({conflict_list}) DO UPDATE SET {update_set} """ # Add RETURNING clause if returning: query += f" RETURNING {', '.join(returning)}" conn = await connection_manager.ensure_connected() try: async with conn.cursor() as cur: await cur.execute(query, tuple(values)) if returning: row = await cur.fetchone() return { "status": "success", "table": f"{schema}.{table_name}", "action": "upserted", "returning": row, } else: return { "status": "success", "table": f"{schema}.{table_name}", "action": "upserted", "rows_affected": cur.rowcount, } except Exception as e: return {"status": "error", "error": str(e)}
- src/cockroachdb_mcp/server.py:326-350 (registration)MCP tool registration using @mcp.tool() decorator. This wrapper function delegates to the core crud.upsert_row implementation and adds error handling.@mcp.tool() async def upsert_row( table: str, data: dict[str, Any], conflict_columns: list[str], update_columns: list[str] | None = None, returning: list[str] | None = None, ) -> dict[str, Any]: """Insert or update a row (UPSERT). Args: table: Table name (schema.table or just table). data: Column names and values. conflict_columns: Columns to check for conflicts (usually primary key). update_columns: Columns to update on conflict (default: all except conflict columns). returning: Columns to return. Returns: Upsert result. """ try: return await crud.upsert_row(table, data, conflict_columns, update_columns, returning) except Exception as e: return {"status": "error", "error": str(e)}