migrate_todo_schema.py
#!/usr/bin/env python3
"""
Migration script to standardize existing todo field names and structure.

Performs:
1. Field standardization: target → target_agent
2. Move completed_by from metadata to top-level
3. Move completion_comment from metadata to top-level
4. Normalize timestamp formats
5. Validate and clean metadata structures

Usage:
    python migration_scripts/migrate_todo_schema.py [--dry-run] [--batch-size=1000]
"""

import asyncio
import argparse
import json
import logging
import os
import sys
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Tuple

# Add src to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))

from Omnispindle.database import db_connection
from Omnispindle.context import Context
from Omnispindle.schemas.todo_metadata_schema import validate_todo_metadata, TodoMetadata
from pymongo import MongoClient

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class TodoSchemaMigrator:
    """Handles migration of todos to standardized schema."""

    def __init__(self, dry_run: bool = False, batch_size: int = 1000):
        self.dry_run = dry_run
        self.batch_size = batch_size
        self.stats = {
            'total_todos': 0,
            'migrated': 0,
            'already_compliant': 0,
            'validation_warnings': 0,
            'errors': 0,
            'field_migrations': {
                'target_to_target_agent': 0,
                'completed_by_moved': 0,
                'completion_comment_moved': 0,
                'metadata_cleaned': 0,
                'timestamps_normalized': 0
            }
        }

    def create_backup(self, collections: Dict) -> str:
        """Create a backup of the todos collection before migration."""
        backup_timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        backup_collection_name = f"todos_backup_{backup_timestamp}"

        if self.dry_run:
            logger.info(f"[DRY RUN] Would create backup: {backup_collection_name}")
            return backup_collection_name

        todos_collection = collections['todos']
        # Derive the backup collection from the same database as the todos collection
        backup_collection = todos_collection.database[backup_collection_name]

        # Copy all documents to backup
        todos = list(todos_collection.find({}))
        if todos:
            backup_collection.insert_many(todos)
            logger.info(f"✅ Created backup with {len(todos)} todos: {backup_collection_name}")
        else:
            logger.info("No todos to backup")

        return backup_collection_name

    def analyze_todo_compliance(self, todo: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze what migrations are needed for a todo."""
        migrations_needed = {
            'target_to_target_agent': 'target' in todo and 'target_agent' not in todo,
            'completed_by_to_toplevel': False,
            'completion_comment_to_toplevel': False,
            'metadata_cleanup': False,
            'timestamp_normalization': False
        }

        # Check metadata structure
        metadata = todo.get('metadata', {})
        if isinstance(metadata, dict):
            # Check for fields that should be moved to top level
            if 'completed_by' in metadata and 'completed_by' not in todo:
                migrations_needed['completed_by_to_toplevel'] = True
            if 'completion_comment' in metadata and 'completion_comment' not in todo:
                migrations_needed['completion_comment_to_toplevel'] = True

            # Check if metadata needs schema validation/cleanup
            if metadata and not metadata.get('_validation_warning'):
                try:
                    validate_todo_metadata(metadata)
                except Exception:
                    migrations_needed['metadata_cleanup'] = True

        # Check timestamp formats (basic heuristic)
        for field in ['created_at', 'updated_at', 'completed_at']:
            if field in todo:
                value = todo[field]
                # If it's a string, it might need normalization to timestamp
                if isinstance(value, str) and not str(value).isdigit():
                    migrations_needed['timestamp_normalization'] = True
                    break

        return migrations_needed
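
    # Illustration (hypothetical legacy document; the values are placeholders):
    # a todo like
    #   {"target": "agent-1", "created_at": "2024-01-15T10:30:00Z",
    #    "metadata": {"completed_by": "user@example.com"}}
    # would be flagged for target_to_target_agent, completed_by_to_toplevel,
    # and timestamp_normalization; whether metadata_cleanup is also flagged
    # depends on what validate_todo_metadata accepts.
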
    def migrate_todo_fields(self, todo: Dict[str, Any]) -> Tuple[Dict[str, Any], List[str]]:
        """Apply field migrations to a single todo."""
        migrated_todo = todo.copy()
        changes = []

        # 1. Migrate target → target_agent
        if 'target' in migrated_todo and 'target_agent' not in migrated_todo:
            migrated_todo['target_agent'] = migrated_todo.pop('target')
            changes.append('target → target_agent')
            self.stats['field_migrations']['target_to_target_agent'] += 1

        # 2. Move completed_by from metadata to top level
        metadata = migrated_todo.get('metadata', {})
        if isinstance(metadata, dict) and 'completed_by' in metadata and 'completed_by' not in migrated_todo:
            migrated_todo['completed_by'] = metadata.pop('completed_by')
            changes.append('completed_by moved to top-level')
            self.stats['field_migrations']['completed_by_moved'] += 1

        # 3. Move completion_comment from metadata to top level
        if isinstance(metadata, dict) and 'completion_comment' in metadata and 'completion_comment' not in migrated_todo:
            migrated_todo['completion_comment'] = metadata.pop('completion_comment')
            changes.append('completion_comment moved to top-level')
            self.stats['field_migrations']['completion_comment_moved'] += 1

        # 4. Clean and validate metadata
        if metadata:
            try:
                # Remove any validation warnings from previous runs
                if '_validation_warning' in metadata:
                    metadata.pop('_validation_warning')

                validated_metadata = validate_todo_metadata(metadata)
                migrated_todo['metadata'] = validated_metadata.model_dump(exclude_none=True)
                changes.append('metadata validated and cleaned')
                self.stats['field_migrations']['metadata_cleaned'] += 1
            except Exception as e:
                # Keep original metadata but add validation warning
                migrated_todo['metadata'] = metadata
                migrated_todo['metadata']['_validation_warning'] = f"Migration validation failed: {str(e)}"
                changes.append(f'metadata validation failed: {str(e)}')
                self.stats['validation_warnings'] += 1

        # 5. Normalize timestamps (convert string dates to unix timestamps)
        for field in ['created_at', 'updated_at', 'completed_at']:
            if field in migrated_todo:
                value = migrated_todo[field]
                if isinstance(value, str) and not str(value).isdigit():
                    try:
                        # Try to parse ISO format or other common formats
                        dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
                        migrated_todo[field] = int(dt.timestamp())
                        changes.append(f'{field} normalized to unix timestamp')
                        self.stats['field_migrations']['timestamps_normalized'] += 1
                    except Exception:
                        logger.warning(f"Could not normalize timestamp {field}: {value}")

        # Ensure updated_at is set
        if 'updated_at' not in migrated_todo:
            migrated_todo['updated_at'] = int(datetime.now(timezone.utc).timestamp())
            changes.append('added updated_at timestamp')

        return migrated_todo, changes
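
    # Worked example for step 5 (assumed input): "2024-01-15T10:30:00Z" becomes
    # "2024-01-15T10:30:00+00:00" after the Z-suffix replacement, which
    # datetime.fromisoformat() parses and int(dt.timestamp()) stores as
    # 1705314600. Purely numeric strings (e.g. "1705314600") are skipped by
    # the isdigit() guard and left as-is.
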
    async def migrate_batch(self, collections: Dict, todos: List[Dict]) -> None:
        """Migrate a batch of todos."""
        todos_collection = collections['todos']

        for todo in todos:
            try:
                self.stats['total_todos'] += 1

                # Analyze what migrations are needed
                migrations_needed = self.analyze_todo_compliance(todo)

                # If no migrations needed, skip
                if not any(migrations_needed.values()):
                    self.stats['already_compliant'] += 1
                    continue

                # Apply migrations
                migrated_todo, changes = self.migrate_todo_fields(todo)

                if self.dry_run:
                    logger.info(f"[DRY RUN] Would migrate todo {todo.get('id', 'unknown')}: {', '.join(changes)}")
                else:
                    # Update in database
                    result = todos_collection.replace_one(
                        {'_id': todo['_id']},
                        migrated_todo
                    )
                    if result.modified_count == 1:
                        logger.debug(f"✅ Migrated todo {todo.get('id', 'unknown')}: {', '.join(changes)}")
                    else:
                        logger.error(f"❌ Failed to update todo {todo.get('id', 'unknown')}")
                        self.stats['errors'] += 1
                        continue

                self.stats['migrated'] += 1

            except Exception as e:
                logger.error(f"❌ Error migrating todo {todo.get('id', 'unknown')}: {str(e)}")
                self.stats['errors'] += 1

    async def run_migration(self, user_email: Optional[str] = None) -> None:
        """Run the complete migration process."""
        logger.info(f"🚀 Starting todo schema migration {'(DRY RUN)' if self.dry_run else ''}")

        try:
            # Set up user context if provided
            user = {"email": user_email} if user_email else None
            collections = db_connection.get_collections(user)

            # Create backup
            backup_name = self.create_backup(collections)

            # Get total count
            todos_collection = collections['todos']
            total_count = todos_collection.count_documents({})
            logger.info(f"📊 Found {total_count} todos to analyze")

            if total_count == 0:
                logger.info("✅ No todos to migrate")
                return

            # Process in batches
            processed = 0
            while processed < total_count:
                batch = list(todos_collection.find({}).skip(processed).limit(self.batch_size))
                if not batch:
                    break

                await self.migrate_batch(collections, batch)
                processed += len(batch)

                logger.info(f"📈 Progress: {processed}/{total_count} todos processed")

            # Print final stats
            self.print_migration_summary(backup_name)

        except Exception as e:
            logger.error(f"❌ Migration failed: {str(e)}")
            raise

    def print_migration_summary(self, backup_name: str) -> None:
        """Print comprehensive migration statistics."""
        print("\n" + "="*60)
        print(f"📋 MIGRATION SUMMARY {'(DRY RUN)' if self.dry_run else ''}")
        print("="*60)
        print(f"📊 Processed: {self.stats['total_todos']} todos")
        print(f"✅ Migrated: {self.stats['migrated']} todos")
        print(f"✨ Already compliant: {self.stats['already_compliant']} todos")
        print(f"⚠️ Validation warnings: {self.stats['validation_warnings']} todos")
        print(f"❌ Errors: {self.stats['errors']} todos")

        print(f"\n🔧 Field Migrations Applied:")
        for field, count in self.stats['field_migrations'].items():
            if count > 0:
                print(f"   • {field.replace('_', ' ').title()}: {count}")

        print(f"\n💾 Backup created: {backup_name}")

        if not self.dry_run and self.stats['migrated'] > 0:
            print(f"\n🎉 Migration completed successfully!")
            print(f"   • {self.stats['migrated']} todos updated")
            print(f"   • Schema standardization: ✅")
            print(f"   • Backward compatibility: ✅")
        elif self.dry_run:
            print(f"\n🔍 Dry run completed - no changes made")
            print(f"   • Run without --dry-run to apply migrations")

        print("="*60)
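

# Rollback sketch (assumes default connection settings and a backup name as
# printed in the summary; YYYYMMDD_HHMMSS is a placeholder):
#   mongosh> db.todos_backup_YYYYMMDD_HHMMSS.aggregate([{ $out: "todos" }])
# The $out stage replaces the todos collection wholesale with the backup contents.
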
async def main():
    """Main migration entry point."""
    parser = argparse.ArgumentParser(
        description="Migrate todos to standardized schema format"
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be migrated without making changes'
    )
    parser.add_argument(
        '--batch-size',
        type=int,
        default=1000,
        help='Number of todos to process per batch (default: 1000)'
    )
    parser.add_argument(
        '--user-email',
        type=str,
        help='User email for user-scoped collections (optional)'
    )

    args = parser.parse_args()

    # Initialize migrator
    migrator = TodoSchemaMigrator(
        dry_run=args.dry_run,
        batch_size=args.batch_size
    )

    # Run migration
    await migrator.run_migration(args.user_email)


if __name__ == "__main__":
    asyncio.run(main())
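
A conservative rollout uses only the flags defined in main() above; the email below is a placeholder:

    python migration_scripts/migrate_todo_schema.py --dry-run
    python migration_scripts/migrate_todo_schema.py --batch-size=500 --user-email=user@example.com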
