undo
Revert the most recent CSV editing operation in your current session.
Instructions
Undo the last operation in a session.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| session_id | Yes |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- src/csv_editor/server.py:481-484 (registration)MCP tool registration: The 'undo' tool is registered as a FastMCP tool that delegates to _undo_operation.
@mcp.tool async def undo(session_id: str, ctx: Context = None) -> dict[str, Any]: """Undo the last operation in a session.""" return await _undo_operation(session_id, ctx) - Tool handler: undo_operation() retrieves the session, calls session.undo(), and returns OperationResult.
async def undo_operation(session_id: str, ctx: Context = None) -> dict[str, Any]: """ Undo the last operation in a session. Args: session_id: Session identifier ctx: FastMCP context Returns: Dict with success status and undo result """ try: manager = get_session_manager() session = manager.get_session(session_id) if not session: return OperationResult( success=False, message="Session not found", error=f"No session with ID: {session_id}", ).model_dump() if ctx: await ctx.info(f"Undoing last operation for session {session_id}") result = await session.undo() if result["success"]: if ctx: await ctx.info(f"Successfully undid operation: {result.get('message')}") return OperationResult( success=True, message=result["message"], session_id=session_id, data=result ).model_dump() else: return OperationResult( success=False, message="Failed to undo operation", error=result.get("error") ).model_dump() except Exception as e: logger.error(f"Error undoing operation: {e!s}") if ctx: await ctx.error(f"Failed to undo operation: {e!s}") return OperationResult( success=False, message="Failed to undo operation", error=str(e) ).model_dump() - Session-level undo: CsvSession.undo() checks history_manager, calls history_manager.undo(), restores the dataframe snapshot, and triggers auto-save.
async def undo(self) -> dict[str, Any]: """Undo the last operation.""" if not self.history_manager: return {"success": False, "error": "History is not enabled"} if not self.history_manager.can_undo(): return {"success": False, "error": "No operations to undo"} try: operation, data_snapshot = self.history_manager.undo() if data_snapshot is not None: self.df = data_snapshot # Trigger auto-save if configured if self.auto_save_manager.should_save_after_operation(): await self.auto_save_manager.trigger_save(self._save_callback, "undo") return { "success": True, "message": f"Undid operation: {operation.operation_type}", "operation": operation.to_dict(), "can_undo": self.history_manager.can_undo(), "can_redo": self.history_manager.can_redo(), } else: return {"success": False, "error": "No snapshot available for undo"} except Exception as e: logger.error(f"Error during undo: {e!s}") return {"success": False, "error": str(e)} - Core undo logic: HistoryManager.undo() moves current operation to the redo stack, decrements the index, and returns the previous data snapshot.
def undo(self) -> tuple[OperationHistory | None, pd.DataFrame | None]: """Undo the last operation and return the previous state.""" if not self.can_undo(): return None, None # Move current operation to redo stack current_op = self.history[self.current_index] self.redo_stack.append(current_op) # Move index back self.current_index -= 1 # Find the most recent snapshot before current position snapshot = None for i in range(self.current_index, -1, -1): if self.history[i].data_snapshot is not None: snapshot = self.history[i].data_snapshot.copy() break # Save state if self.storage_type != HistoryStorage.MEMORY: self._save_history() logger.info(f"Undid operation: {current_op.operation_type}") # Return the operation that was undone and the data to restore return current_op, snapshot - OperationResult schema used by the undo handler to structure its return value.
class OperationResult(BaseModel): """Result of a data operation.""" success: bool = Field(..., description="Whether operation succeeded") message: str = Field(..., description="Result message") session_id: str | None = Field(None, description="Session ID") rows_affected: int | None = Field(None, description="Number of rows affected") columns_affected: list[str] | None = Field(None, description="Columns affected") data: dict[str, Any] | None = Field(None, description="Additional result data") error: str | None = Field(None, description="Error message if failed") warnings: list[str] | None = Field(None, description="Warning messages")