Skip to main content
Glama
manager.py26.1 kB
"""Storage manager for video files and metadata.""" import json import shutil import sqlite3 import hashlib from pathlib import Path from datetime import datetime, timedelta from typing import List, Optional, Dict, Any, Tuple from contextlib import contextmanager import aiofiles import asyncio from ..utils.logging import get_logger from ..utils.config import get_config from .schemas import ( VideoMetadata, ProcessingResult, VideoSummary, FrameAnalysis, ProcessingStatus ) class StorageManager: """Manages storage of video files and associated data.""" def __init__(self, base_path: Optional[str] = None): self.config = get_config() self.base_path = Path(base_path or self.config.storage.base_path) self.logger = get_logger(__name__) # Create directory structure self._init_directories() # Initialize database self._init_database() def _init_directories(self): """Initialize directory structure.""" directories = [ self.base_path / "locations", self.base_path / "processed", self.base_path / "index", self.base_path / "temp" ] for directory in directories: directory.mkdir(parents=True, exist_ok=True) self.logger.debug(f"Ensured directory exists: {directory}") def _init_database(self): """Initialize SQLite database for metadata.""" db_path = self.base_path / "index" / "metadata.db" with self._get_db() as conn: cursor = conn.cursor() # Videos table cursor.execute(""" CREATE TABLE IF NOT EXISTS videos ( video_id TEXT PRIMARY KEY, original_path TEXT NOT NULL, filename TEXT NOT NULL, location TEXT NOT NULL, recording_timestamp TIMESTAMP NOT NULL, duration REAL NOT NULL, fps REAL NOT NULL, width INTEGER NOT NULL, height INTEGER NOT NULL, codec TEXT, size_bytes INTEGER NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, processed_at TIMESTAMP, status TEXT DEFAULT 'pending', error_message TEXT, processing_time REAL DEFAULT 0.0 ) """) # Frame analysis table cursor.execute(""" CREATE TABLE IF NOT EXISTS frame_analysis ( id INTEGER PRIMARY KEY AUTOINCREMENT, video_id TEXT NOT NULL, frame_number INTEGER NOT NULL, timestamp REAL NOT NULL, frame_path TEXT NOT NULL, description TEXT, objects_detected TEXT, confidence REAL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (video_id) REFERENCES videos(video_id), UNIQUE(video_id, frame_number) ) """) # Transcripts table cursor.execute(""" CREATE TABLE IF NOT EXISTS transcripts ( video_id TEXT PRIMARY KEY, transcript TEXT NOT NULL, language TEXT DEFAULT 'en', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (video_id) REFERENCES videos(video_id) ) """) # Search index table cursor.execute(""" CREATE TABLE IF NOT EXISTS search_index ( id INTEGER PRIMARY KEY AUTOINCREMENT, video_id TEXT NOT NULL, content TEXT NOT NULL, content_type TEXT NOT NULL, timestamp_start REAL, timestamp_end REAL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (video_id) REFERENCES videos(video_id) ) """) # Create indexes cursor.execute("CREATE INDEX IF NOT EXISTS idx_videos_status ON videos(status)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_videos_location ON videos(location)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_videos_timestamp ON videos(recording_timestamp)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_videos_location_timestamp ON videos(location, recording_timestamp)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_frames_video ON frame_analysis(video_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_search_video ON search_index(video_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_search_content ON search_index(content)") conn.commit() self.logger.info("Database initialized successfully") @contextmanager def _get_db(self): """Get database connection context manager.""" db_path = self.base_path / "index" / "metadata.db" conn = sqlite3.connect(str(db_path), timeout=30.0) conn.row_factory = sqlite3.Row try: yield conn finally: conn.close() def generate_video_id(self, file_path: str) -> str: """Generate unique video ID from file path.""" # Use hash of file path + timestamp for uniqueness content = f"{file_path}_{datetime.now().isoformat()}" hash_obj = hashlib.sha256(content.encode()) return f"vid_{hash_obj.hexdigest()[:12]}" async def store_video(self, video_path: str, location: Optional[str] = None, recording_timestamp: Optional[datetime] = None) -> VideoMetadata: """Store original video file and create metadata entry. Args: video_path: Path to video file location: Location name (e.g., "shed", "driveway"). If not provided, will try to extract from path or filename recording_timestamp: When the video was recorded. If not provided, will use file modification time """ video_path = Path(video_path) if not video_path.exists(): raise FileNotFoundError(f"Video file not found: {video_path}") # Extract or validate location if location is None: location = self._extract_location_from_path(video_path) location = self._normalize_location(location) # Extract or use provided timestamp if recording_timestamp is None: recording_timestamp = self._extract_timestamp_from_file(video_path) # Generate video ID with timestamp for uniqueness video_id = self.generate_video_id(f"{location}_{recording_timestamp.isoformat()}_{video_path.name}") # Create destination path with location/date structure from ..utils.date_parser import DateParser year, month, day = DateParser.get_date_path_components(recording_timestamp) dest_dir = self.base_path / "locations" / location / year / month / day dest_dir.mkdir(parents=True, exist_ok=True) # Create filename with timestamp timestamp_str = recording_timestamp.strftime("%H%M%S") dest_filename = f"{video_id}_{timestamp_str}{video_path.suffix}" dest_path = dest_dir / dest_filename try: # Copy file asynchronously await self._copy_file_async(video_path, dest_path) # Get video metadata (this would normally use ffprobe) metadata = VideoMetadata( video_id=video_id, original_path=str(video_path), filename=video_path.name, location=location, recording_timestamp=recording_timestamp, duration=0.0, # Will be updated by processor fps=0.0, width=0, height=0, codec="unknown", size_bytes=video_path.stat().st_size, created_at=datetime.now() ) # Store in database self._store_video_metadata(metadata) self.logger.info(f"Stored video {video_id} from {video_path} at {location} ({recording_timestamp})") return metadata except Exception as e: self.logger.error(f"Error storing video: {e}") # Clean up on error if dest_path.exists(): dest_path.unlink() raise async def _copy_file_async(self, src: Path, dst: Path): """Copy file asynchronously.""" chunk_size = 1024 * 1024 # 1MB chunks async with aiofiles.open(src, 'rb') as src_file: async with aiofiles.open(dst, 'wb') as dst_file: while chunk := await src_file.read(chunk_size): await dst_file.write(chunk) def _extract_location_from_path(self, video_path: Path) -> str: """Extract location from video path or filename. Tries to extract location from: 1. Parent directory name 2. Filename pattern (e.g., shed_video.mp4) 3. Default to "unknown" """ # Check parent directory parent_name = video_path.parent.name.lower() common_locations = ['shed', 'driveway', 'front_door', 'backyard', 'garage', 'entrance'] for location in common_locations: if location in parent_name: return location # Check filename filename_lower = video_path.stem.lower() for location in common_locations: if location in filename_lower: return location # Default return "unknown" def _normalize_location(self, location: str) -> str: """Normalize location name for consistency.""" # Remove special characters and spaces normalized = location.lower().strip() normalized = normalized.replace(' ', '_') normalized = ''.join(c for c in normalized if c.isalnum() or c == '_') # Common aliases aliases = { 'front': 'front_door', 'back': 'backyard', 'drive': 'driveway', 'car': 'driveway' } for alias, standard in aliases.items(): if alias in normalized: return standard return normalized or "unknown" def _extract_timestamp_from_file(self, video_path: Path) -> datetime: """Extract recording timestamp from file. Tries: 1. Parse from filename (e.g., 2024-12-06_143022.mp4) 2. Use file modification time """ import re # Try to extract from filename filename = video_path.stem # Common timestamp patterns patterns = [ r'(\d{4})-(\d{2})-(\d{2})_(\d{2})(\d{2})(\d{2})', # YYYY-MM-DD_HHMMSS r'(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})(\d{2})', # YYYYMMDD_HHMMSS r'(\d{4})-(\d{2})-(\d{2}) (\d{2})-(\d{2})-(\d{2})', # YYYY-MM-DD HH-MM-SS ] for pattern in patterns: match = re.search(pattern, filename) if match: try: groups = match.groups() if len(groups) == 6: year, month, day, hour, minute, second = map(int, groups) return datetime(year, month, day, hour, minute, second) except (ValueError, TypeError): continue # Fall back to file modification time stat = video_path.stat() return datetime.fromtimestamp(stat.st_mtime) def _store_video_metadata(self, metadata: VideoMetadata): """Store video metadata in database.""" with self._get_db() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO videos ( video_id, original_path, filename, location, recording_timestamp, duration, fps, width, height, codec, size_bytes, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( metadata.video_id, metadata.original_path, metadata.filename, metadata.location, metadata.recording_timestamp.isoformat(), metadata.duration, metadata.fps, metadata.width, metadata.height, metadata.codec, metadata.size_bytes, metadata.created_at.isoformat() )) conn.commit() def get_video_metadata(self, video_id: str) -> Optional[VideoMetadata]: """Get video metadata by ID.""" with self._get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM videos WHERE video_id = ?", (video_id,)) row = cursor.fetchone() if row: return VideoMetadata( video_id=row['video_id'], original_path=row['original_path'], filename=row['filename'], location=row['location'], recording_timestamp=datetime.fromisoformat(row['recording_timestamp']), duration=row['duration'], fps=row['fps'], width=row['width'], height=row['height'], codec=row['codec'], size_bytes=row['size_bytes'], created_at=datetime.fromisoformat(row['created_at']), processed_at=datetime.fromisoformat(row['processed_at']) if row['processed_at'] else None ) return None def update_video_status(self, video_id: str, status: ProcessingStatus, error_message: Optional[str] = None, processing_time: Optional[float] = None): """Update video processing status.""" with self._get_db() as conn: cursor = conn.cursor() if status == ProcessingStatus.COMPLETED: if processing_time is not None: cursor.execute(""" UPDATE videos SET status = ?, processed_at = ?, error_message = NULL, processing_time = ? WHERE video_id = ? """, (status.value, datetime.now().isoformat(), processing_time, video_id)) else: cursor.execute(""" UPDATE videos SET status = ?, processed_at = ?, error_message = NULL WHERE video_id = ? """, (status.value, datetime.now().isoformat(), video_id)) else: cursor.execute(""" UPDATE videos SET status = ?, error_message = ? WHERE video_id = ? """, (status.value, error_message, video_id)) conn.commit() def store_frame_analysis(self, video_id: str, analyses: List[FrameAnalysis]): """Store frame analysis results.""" with self._get_db() as conn: cursor = conn.cursor() for analysis in analyses: cursor.execute(""" INSERT OR REPLACE INTO frame_analysis ( video_id, frame_number, timestamp, frame_path, description, objects_detected, confidence ) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( video_id, analysis.frame_number, analysis.timestamp, analysis.frame_path, analysis.description, json.dumps(analysis.objects_detected), analysis.confidence )) conn.commit() self.logger.debug(f"Stored {len(analyses)} frame analyses for video {video_id}") def store_transcript(self, video_id: str, transcript: str, language: str = "en"): """Store video transcript.""" with self._get_db() as conn: cursor = conn.cursor() cursor.execute(""" INSERT OR REPLACE INTO transcripts (video_id, transcript, language) VALUES (?, ?, ?) """, (video_id, transcript, language)) conn.commit() # Also store in search index self._update_search_index(video_id, transcript, "transcript") def _update_search_index(self, video_id: str, content: str, content_type: str, timestamp_start: Optional[float] = None, timestamp_end: Optional[float] = None): """Update search index with content.""" with self._get_db() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO search_index ( video_id, content, content_type, timestamp_start, timestamp_end ) VALUES (?, ?, ?, ?, ?) """, (video_id, content, content_type, timestamp_start, timestamp_end)) conn.commit() def get_processing_result(self, video_id: str) -> Optional[ProcessingResult]: """Get processing result for a video.""" metadata = self.get_video_metadata(video_id) if not metadata: return None # Get frame analyses with self._get_db() as conn: cursor = conn.cursor() # Get processing time cursor.execute("SELECT processing_time FROM videos WHERE video_id = ?", (video_id,)) time_row = cursor.fetchone() processing_time = time_row['processing_time'] if time_row and time_row['processing_time'] else 0.0 # Get frames cursor.execute(""" SELECT * FROM frame_analysis WHERE video_id = ? ORDER BY frame_number """, (video_id,)) frames = [] for row in cursor.fetchall(): frames.append(FrameAnalysis( frame_number=row['frame_number'], timestamp=row['timestamp'], frame_path=row['frame_path'], description=row['description'], objects_detected=json.loads(row['objects_detected'] or '[]'), confidence=row['confidence'] or 0.0 )) # Get transcript cursor.execute("SELECT transcript FROM transcripts WHERE video_id = ?", (video_id,)) transcript_row = cursor.fetchone() transcript = transcript_row['transcript'] if transcript_row else None return ProcessingResult( video_id=video_id, status=ProcessingStatus.COMPLETED, frames_extracted=len(frames), frames_analyzed=len([f for f in frames if f.description]), transcript=transcript, timeline=frames, processing_time=processing_time ) def search_videos(self, query: str, limit: int = 10) -> List[Tuple[str, str, float]]: """Search videos by content.""" with self._get_db() as conn: cursor = conn.cursor() # Simple search - in production, use FTS5 or vector search cursor.execute(""" SELECT DISTINCT s.video_id, v.filename, COUNT(*) as match_count FROM search_index s JOIN videos v ON s.video_id = v.video_id WHERE s.content LIKE ? GROUP BY s.video_id ORDER BY match_count DESC LIMIT ? """, (f"%{query}%", limit)) return [(row['video_id'], row['filename'], row['match_count']) for row in cursor.fetchall()] def query_videos_by_location_and_time( self, location: Optional[str] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, content_query: Optional[str] = None, limit: int = 50, time_field: str = "recording_timestamp" ) -> List[VideoMetadata]: """Query videos by location and time range. Args: location: Location to filter by (None for all locations) start_time: Start of time range end_time: End of time range content_query: Optional content search query limit: Maximum results time_field: Time field to filter by ("recording_timestamp" or "created_at") Returns: List of VideoMetadata objects matching criteria """ with self._get_db() as conn: cursor = conn.cursor() # Build query query_parts = ["SELECT DISTINCT v.* FROM videos v"] conditions = ["v.status = ?"] params = [ProcessingStatus.COMPLETED.value] # Join with search index if content query provided if content_query: query_parts.append("JOIN search_index s ON v.video_id = s.video_id") conditions.append("s.content LIKE ?") params.append(f"%{content_query}%") # Location filter if location: normalized_location = self._normalize_location(location) conditions.append("v.location = ?") params.append(normalized_location) # Time range filter if start_time: conditions.append(f"v.{time_field} >= ?") params.append(start_time.isoformat()) if end_time: conditions.append(f"v.{time_field} <= ?") params.append(end_time.isoformat()) # Combine query if conditions: query_parts.append("WHERE " + " AND ".join(conditions)) query_parts.append("ORDER BY v.recording_timestamp DESC") query_parts.append("LIMIT ?") params.append(limit) query = " ".join(query_parts) cursor.execute(query, params) results = [] for row in cursor.fetchall(): results.append(VideoMetadata( video_id=row['video_id'], original_path=row['original_path'], filename=row['filename'], location=row['location'], recording_timestamp=datetime.fromisoformat(row['recording_timestamp']), duration=row['duration'], fps=row['fps'], width=row['width'], height=row['height'], codec=row['codec'], size_bytes=row['size_bytes'], created_at=datetime.fromisoformat(row['created_at']), processed_at=datetime.fromisoformat(row['processed_at']) if row['processed_at'] else None )) return results def cleanup_old_files(self, days: Optional[int] = None): """Clean up old processed files.""" days = days or self.config.storage.cleanup_after_days cutoff_date = datetime.now() - timedelta(days=days) with self._get_db() as conn: cursor = conn.cursor() cursor.execute(""" SELECT video_id FROM videos WHERE created_at < ? AND status = ? """, (cutoff_date.isoformat(), ProcessingStatus.COMPLETED.value)) for row in cursor.fetchall(): video_id = row['video_id'] # Remove processed files processed_dir = self.base_path / "processed" / video_id if processed_dir.exists(): shutil.rmtree(processed_dir) self.logger.info(f"Cleaned up processed files for {video_id}") def get_storage_stats(self) -> Dict[str, Any]: """Get storage statistics.""" stats = { "total_videos": 0, "processed_videos": 0, "total_size_gb": 0.0, "processed_size_gb": 0.0 } # Count videos with self._get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT COUNT(*) as count FROM videos") stats["total_videos"] = cursor.fetchone()['count'] cursor.execute(""" SELECT COUNT(*) as count FROM videos WHERE status = ? """, (ProcessingStatus.COMPLETED.value,)) stats["processed_videos"] = cursor.fetchone()['count'] # Calculate sizes for video_file in (self.base_path / "originals").glob("*"): if video_file.is_file(): stats["total_size_gb"] += video_file.stat().st_size / (1024**3) for processed_dir in (self.base_path / "processed").iterdir(): if processed_dir.is_dir(): for file in processed_dir.rglob("*"): if file.is_file(): stats["processed_size_gb"] += file.stat().st_size / (1024**3) return stats

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/michaelbaker-dev/mcpVideoParser'

If you have feedback or need assistance with the MCP directory API, please join our Discord server