Skip to main content
Glama
extractor.py (8.32 kB)
# src/loom_agent/extractor.py """Frame extraction using ffmpeg scene detection.""" import re import uuid import subprocess from pathlib import Path from typing import Any class ExtractionError(Exception): """Error during frame extraction.""" pass class FrameExtractor: """Extract frames from video using ffmpeg scene detection.""" def __init__(self, output_base_dir: str): self.output_base_dir = Path(output_base_dir) def create_output_dir(self, video_identifier: str) -> Path: """Create a unique output directory for this extraction.""" # Use UUID to ensure uniqueness unique_id = f"{video_identifier}_{uuid.uuid4().hex[:8]}" output_dir = self.output_base_dir / unique_id output_dir.mkdir(parents=True, exist_ok=True) return output_dir def format_timestamp(self, seconds: float) -> str: """Format seconds as M:SS or MM:SS timestamp.""" minutes = int(seconds // 60) secs = int(seconds % 60) return f"{minutes}:{secs:02d}" def parse_showinfo_line(self, line: str) -> dict[str, Any] | None: """Parse a showinfo filter output line to extract pts_time.""" # Match pts_time:XX.XX pattern match = re.search(r'pts_time:\s*(\d+\.?\d*)', line) if match: return {"pts_time": float(match.group(1))} return None def apply_max_frames(self, frames: list[dict], max_frames: int) -> list[dict]: """Limit frames to max_frames, keeping evenly distributed selection.""" if len(frames) <= max_frames: return frames if max_frames <= 2: # Just return first and last return [frames[0], frames[-1]][:max_frames] # Always include first and last, distribute rest evenly result = [frames[0]] # Calculate step for middle frames middle_count = max_frames - 2 step = (len(frames) - 2) / (middle_count + 1) for i in range(1, middle_count + 1): idx = int(i * step) result.append(frames[idx]) result.append(frames[-1]) return result def get_video_duration(self, video_path: str) -> float: """Get video duration in seconds using ffprobe.""" cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", 
"default=noprint_wrappers=1:nokey=1", video_path ] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if result.returncode != 0: raise ExtractionError(f"Could not probe video: {result.stderr}") return float(result.stdout.strip()) except (ValueError, subprocess.TimeoutExpired) as e: raise ExtractionError(f"Error getting video duration: {e}") def extract_frames( self, video_path: str, output_dir: Path, threshold: float = 0.3, max_frames: int = 20, timeout: int = 120 ) -> list[dict]: """ Extract frames at scene changes using ffmpeg. Args: video_path: Path to video file output_dir: Directory to save frames threshold: Scene change threshold (0.0-1.0) max_frames: Maximum frames to extract timeout: Extraction timeout in seconds Returns: List of frame info dicts with path, timestamp, scene_score """ output_pattern = str(output_dir / "frame_%03d.png") # ffmpeg command with scene detection and showinfo cmd = [ "ffmpeg", "-i", video_path, "-vf", f"select='gt(scene,{threshold})',showinfo", "-vsync", "vfn", output_pattern, "-y" # Overwrite output files ] try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout ) # ffmpeg outputs to stderr stderr = result.stderr except subprocess.TimeoutExpired: raise ExtractionError(f"Frame extraction timed out after {timeout} seconds") except FileNotFoundError: raise ExtractionError("ffmpeg not found. 
Check Docker container.") # Parse showinfo output to get timestamps timestamps = [] for line in stderr.split('\n'): if 'showinfo' in line.lower() or 'pts_time' in line: parsed = self.parse_showinfo_line(line) if parsed: timestamps.append(parsed["pts_time"]) # Find all extracted frame files frame_files = sorted(output_dir.glob("frame_*.png")) if len(frame_files) < 3: # Scene detection found too few frames - use interval-based extraction # This is common for screen recordings where changes are subtle duration = self.get_video_duration(video_path) return self._extract_frames_by_interval( video_path, output_dir, duration, max_frames ) # Build frame info list frames = [] for i, frame_path in enumerate(frame_files): timestamp = timestamps[i] if i < len(timestamps) else 0.0 # Calculate duration until next frame if i < len(frame_files) - 1 and i + 1 < len(timestamps): duration = timestamps[i + 1] - timestamp duration_str = self.format_timestamp(duration) else: duration_str = None frames.append({ "path": str(frame_path), "timestamp": self.format_timestamp(timestamp), "scene_score": threshold, # Simplified: actual score parsing would need more complex ffmpeg output "duration_until_next": duration_str }) # Apply max frames limit return self.apply_max_frames(frames, max_frames) def _extract_single_frame(self, video_path: str, output_path: Path) -> None: """Extract a single frame from the start of the video.""" cmd = [ "ffmpeg", "-i", video_path, "-vframes", "1", str(output_path), "-y" ] subprocess.run(cmd, capture_output=True, timeout=30) def _extract_frames_by_interval( self, video_path: str, output_dir: Path, duration: float, max_frames: int ) -> list[dict]: """ Extract frames at regular intervals throughout the video. Fallback when scene detection doesn't find enough frames. 
""" # Clean up any existing frames from failed scene detection for old_frame in output_dir.glob("frame_*.png"): old_frame.unlink() # Calculate interval - aim for max_frames evenly distributed # Minimum 2 seconds between frames, maximum based on max_frames num_frames = min(max_frames, max(3, int(duration / 2))) interval = duration / (num_frames + 1) # Extract frames at calculated timestamps using fps filter fps_value = 1 / interval if interval > 0 else 1 output_pattern = str(output_dir / "frame_%03d.png") cmd = [ "ffmpeg", "-i", video_path, "-vf", f"fps=1/{interval:.2f}", output_pattern, "-y" ] subprocess.run(cmd, capture_output=True, text=True, timeout=120) # Build frame info list frame_files = sorted(output_dir.glob("frame_*.png")) frames = [] for i, frame_path in enumerate(frame_files): timestamp = i * interval # Calculate duration until next frame if i < len(frame_files) - 1: duration_until_next = self.format_timestamp(interval) else: duration_until_next = None frames.append({ "path": str(frame_path), "timestamp": self.format_timestamp(timestamp), "scene_score": 0.0, # Interval-based, not scene-based "duration_until_next": duration_until_next }) return self.apply_max_frames(frames, max_frames)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Slaycaster/loom-local-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.