Skip to main content
Glama
test_video_processor.py (13.6 kB)
"""Unit tests for video processor.""" import pytest import tempfile import shutil from pathlib import Path from unittest.mock import Mock, patch, AsyncMock, MagicMock import cv2 import numpy as np from src.processors.video import VideoProcessor from src.storage.schemas import VideoMetadata, ProcessingStatus @pytest.fixture def mock_storage(): """Create a mock storage manager.""" storage = Mock() storage.base_path = Path(tempfile.mkdtemp()) yield storage shutil.rmtree(storage.base_path) @pytest.fixture def mock_config(): """Create mock configuration.""" config = Mock() config.processing.frame_sample_rate = 30 config.processing.enable_scene_detection = False config.processing.scene_threshold = 0.3 config.processing.max_frames_per_video = 100 config.processing.frame_quality = 85 config.processing.frame_format = "jpg" config.processing.audio.enable_timestamps = True config.processing.audio.model = "base" config.processing.audio.language = "en" config.performance.thread_count = 4 return config @pytest.fixture def video_processor(mock_storage, mock_config): """Create a video processor instance.""" with patch('src.processors.video.get_config', return_value=mock_config): processor = VideoProcessor(mock_storage, llm_client=None) yield processor processor.cleanup() @pytest.fixture def sample_video_metadata(): """Create sample video metadata.""" from datetime import datetime return VideoMetadata( video_id="vid_test123", original_path="/test/video.mp4", filename="test_video.mp4", location="test_location", recording_timestamp=datetime.now(), duration=10.0, fps=30.0, width=640, height=480, codec="h264", size_bytes=1000000 ) def create_test_video(path: Path, duration_seconds: int = 1, fps: int = 30): """Create a simple test video file.""" width, height = 640, 480 fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(str(path), fourcc, fps, (width, height)) # Generate frames for i in range(fps * duration_seconds): # Create a frame with changing color frame = 
np.zeros((height, width, 3), dtype=np.uint8) color_value = int((i / (fps * duration_seconds)) * 255) frame[:, :] = (color_value, 255 - color_value, 128) # Add frame number text cv2.putText(frame, f"Frame {i}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) out.write(frame) out.release() class TestVideoProcessor: """Test cases for VideoProcessor.""" def test_init(self, mock_storage, mock_config): """Test video processor initialization.""" with patch('src.processors.video.get_config', return_value=mock_config): processor = VideoProcessor(mock_storage) assert processor.storage == mock_storage assert processor.config == mock_config processor.cleanup() def test_get_video_path(self, video_processor, mock_storage): """Test getting video file path.""" from datetime import datetime from src.storage.schemas import VideoMetadata video_id = "vid_test123" # Create mock metadata metadata = VideoMetadata( video_id=video_id, original_path="/test/video.mp4", filename="test_video.mp4", location="test_location", recording_timestamp=datetime(2025, 6, 12, 10, 30, 0), duration=10.0, fps=30.0, width=640, height=480, codec="h264", size_bytes=1000000 ) mock_storage.get_video_metadata.return_value = metadata # Create test video file in location-based structure date_parts = metadata.recording_timestamp.strftime("%Y/%m/%d") timestamp_str = metadata.recording_timestamp.strftime("%H%M%S") video_path = mock_storage.base_path / "locations" / metadata.location / date_parts / f"{video_id}_{timestamp_str}.mp4" video_path.parent.mkdir(parents=True, exist_ok=True) video_path.touch() found_path = video_processor._get_video_path(video_id) # The method should try various filename patterns assert found_path.parent == video_path.parent assert found_path.name.startswith(video_id) def test_get_video_path_not_found(self, video_processor, mock_storage): """Test getting video path when file doesn't exist.""" # Mock get_video_metadata to return None for nonexistent video 
mock_storage.get_video_metadata.return_value = None with pytest.raises(FileNotFoundError): video_processor._get_video_path("nonexistent_id") @pytest.mark.asyncio async def test_extract_frames(self, video_processor, mock_storage): """Test frame extraction from video.""" # Create test video video_path = mock_storage.base_path / "test_video.mp4" create_test_video(video_path, duration_seconds=1, fps=30) # Create output directory output_dir = mock_storage.base_path / "processed" / "test" output_dir.mkdir(parents=True, exist_ok=True) # Extract frames frame_paths = await video_processor._extract_frames(video_path, output_dir) # Check frames were extracted assert len(frame_paths) > 0 assert all(p.exists() for p in frame_paths) # Check frame directory structure frames_dir = output_dir / "frames" assert frames_dir.exists() assert len(list(frames_dir.glob("*.jpg"))) == len(frame_paths) @pytest.mark.asyncio async def test_extract_frames_with_scene_detection(self, video_processor, mock_storage, mock_config): """Test frame extraction with scene detection enabled.""" mock_config.processing.enable_scene_detection = True # Create test video video_path = mock_storage.base_path / "test_video.mp4" create_test_video(video_path, duration_seconds=2, fps=30) # Create output directory output_dir = mock_storage.base_path / "processed" / "test" output_dir.mkdir(parents=True, exist_ok=True) # Extract frames frame_paths = await video_processor._extract_frames(video_path, output_dir) # Should have extracted some frames assert len(frame_paths) > 0 assert len(frame_paths) <= mock_config.processing.max_frames_per_video @pytest.mark.asyncio async def test_detect_scene_changes(self, video_processor, mock_storage): """Test scene change detection.""" # Create test video with scene changes video_path = mock_storage.base_path / "test_video.mp4" width, height = 640, 480 fps = 30 fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(str(video_path), fourcc, fps, (width, height)) # Create frames 
with distinct scenes total_frames = 90 for i in range(total_frames): frame = np.zeros((height, width, 3), dtype=np.uint8) # Change color every 30 frames (1 second) if i < 30: frame[:, :] = (255, 0, 0) # Red elif i < 60: frame[:, :] = (0, 255, 0) # Green else: frame[:, :] = (0, 0, 255) # Blue out.write(frame) out.release() # Detect scene changes scene_frames = await video_processor._detect_scene_changes(video_path, total_frames) # Should detect at least the beginning and scene changes assert len(scene_frames) >= 3 assert 0 in scene_frames # First frame assert (total_frames - 1) in scene_frames # Last frame @pytest.mark.asyncio async def test_extract_and_transcribe_audio(self, video_processor, mock_storage): """Test audio extraction and transcription.""" # This test requires ffmpeg and whisper to be installed # Mock the whisper transcription with patch('whisper.load_model') as mock_load_model: mock_model = Mock() mock_model.transcribe.return_value = { 'text': 'This is a test transcription.' } mock_load_model.return_value = mock_model # Create a simple video with audio (mocked) video_path = mock_storage.base_path / "test_video.mp4" video_path.touch() output_dir = mock_storage.base_path / "processed" / "test" output_dir.mkdir(parents=True, exist_ok=True) # Mock ffmpeg subprocess mock_process = AsyncMock() mock_process.returncode = 0 mock_process.communicate.return_value = (b'', b'') with patch('asyncio.create_subprocess_exec', return_value=mock_process): # Create dummy audio file that ffmpeg would create audio_path = output_dir / "audio.wav" audio_path.write_bytes(b'dummy audio data') transcript = await video_processor._extract_and_transcribe_audio( video_path, output_dir ) assert transcript == 'This is a test transcription.' 
assert not audio_path.exists() # Should be cleaned up @pytest.mark.asyncio async def test_analyze_frames_placeholder(self, video_processor, mock_storage): """Test placeholder frame analysis.""" # Create test frame paths frames_dir = mock_storage.base_path / "frames" frames_dir.mkdir(parents=True, exist_ok=True) frame_paths = [] for i in range(3): timestamp = i * 1.0 frame_path = frames_dir / f"frame_{i:05d}_t{timestamp:.1f}s.jpg" frame_path.touch() frame_paths.append(frame_path) # Analyze frames analyses = await video_processor._analyze_frames_placeholder( frame_paths, "vid_test123" ) assert len(analyses) == 3 for i, analysis in enumerate(analyses): assert analysis.frame_number == i assert analysis.timestamp == i * 1.0 assert analysis.confidence == 1.0 @pytest.mark.asyncio async def test_process_video_complete(self, video_processor, mock_storage, sample_video_metadata): """Test complete video processing workflow.""" video_id = sample_video_metadata.video_id # Setup mocks mock_storage.get_video_metadata.return_value = sample_video_metadata mock_storage.update_video_status = Mock() mock_storage.store_transcript = Mock() mock_storage.store_frame_analysis = Mock() # Create test video in location-based structure date_parts = sample_video_metadata.recording_timestamp.strftime("%Y/%m/%d") timestamp_str = sample_video_metadata.recording_timestamp.strftime("%H%M%S") video_dir = mock_storage.base_path / "locations" / sample_video_metadata.location / date_parts video_dir.mkdir(parents=True, exist_ok=True) video_path = video_dir / f"{video_id}_{timestamp_str}.mp4" create_test_video(video_path, duration_seconds=1, fps=30) # Mock audio transcription with patch.object(video_processor, '_extract_and_transcribe_audio') as mock_transcribe: mock_transcribe.return_value = "Test transcript" # Process video result = await video_processor.process_video(video_id) # Verify result assert result.video_id == video_id assert result.status == ProcessingStatus.COMPLETED assert 
result.frames_extracted > 0 assert result.transcript == "Test transcript" # Verify storage calls assert mock_storage.update_video_status.call_count >= 2 mock_storage.store_transcript.assert_called_once_with( video_id, "Test transcript" ) mock_storage.store_frame_analysis.assert_called_once() @pytest.mark.asyncio async def test_process_video_error_handling(self, video_processor, mock_storage, sample_video_metadata): """Test error handling during video processing.""" video_id = sample_video_metadata.video_id # Setup mocks mock_storage.get_video_metadata.return_value = sample_video_metadata mock_storage.update_video_status = Mock() # Don't create video file to trigger error # Process video - should raise exception with pytest.raises(FileNotFoundError): await video_processor.process_video(video_id) # Verify error status was set # Check that the failed status was called calls = mock_storage.update_video_status.call_args_list failed_call_found = False for call in calls: if len(call[0]) >= 2 and call[0][1] == ProcessingStatus.FAILED: failed_call_found = True break assert failed_call_found, "ProcessingStatus.FAILED was not set"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/michaelbaker-dev/mcpVideoParser'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.