Skip to main content
Glama

MCP Video Parser

clean_videos.py (13.4 kB)
#!/usr/bin/env python3
"""
Clean all videos from the database and file system.

This script provides a complete reset of the video storage system by:
1. Removing all video entries from the database
2. Deleting all video files from the location-based structure
3. Removing all processed files (frames, transcripts)
4. Optionally backing up the database before cleaning
"""

import argparse
import shutil
import sqlite3
import sys
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Optional

# Add this script's own directory to the import path so `src` resolves
sys.path.insert(0, str(Path(__file__).parent))

from src.storage.manager import StorageManager
from src.utils.logging import get_logger

# Lower-case file suffixes treated as original video files.
VIDEO_EXTENSIONS = frozenset({'.mp4', '.avi', '.mov', '.mkv', '.webm'})

# Tables emptied by VideoCleaner.clean_database(); `videos` is cleared last,
# after the tables that hold its related data.
_TABLES_TO_CLEAR = ('frame_analyses', 'transcripts', 'summaries', 'videos')


def _tree_stats(directory: Path) -> tuple[int, int]:
    """Return ``(file_count, total_bytes)`` for regular files under *directory*.

    The tree is walked once; both numbers come from the same pass.
    """
    file_count = 0
    total_bytes = 0
    for entry in directory.rglob('*'):
        if entry.is_file():
            file_count += 1
            total_bytes += entry.stat().st_size
    return file_count, total_bytes


class VideoCleaner:
    """Handles complete cleanup of video storage system."""

    def __init__(self, backup_db: bool = True):
        """
        Initialize the video cleaner.

        Args:
            backup_db: Whether to backup database before cleaning
        """
        self.storage = StorageManager()
        self.logger = get_logger(__name__)
        self.backup_db = backup_db
        self.stats = {
            'videos_removed': 0,
            'files_deleted': 0,
            'space_freed': 0,
        }

    @property
    def _db_path(self) -> Path:
        """Location of the metadata database under the storage base path."""
        return self.storage.base_path / "index" / "metadata.db"

    def backup_database(self) -> Optional[Path]:
        """
        Create a timestamped copy of the database next to the original.

        Returns:
            Path of the backup file, or None when backups are disabled, no
            database exists, or the copy failed (the failure is logged).
        """
        if not self.backup_db:
            return None

        db_path = self._db_path
        if not db_path.exists():
            self.logger.warning("No database found to backup")
            return None

        # Create backup filename with timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = db_path.parent / f"metadata_backup_{timestamp}.db"

        try:
            shutil.copy2(db_path, backup_path)
        except Exception as e:
            self.logger.error(f"Failed to backup database: {e}")
            return None

        self.logger.info(f"Database backed up to: {backup_path}")
        return backup_path

    def get_all_videos(self) -> list:
        """
        Get all video records from the database.

        Returns:
            One dict per row of the ``videos`` table. An empty list is
            returned when the database file or the table does not exist.
        """
        db_path = self._db_path
        if not db_path.exists():
            # sqlite3.connect() would otherwise create an empty database
            # file as a side effect and the SELECT below would then fail.
            return []

        with closing(sqlite3.connect(db_path)) as conn:
            conn.row_factory = sqlite3.Row
            has_table = conn.execute(
                "SELECT 1 FROM sqlite_master "
                "WHERE type = 'table' AND lower(name) = 'videos'"
            ).fetchone()
            if has_table is None:
                return []
            rows = conn.execute("SELECT * FROM videos").fetchall()
            return [dict(row) for row in rows]

    def clean_database(self) -> int:
        """
        Remove all video entries (and related rows) from the database.

        Returns:
            Number of rows that were in ``videos`` before deletion; 0 when
            the database file or the ``videos`` table does not exist.

        Raises:
            Exception: Any error raised while deleting is logged, the
                transaction is rolled back, and the error is re-raised.
        """
        db_path = self._db_path
        if not db_path.exists():
            # Avoid creating an empty database just to find nothing in it.
            self.logger.warning("No database found to clean")
            return 0

        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()

            # Look up which tables exist so that a genuinely missing table
            # can be told apart from a real failure (e.g. a locked file).
            cursor.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
            existing = {row[0].lower() for row in cursor.fetchall()}

            # Get count before deletion
            count = 0
            if 'videos' in existing:
                cursor.execute("SELECT COUNT(*) FROM videos")
                count = cursor.fetchone()[0]

            self.logger.info("Cleaning database tables...")
            for table in _TABLES_TO_CLEAR:
                if table not in existing:
                    self.logger.warning(f"Table {table} does not exist")
                    continue
                # Table names come from the fixed tuple above, never from
                # user input, so interpolating them into SQL is safe here.
                cursor.execute(f"DELETE FROM {table}")
                self.logger.debug(f"Cleared table: {table}")

            conn.commit()
            self.logger.info(f"Removed {count} video entries from database")
            return count
        except Exception as e:
            conn.rollback()
            self.logger.error(f"Database cleanup failed: {e}")
            raise
        finally:
            conn.close()

    def _remove_child_dirs(self, parent: Path, announce: bool = False) -> tuple[int, int]:
        """
        Delete every sub-directory of *parent* and tally what was removed.

        Args:
            parent: Directory whose immediate sub-directories are deleted.
            announce: When True, log an info line per directory before
                removing it.

        Returns:
            Tuple of (files_deleted, bytes_freed) for the directories that
            were removed successfully; failures are logged and skipped.
        """
        files_deleted = 0
        bytes_freed = 0

        # Snapshot the listing: entries are deleted while we loop.
        for child in list(parent.iterdir()):
            if not child.is_dir():
                continue

            # Measure before deletion; the data is gone afterwards.
            file_count, size = _tree_stats(child)
            if announce:
                self.logger.info(
                    f"Removing location '{child.name}': {file_count} files, "
                    f"{size / 1024 / 1024:.2f} MB"
                )

            try:
                shutil.rmtree(child)
            except Exception as e:
                self.logger.error(f"Failed to remove {child}: {e}")
            else:
                files_deleted += file_count
                bytes_freed += size

        return files_deleted, bytes_freed

    def clean_file_system(self) -> tuple[int, int]:
        """
        Remove all video files from the file system.

        Clears the sub-directories of ``locations`` and ``processed`` and
        empties ``temp`` (which is recreated). Files removed from ``temp``
        are not included in the returned totals.

        Returns:
            Tuple of (files_deleted, bytes_freed)
        """
        files_deleted = 0
        bytes_freed = 0

        # Clean location-based structure
        locations_dir = self.storage.base_path / "locations"
        if locations_dir.exists():
            self.logger.info(f"Cleaning location directories: {locations_dir}")
            removed, freed = self._remove_child_dirs(locations_dir, announce=True)
            files_deleted += removed
            bytes_freed += freed

        # Clean processed files
        processed_dir = self.storage.base_path / "processed"
        if processed_dir.exists():
            self.logger.info(f"Cleaning processed files: {processed_dir}")
            removed, freed = self._remove_child_dirs(processed_dir)
            files_deleted += removed
            bytes_freed += freed

        # Clean temp directory
        temp_dir = self.storage.base_path / "temp"
        if temp_dir.exists():
            self.logger.info("Cleaning temp directory")
            try:
                shutil.rmtree(temp_dir)
                temp_dir.mkdir()  # Recreate empty temp dir
            except Exception as e:
                self.logger.error(f"Failed to clean temp directory: {e}")

        return files_deleted, bytes_freed

    def clean_originals(self) -> tuple[int, int]:
        """
        Optionally clean original video files.

        Only regular files directly inside ``originals`` whose suffix is in
        VIDEO_EXTENSIONS are deleted; anything else is left alone.

        Returns:
            Tuple of (files_deleted, bytes_freed)
        """
        originals_dir = self.storage.base_path / "originals"
        if not originals_dir.exists():
            return 0, 0

        files_deleted = 0
        bytes_freed = 0

        self.logger.info(f"Cleaning original files: {originals_dir}")

        # Snapshot the listing: entries are deleted while we loop.
        for file in list(originals_dir.iterdir()):
            if not (file.is_file() and file.suffix.lower() in VIDEO_EXTENSIONS):
                continue
            try:
                size = file.stat().st_size
                file.unlink()
            except Exception as e:
                self.logger.error(f"Failed to delete {file}: {e}")
            else:
                files_deleted += 1
                bytes_freed += size
                self.logger.debug(f"Deleted: {file.name}")

        return files_deleted, bytes_freed

    def run(self, clean_originals: bool = False) -> dict:
        """
        Run the complete cleanup process.

        Args:
            clean_originals: Whether to also delete original video files

        Returns:
            Dictionary with cleanup statistics

        Raises:
            RuntimeError: A backup was requested and a database exists, but
                the backup could not be created. Nothing is deleted.
            Exception: Re-raised from clean_database() on failure.
        """
        self.logger.info("Starting video cleanup process...")

        # Get video count before cleanup
        videos = self.get_all_videos()
        self.stats['videos_removed'] = len(videos)

        # Backup database if requested
        backup_path = self.backup_database()
        if backup_path:
            self.stats['backup_path'] = str(backup_path)
        elif self.backup_db and self._db_path.exists():
            # The caller asked for a safety net and we could not provide
            # one; refuse to continue with an irreversible deletion.
            raise RuntimeError(
                "Database backup failed; aborting cleanup "
                "(use --no-backup to clean without a backup)"
            )

        # Clean database
        try:
            self.clean_database()
        except Exception as e:
            self.logger.error(f"Database cleanup failed: {e}")
            if backup_path:
                self.logger.info(f"Database backup available at: {backup_path}")
            raise

        # Clean file system
        files_deleted, bytes_freed = self.clean_file_system()
        self.stats['files_deleted'] = files_deleted
        self.stats['space_freed'] = bytes_freed

        # Optionally clean originals
        if clean_originals:
            orig_files, orig_bytes = self.clean_originals()
            self.stats['files_deleted'] += orig_files
            self.stats['space_freed'] += orig_bytes
            self.stats['originals_deleted'] = orig_files

        self.logger.info("Video cleanup completed successfully!")
        return self.stats


def main():
    """Main entry point for the cleanup script."""
    parser = argparse.ArgumentParser(
        description="Clean all videos from the database and file system",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Clean all processed files and database (keeps originals)
  python clean_videos.py

  # Clean everything including original video files
  python clean_videos.py --clean-originals

  # Clean without creating database backup
  python clean_videos.py --no-backup

  # Dry run to see what would be deleted
  python clean_videos.py --dry-run
        """
    )

    parser.add_argument(
        "--clean-originals",
        action="store_true",
        help="Also delete original video files (default: keep originals)"
    )
    parser.add_argument(
        "--no-backup",
        action="store_true",
        help="Skip database backup before cleaning"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be deleted without actually deleting"
    )
    parser.add_argument(
        "-y", "--yes",
        action="store_true",
        help="Skip confirmation prompt"
    )

    args = parser.parse_args()

    # Initialize cleaner
    cleaner = VideoCleaner(backup_db=not args.no_backup)

    # Get current state
    videos = cleaner.get_all_videos()
    if not videos:
        print("No videos found in the system. Nothing to clean.")
        return

    # Show what will be cleaned
    print(f"\nFound {len(videos)} videos in the database")

    if args.dry_run:
        print("\n=== DRY RUN MODE ===")
        print("The following would be deleted:")

        # Show locations
        locations_dir = cleaner.storage.base_path / "locations"
        if locations_dir.exists():
            print("\nLocation directories:")
            for loc in locations_dir.iterdir():
                if loc.is_dir():
                    file_count, size = _tree_stats(loc)
                    print(f"  - {loc.name}: {file_count} files, {size / 1024 / 1024:.2f} MB")

        # Show processed
        processed_dir = cleaner.storage.base_path / "processed"
        if processed_dir.exists():
            video_count = sum(1 for d in processed_dir.iterdir() if d.is_dir())
            print(f"\nProcessed video directories: {video_count}")

        if args.clean_originals:
            originals_dir = cleaner.storage.base_path / "originals"
            if originals_dir.exists():
                orig_videos = [
                    f for f in originals_dir.iterdir()
                    if f.is_file() and f.suffix.lower() in VIDEO_EXTENSIONS
                ]
                print(f"\nOriginal videos: {len(orig_videos)} files")

        print("\nNo files were deleted (dry run mode)")
        return

    # Confirm with user
    if not args.yes:
        print("\nThis will:")
        print("- Remove all video entries from the database")
        print("- Delete all processed frames and transcripts")
        print("- Delete all videos from the location-based structure")
        if args.clean_originals:
            print("- Delete all original video files")
        if not args.no_backup:
            print("- Create a backup of the database first")

        try:
            response = input("\nAre you sure you want to continue? (yes/no): ")
        except EOFError:
            # No interactive stdin (e.g. piped/cron): treat as a refusal
            # rather than crashing; -y/--yes is the non-interactive opt-in.
            response = ""
        if response.lower() not in ['yes', 'y']:
            print("Cleanup cancelled.")
            return

    # Run cleanup
    try:
        stats = cleaner.run(clean_originals=args.clean_originals)

        # Display results
        print("\n=== Cleanup Complete ===")
        print(f"Videos removed from database: {stats['videos_removed']}")
        print(f"Files deleted: {stats['files_deleted']}")
        print(f"Space freed: {stats['space_freed'] / 1024 / 1024:.2f} MB")

        if 'originals_deleted' in stats:
            print(f"Original videos deleted: {stats['originals_deleted']}")

        if 'backup_path' in stats:
            print(f"\nDatabase backup saved to: {stats['backup_path']}")

    except Exception as e:
        print(f"\nError during cleanup: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/michaelbaker-dev/mcpVideoParser'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.