Skip to main content
Glama
bpamiri

CockroachDB MCP Server

by bpamiri

upsert_row

Insert new rows or update existing ones in CockroachDB tables using UPSERT operations, handling conflicts with specified columns and returning results as needed.

Instructions

Insert or update a row (UPSERT).

Args:
    table: Table name (schema.table or just table).
    data: Column names and values.
    conflict_columns: Columns to check for conflicts (usually primary key).
    update_columns: Columns to update on conflict (default: all except conflict columns).
    returning: Columns to return.

Returns:
    Upsert result.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
tableYes
dataYes
conflict_columnsYes
update_columnsNo
returningNo

Implementation Reference

  • MCP tool handler and registration for upsert_row. Defines input schema via annotations and docstring. Delegates to helper in crud module.
    @mcp.tool()
    async def upsert_row(
        table: str,
        data: dict[str, Any],
        conflict_columns: list[str],
        update_columns: list[str] | None = None,
        returning: list[str] | None = None,
    ) -> dict[str, Any]:
        """Insert or update a row (UPSERT).
    
        Args:
            table: Table name (schema.table or just table).
            data: Column names and values.
            conflict_columns: Columns to check for conflicts (usually primary key).
            update_columns: Columns to update on conflict (default: all except conflict columns).
            returning: Columns to return.
    
        Returns:
            Upsert result.
        """
        try:
            return await crud.upsert_row(table, data, conflict_columns, update_columns, returning)
        except Exception as e:
            return {"status": "error", "error": str(e)}
  • Supporting utility function containing the core logic for executing the UPSERT operation on CockroachDB, including validation, query building, and execution.
    async def upsert_row(
        table: str,
        data: dict[str, Any],
        conflict_columns: list[str],
        update_columns: list[str] | None = None,
        returning: list[str] | None = None,
    ) -> dict[str, Any]:
        """Insert or update a row (UPSERT).
    
        Args:
            table: Table name (schema.table or just table).
            data: Column names and values.
            conflict_columns: Columns to check for conflicts (usually primary key).
            update_columns: Columns to update on conflict (default: all except conflict columns).
            returning: Columns to return.
    
        Returns:
            Upsert result.
        """
        # Check read-only mode
        if settings.read_only:
            return {"status": "error", "error": "Server is in read-only mode"}
    
        # Validate table name
        valid, error = _validate_table_name(table)
        if not valid:
            return {"status": "error", "error": error}
    
        if not data:
            return {"status": "error", "error": "No data provided"}
    
        if not conflict_columns:
            return {"status": "error", "error": "No conflict columns specified"}
    
        schema, table_name = _parse_table_name(table)
    
        # Validate all column names
        all_columns = list(data.keys()) + conflict_columns + (update_columns or []) + (returning or [])
        for col in all_columns:
            if not re.match(r"^[\w]+$", col):
                return {"status": "error", "error": f"Invalid column name: {col}"}
    
        # Determine columns to update
        if update_columns is None:
            update_columns = [c for c in data.keys() if c not in conflict_columns]
    
        if not update_columns:
            return {"status": "error", "error": "No columns to update on conflict"}
    
        # Build UPSERT query
        columns = list(data.keys())
        placeholders = ", ".join(["%s"] * len(columns))
        col_list = ", ".join(columns)
        values = list(data.values())
    
        conflict_list = ", ".join(conflict_columns)
        update_set = ", ".join([f"{col} = EXCLUDED.{col}" for col in update_columns])
    
        query = f"""
            INSERT INTO {schema}.{table_name} ({col_list})
            VALUES ({placeholders})
            ON CONFLICT ({conflict_list}) DO UPDATE SET {update_set}
        """
    
        # Add RETURNING clause
        if returning:
            query += f" RETURNING {', '.join(returning)}"
    
        conn = await connection_manager.ensure_connected()
    
        try:
            async with conn.cursor() as cur:
                await cur.execute(query, tuple(values))
    
                if returning:
                    row = await cur.fetchone()
                    return {
                        "status": "success",
                        "table": f"{schema}.{table_name}",
                        "action": "upserted",
                        "returning": row,
                    }
                else:
                    return {
                        "status": "success",
                        "table": f"{schema}.{table_name}",
                        "action": "upserted",
                        "rows_affected": cur.rowcount,
                    }
        except Exception as e:
            return {"status": "error", "error": str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bpamiri/cockroachdb-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server