sql_execute
Execute SQL statements to insert, update, or delete spreadsheet data directly within files. Automatically writes changes back to maintain data integrity.
Instructions
Execute a mutating SQL statement and write changes back to the file.
Supports INSERT INTO (adds rows), UPDATE (modifies cell values), and DELETE FROM (removes rows). The target sheet is determined from the SQL statement. After execution, the modified table is written back to the file atomically. Returns {"affected_rows": N}.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| file | Yes | Path to the spreadsheet file | |
| sql | Yes | SQL mutation statement to execute: INSERT INTO, UPDATE, or DELETE FROM. Sheet names are table names. Example: UPDATE Sales SET status = 'Closed' WHERE quarter = 'Q1' AND revenue < 1000 | |
| header_row | No | 1-based row number containing column headers | 1 |
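The sheet to mutate is inferred from the statement itself via an internal `_extract_target_table` helper whose body is not part of this reference. A hypothetical sketch of that parsing, assuming plain or double-quoted table names and the three supported statement forms:

```python
import re

def extract_target_table(sql: str) -> str:
    # Hypothetical sketch: pull the table (sheet) name out of an
    # INSERT INTO / UPDATE / DELETE FROM statement. The real helper
    # may handle more syntax than this.
    m = re.match(
        r'\s*(?:INSERT\s+INTO|UPDATE|DELETE\s+FROM)\s+"?([A-Za-z_][\w ]*?)"?'
        r'\s*(?:\(|SET|WHERE|VALUES|$)',
        sql,
        re.IGNORECASE,
    )
    if not m:
        raise ValueError("Could not determine target table from SQL")
    return m.group(1).strip()

print(extract_target_table("UPDATE Sales SET status = 'Closed'"))  # Sales
print(extract_target_table('DELETE FROM "Q1 Report" WHERE revenue < 0'))  # Q1 Report
```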
Implementation Reference
- The `sql_execute` function is decorated with `@mcp.tool()` and implements the logic for executing SQL mutations (INSERT, UPDATE, DELETE) on spreadsheet files by updating the underlying sheet and saving the file.
```python
@mcp.tool()
def sql_execute(
    file: Annotated[str, Field(description="Path to the spreadsheet file")],
    sql: Annotated[str, Field(description=(
        "SQL mutation statement to execute: INSERT INTO, UPDATE, or "
        "DELETE FROM. Sheet names are table names. "
        "Example: UPDATE Sales SET status = 'Closed' "
        "WHERE quarter = 'Q1' AND revenue < 1000"
    ))],
    header_row: Annotated[int, Field(
        description="1-based row number containing column headers. Defaults to 1."
    )] = 1,
) -> dict:
    """Execute a mutating SQL statement and write changes back to the file.

    Supports INSERT INTO (adds rows), UPDATE (modifies cell values), and
    DELETE FROM (removes rows). The target sheet is determined from the
    SQL statement. After execution, the modified table is written back to
    the file atomically. Returns {"affected_rows": N}.
    """
    sql_stripped = sql.strip().rstrip(";")
    target_table = _extract_target_table(sql_stripped)
    wb = load_workbook(file)
    ws = _resolve_sheet(wb, target_table)
    headers, _ = _sheet_to_records(ws, header_row)
    if not headers:
        raise ValueError(f"Sheet {target_table!r} has no headers at row {header_row}")
    headers = _dedup_headers(headers)
    num_cols = len(headers)

    # Run the mutation against an in-memory DuckDB copy of the sheets.
    conn = _load_sheets_to_duckdb(wb, header_row)
    result = conn.execute(sql_stripped)
    affected = result.fetchone()[0]

    # Read the post-mutation table back out of DuckDB.
    col_list = ", ".join(f'"{h}"' for h in headers)
    new_rows = conn.execute(f'SELECT {col_list} FROM "{target_table}"').fetchall()

    # Clear the old data rows, then rewrite the table below the header row.
    old_max_row = ws.max_row or header_row
    for r in range(header_row + 1, old_max_row + 1):
        for c in range(1, num_cols + 1):
            ws.set_cell(r, c, None)
    for r_idx, row in enumerate(new_rows):
        for c_idx, val in enumerate(row):
            ws.set_cell(header_row + 1 + r_idx, c_idx + 1, val)

    wb.save(file)
    return {"affected_rows": affected}
```
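The function above depends on `_dedup_headers` to guarantee unique column names before building the `SELECT` column list (duplicate headers would otherwise collide as DuckDB identifiers). Its body is not shown in this reference; a plausible sketch, assuming repeated names get numeric suffixes:

```python
def dedup_headers(headers):
    # Hypothetical sketch of _dedup_headers: append a numeric suffix to
    # repeated header names so each column gets a unique identifier.
    seen = {}
    out = []
    for h in headers:
        name = str(h) if h is not None else ""
        if name in seen:
            seen[name] += 1
            out.append(f"{name}_{seen[name]}")
        else:
            seen[name] = 1
            out.append(name)
    return out

print(dedup_headers(["id", "amount", "amount"]))  # ['id', 'amount', 'amount_2']
```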