execute
Execute write SQL statements (INSERT, UPDATE, DELETE) to modify PostgreSQL database data. Use with caution as this tool directly changes database content.
Instructions
Execute a write SQL statement (INSERT, UPDATE, DELETE).
WARNING: This tool modifies data. Use with caution.
Only available if ALLOW_WRITE_OPERATIONS=true is set.
Args:
sql: SQL statement to execute
Returns:
Execution result with affected row count
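
A caller will typically see one of two result shapes: a success dict carrying `row_count` and `message`, or a failure dict carrying `error` when writes are disabled. The sketch below shows one way a client might turn either shape into a log line. The helper name `describe_execute_result` is hypothetical, and the shape of errors produced by `handle_db_error` is not shown in the source, so the fallback text is an assumption.

```python
def describe_execute_result(result: dict) -> str:
    """Summarize a result dict returned by the 'execute' tool.

    Hypothetical helper for illustration; not part of the server.
    """
    if result.get("success"):
        # Success shape from the handler: success, row_count, message.
        return f"ok: {result['row_count']} row(s) affected"
    # Failure shape from the permission check: success, error.
    # Other failure shapes are not documented, hence the fallback.
    return f"failed: {result.get('error', 'unknown error')}"


ok_line = describe_execute_result(
    {"success": True, "row_count": 3, "message": "3 rows affected"}
)
disabled_line = describe_execute_result(
    {
        "success": False,
        "error": "Write operations are disabled. Set ALLOW_WRITE_OPERATIONS=true to enable.",
    }
)
```

Checking `success` before reading `row_count` matters here because the disabled-writes response does not include a row count.
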
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| sql | Yes | SQL statement to execute | |
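
As a rough illustration of the table above, the input can be thought of as an object with a single required string field. The schema literal below is an assumption inferred from the `sql: str` signature, not the schema the server actually publishes (which may include extra fields such as titles), and `check_arguments` is a hypothetical helper.

```python
# Assumed schema, inferred from the `sql: str` signature of the handler.
EXECUTE_INPUT_SCHEMA = {
    "type": "object",
    "properties": {"sql": {"type": "string"}},
    "required": ["sql"],
}


def check_arguments(arguments: dict) -> list[str]:
    """Return a list of problems; an empty list means the payload looks valid."""
    problems = []
    # Every required name must be present.
    for name in EXECUTE_INPUT_SCHEMA["required"]:
        if name not in arguments:
            problems.append(f"missing required argument: {name}")
    # Every supplied name must be known and of the declared type.
    for name, value in arguments.items():
        spec = EXECUTE_INPUT_SCHEMA["properties"].get(name)
        if spec is None:
            problems.append(f"unexpected argument: {name}")
        elif spec["type"] == "string" and not isinstance(value, str):
            problems.append(f"argument {name} must be a string")
    return problems


valid = check_arguments({"sql": "DELETE FROM sessions WHERE expired = true"})
missing = check_arguments({})
```
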
Implementation Reference
- postgres_mcp/server.py:74-104 (handler) The primary handler for the 'execute' tool. Decorated with @mcp.tool() for registration in FastMCP. Performs a permission check and calls the underlying PostgresClient.execute_query method.

  ```python
  @mcp.tool()
  @handle_db_error
  def execute(sql: str) -> dict:
      """Execute a write SQL statement (INSERT, UPDATE, DELETE).

      WARNING: This tool modifies data. Use with caution.
      Only available if ALLOW_WRITE_OPERATIONS=true is set.

      Args:
          sql: SQL statement to execute

      Returns:
          Execution result with affected row count
      """
      settings = get_settings()

      if not settings.allow_write_operations:
          return {
              "success": False,
              "error": "Write operations are disabled. Set ALLOW_WRITE_OPERATIONS=true to enable.",
          }

      client = get_client()
      result = client.execute_query(sql, allow_write=True)

      return {
          "success": True,
          "row_count": result["row_count"],
          "message": result.get("message", "Query executed successfully"),
      }
  ```

- Supporting helper method PostgresClient.execute_query that implements the actual SQL execution logic, including validation, connection management, result formatting, and handling of both SELECT and write operations.

  ```python
  def execute_query(
      self,
      query: str,
      params: Optional[tuple] = None,
      allow_write: bool = False,
      max_rows: Optional[int] = None,
  ) -> dict[str, Any]:
      """Execute a SQL query.

      Args:
          query: SQL query string
          params: Optional query parameters
          allow_write: Whether to allow write operations
          max_rows: Maximum rows to return (None uses settings default)

      Returns:
          Dict with results, row_count, columns
      """
      # Validate query
      validated_query = validate_query(query, allow_write=allow_write)
      max_rows = max_rows or self.settings.max_rows

      with self.get_connection() as conn:
          cursor = conn.cursor()
          try:
              cursor.execute(validated_query, params)

              # Check if it's a SELECT query
              is_select = validated_query.strip().upper().startswith("SELECT")

              if is_select:
                  rows = cursor.fetchmany(max_rows + 1)
                  truncated = len(rows) > max_rows
                  if truncated:
                      rows = rows[:max_rows]

                  columns = [desc[0] for desc in cursor.description] if cursor.description else []

                  return {
                      "success": True,
                      "rows": [dict(row) for row in rows],
                      "row_count": len(rows),
                      "columns": columns,
                      "truncated": truncated,
                  }
              else:
                  conn.commit()
                  return {
                      "success": True,
                      "rows": [],
                      "row_count": cursor.rowcount,
                      "columns": [],
                      "message": f"{cursor.rowcount} rows affected",
                  }

          except psycopg2.Error as e:
              conn.rollback()
              raise PostgresClientError(f"Query failed: {e}") from e
          finally:
              cursor.close()
  ```
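
The branching in `execute_query` can be tried without a PostgreSQL server. The sketch below is a simplified stand-in, not the project's code: it swaps psycopg2 for the standard-library `sqlite3` module, skips `validate_query`, and uses a hypothetical function name `run_query`. It keeps the two ideas from the helper: fetching `max_rows + 1` rows to detect truncation, and committing then reporting `cursor.rowcount` for writes.

```python
import sqlite3
from typing import Any, Optional


def run_query(
    conn: sqlite3.Connection,
    query: str,
    params: Optional[tuple] = None,
    max_rows: int = 100,
) -> dict[str, Any]:
    """Illustrative stand-in for the SELECT/write branching (sqlite3, not psycopg2)."""
    cursor = conn.cursor()
    try:
        # sqlite3 does not accept None for parameters, so fall back to ().
        cursor.execute(query, params or ())
        if query.strip().upper().startswith("SELECT"):
            # Ask for one extra row so truncation is detectable without a COUNT.
            rows = cursor.fetchmany(max_rows + 1)
            truncated = len(rows) > max_rows
            rows = rows[:max_rows]
            columns = [d[0] for d in cursor.description] if cursor.description else []
            return {
                "success": True,
                "rows": [dict(zip(columns, row)) for row in rows],
                "row_count": len(rows),
                "columns": columns,
                "truncated": truncated,
            }
        # Write path: persist the change and report the affected row count.
        conn.commit()
        return {
            "success": True,
            "rows": [],
            "row_count": cursor.rowcount,
            "columns": [],
            "message": f"{cursor.rowcount} rows affected",
        }
    except sqlite3.Error:
        conn.rollback()
        raise
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
inserted = run_query(conn, "INSERT INTO users (name) VALUES (?), (?), (?)", ("a", "b", "c"))
selected = run_query(conn, "SELECT id, name FROM users ORDER BY id", max_rows=2)
```

Because the check is a plain `startswith("SELECT")`, any statement that begins with another keyword takes the write branch and returns an empty `rows` list, unless validation rejects or rewrites it first.
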