Skip to main content
Glama
history_demo.py11.7 kB
#!/usr/bin/env python3 """Demonstration of history tracking with undo/redo functionality.""" import asyncio import tempfile import os import pandas as pd from pathlib import Path import json # Setup path for imports import sys sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.csv_editor.models.csv_session import CSVSession from src.csv_editor.models.history_manager import HistoryStorage from src.csv_editor.models.data_models import OperationType async def demonstrate_history(): """Demonstrate history tracking with undo/redo capabilities.""" print("=" * 60) print("CSV Editor History & Undo/Redo Demonstration") print("=" * 60) # Create initial data initial_data = pd.DataFrame({ 'product': ['Laptop', 'Mouse', 'Keyboard', 'Monitor'], 'price': [999.99, 29.99, 79.99, 299.99], 'stock': [50, 200, 150, 75], 'category': ['Electronics', 'Accessories', 'Accessories', 'Electronics'] }) # Create session with history enabled (JSON persistence) session = CSVSession( enable_history=True, history_storage=HistoryStorage.JSON ) # Load data temp_file = os.path.join(tempfile.gettempdir(), "demo_data.csv") initial_data.to_csv(temp_file, index=False) session.load_data(initial_data, file_path=temp_file) print(f"\n✓ Created session with history tracking") print(f"Initial data:\n{session.df}\n") # Perform a series of operations print("-" * 40) print("Performing operations...") print("-" * 40) # Operation 1: Increase prices print("\n1. Increasing all prices by 10%") session.df['price'] = session.df['price'] * 1.1 session.record_operation(OperationType.TRANSFORM, {"operation": "price_increase_10%"}) print(f"Prices after increase:\n{session.df['price'].tolist()}") # Operation 2: Filter low stock print("\n2. Filtering items with stock >= 100") session.df = session.df[session.df['stock'] >= 100] session.record_operation(OperationType.FILTER, {"condition": "stock >= 100"}) print(f"Products after filter: {session.df['product'].tolist()}") # Operation 3: Add discount column print("\n3. Adding discount column") session.df['discount'] = 0.15 session.record_operation(OperationType.ADD_COLUMN, {"column": "discount", "value": 0.15}) print(f"Columns: {session.df.columns.tolist()}") # Operation 4: Sort by price print("\n4. Sorting by price (descending)") session.df = session.df.sort_values('price', ascending=False) session.record_operation(OperationType.SORT, {"column": "price", "ascending": False}) print(f"Products after sort: {session.df['product'].tolist()}") # Show current history print("\n" + "=" * 60) print("Current History") print("=" * 60) history_result = session.get_history() if history_result["success"]: print(f"Total operations: {history_result['statistics']['total_operations']}") print(f"Current position: {history_result['statistics']['current_position']}") print(f"Can undo: {history_result['statistics']['can_undo']}") print(f"Can redo: {history_result['statistics']['can_redo']}") print("\nOperations:") for op in history_result['history']: print(f" [{op['index']}] {op['operation_type']} - {op['timestamp']}") if op['is_current']: print(f" ^ Current position") # Demonstrate undo print("\n" + "=" * 60) print("Undo Operations") print("=" * 60) print("\n1. Undoing last operation (sort)...") undo_result = await session.undo() if undo_result["success"]: print(f"✓ {undo_result['message']}") print(f"Products order: {session.df['product'].tolist() if session.df is not None else 'N/A'}") print("\n2. Undoing again (add discount column)...") undo_result = await session.undo() if undo_result["success"]: print(f"✓ {undo_result['message']}") print(f"Columns: {session.df.columns.tolist() if session.df is not None else 'N/A'}") print("\n3. Undoing once more (filter)...") undo_result = await session.undo() if undo_result["success"]: print(f"✓ {undo_result['message']}") print(f"Row count: {len(session.df) if session.df is not None else 0}") print(f"Products: {session.df['product'].tolist() if session.df is not None else 'N/A'}") # Show history after undo history_result = session.get_history() print(f"\nCurrent position after undos: {history_result['statistics']['current_position']}") print(f"Can redo: {history_result['statistics']['can_redo']}") # Demonstrate redo print("\n" + "=" * 60) print("Redo Operations") print("=" * 60) print("\n1. Redoing (re-apply filter)...") redo_result = await session.redo() if redo_result["success"]: print(f"✓ {redo_result['message']}") print(f"Row count: {len(session.df) if session.df is not None else 0}") print("\n2. Redoing again (re-add discount column)...") redo_result = await session.redo() if redo_result["success"]: print(f"✓ {redo_result['message']}") print(f"Columns: {session.df.columns.tolist() if session.df is not None else 'N/A'}") # Perform a new operation (clears redo stack) print("\n" + "=" * 60) print("New Operation (clears redo stack)") print("=" * 60) print("\nAdding a new column 'in_stock'...") session.df['in_stock'] = True session.record_operation(OperationType.ADD_COLUMN, {"column": "in_stock", "value": True}) print(f"Columns: {session.df.columns.tolist()}") history_result = session.get_history() print(f"\nCan still redo? {history_result['statistics']['can_redo']} (should be False)") # Demonstrate restore to specific operation print("\n" + "=" * 60) print("Restore to Specific Operation") print("=" * 60) # Get first operation ID history_result = session.get_history() if history_result["success"] and history_result["history"]: first_op = history_result["history"][0] print(f"\nRestoring to first operation: {first_op['operation_type']}") restore_result = await session.restore_to_operation(first_op['operation_id']) if restore_result["success"]: print(f"✓ {restore_result['message']}") print(f"Data shape: {restore_result['shape']}") print(f"Current data:\n{session.df}") # Export history print("\n" + "=" * 60) print("Export History") print("=" * 60) # Export as JSON history_file = os.path.join(tempfile.gettempdir(), "history_export.json") if session.history_manager: success = session.history_manager.export_history(history_file, "json") if success: print(f"✓ History exported to: {history_file}") # Show exported content with open(history_file, 'r') as f: history_data = json.load(f) print(f" Total operations: {history_data['total_operations']}") print(f" Exported at: {history_data['exported_at']}") # Show history statistics print("\n" + "=" * 60) print("History Statistics") print("=" * 60) if session.history_manager: stats = session.history_manager.get_statistics() print(f"Total operations: {stats['total_operations']}") print(f"Current position: {stats['current_position']}") print(f"Snapshots saved: {stats['snapshots_count']}") print(f"Storage type: {stats['storage_type']}") print("\nOperation breakdown:") for op_type, count in stats['operation_types'].items(): print(f" {op_type}: {count}") # Show persistence print("\n" + "=" * 60) print("History Persistence") print("=" * 60) history_dir = session.history_manager.history_dir if session.history_manager else None if history_dir: print(f"History directory: {history_dir}") # List history files history_files = list(Path(history_dir).glob(f"*{session.session_id}*")) print(f"History files created: {len(history_files)}") for hf in history_files: print(f" - {hf.name}") # Check snapshot directory snapshot_dir = Path(history_dir) / "snapshots" / session.session_id if snapshot_dir.exists(): snapshots = list(snapshot_dir.glob("*.pkl")) print(f"Snapshots saved: {len(snapshots)}") print("\n✅ History demonstration completed!") return session async def demonstrate_history_recovery(): """Demonstrate recovering history from a previous session.""" print("\n" + "=" * 60) print("History Recovery from Previous Session") print("=" * 60) # Create a session and perform operations print("\n1. Creating initial session with operations...") session1 = CSVSession( session_id="demo-recovery-session", enable_history=True, history_storage=HistoryStorage.JSON ) data = pd.DataFrame({ 'item': ['A', 'B', 'C'], 'value': [10, 20, 30] }) session1.load_data(data) # Perform operations session1.df['value'] = session1.df['value'] * 2 session1.record_operation(OperationType.TRANSFORM, {"operation": "double_values"}) session1.df['status'] = 'active' session1.record_operation(OperationType.ADD_COLUMN, {"column": "status"}) print(f" Operations performed: 2") print(f" Final data:\n{session1.df}") # Simulate session end session1_id = session1.session_id del session1 print("\n2. Session ended. Creating new session with same ID...") # Create new session with same ID - should load history session2 = CSVSession( session_id=session1_id, enable_history=True, history_storage=HistoryStorage.JSON ) # Load the data (in real scenario, would load from file) session2.load_data(data) # Check if history was recovered history_result = session2.get_history() if history_result["success"]: print(f"✓ History recovered! Found {history_result['statistics']['total_operations']} operations") # Can we undo operations from previous session? print("\n3. Testing undo on recovered history...") if history_result['statistics']['can_undo']: undo_result = await session2.undo() if undo_result["success"]: print(f"✓ Successfully undid operation from previous session!") print(f" Operation: {undo_result['operation']['operation_type']}") print("\n✅ History recovery demonstration completed!") async def main(): """Run all history demonstrations.""" # Demo 1: Basic history with undo/redo session = await demonstrate_history() # Demo 2: History recovery await demonstrate_history_recovery() print("\n" + "=" * 60) print("Key Features Demonstrated") print("=" * 60) print("✓ Operation history tracking with timestamps") print("✓ Persistent history storage (JSON/Pickle)") print("✓ Undo/Redo functionality") print("✓ Restore to any previous operation") print("✓ History export (JSON/CSV)") print("✓ History recovery across sessions") print("✓ Automatic snapshots for data recovery") print("✓ History statistics and analysis") if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/santoshray02/csv-editor'

If you have feedback or need assistance with the MCP directory API, please join our Discord server