test_video_processing_flow.py (13.4 kB)
"""Integration tests for complete video processing flow.""" import pytest import asyncio from pathlib import Path import tempfile import shutil from unittest.mock import patch, Mock from datetime import datetime from src.storage.manager import StorageManager from src.processors.video import VideoProcessor from src.llm.ollama_client import OllamaClient from src.storage.schemas import ProcessingStatus import pytest_asyncio @pytest_asyncio.fixture async def test_environment(): """Set up test environment with all components.""" # Create temporary directory temp_dir = tempfile.mkdtemp() # Mock config mock_config = Mock() mock_config.storage.base_path = temp_dir mock_config.storage.cleanup_after_days = 30 mock_config.processing.frame_sample_rate = 60 # Faster for testing mock_config.processing.enable_scene_detection = False mock_config.processing.max_frames_per_video = 10 mock_config.processing.frame_quality = 85 mock_config.processing.audio.enable_timestamps = True mock_config.processing.audio.model = "base" mock_config.processing.audio.language = "en" mock_config.performance.thread_count = 2 mock_config.llm.ollama_host = "http://localhost:11434" mock_config.llm.vision_model = "llava:latest" mock_config.llm.text_model = "llama2:latest" mock_config.llm.temperature = 0.7 mock_config.llm.timeout_seconds = 30 with patch('src.storage.manager.get_config', return_value=mock_config), \ patch('src.processors.video.get_config', return_value=mock_config), \ patch('src.llm.ollama_client.get_config', return_value=mock_config): # Initialize components storage = StorageManager(temp_dir) llm_client = OllamaClient() processor = VideoProcessor(storage, llm_client=llm_client) yield { 'storage': storage, 'processor': processor, 'llm_client': llm_client, 'temp_dir': temp_dir, 'config': mock_config } # Cleanup processor.cleanup() await llm_client.close() shutil.rmtree(temp_dir) @pytest.fixture def sample_video_path(): """Get path to sample video.""" # Check if sample video exists in originals or locations directory video_path = Path(__file__).parent.parent.parent / "video_data" / "originals" / "sample_video.mp4" if video_path.exists(): return video_path # Check in locations directory (for migrated videos) locations_base = Path(__file__).parent.parent.parent / "video_data" / "locations" if locations_base.exists(): # Search for sample_video.mp4 in locations subdirectories for video_file in locations_base.rglob("sample_video.mp4"): return video_file # Skip test if no sample video pytest.skip("Sample video not found. 
Download it first.") class TestVideoProcessingFlow: """Integration tests for video processing.""" @pytest.mark.asyncio async def test_complete_video_processing(self, test_environment, sample_video_path): """Test complete video processing workflow.""" storage = test_environment['storage'] processor = test_environment['processor'] # Store video metadata = await storage.store_video(str(sample_video_path)) assert metadata.video_id.startswith("vid_") # Verify video was copied to location-based structure # Extract date parts from recording timestamp date_parts = metadata.recording_timestamp.strftime("%Y/%m/%d") timestamp_str = metadata.recording_timestamp.strftime("%H%M%S") video_file = storage.base_path / "locations" / metadata.location / date_parts / f"{metadata.video_id}_{timestamp_str}.mp4" assert video_file.exists() # Mock whisper transcription with patch('whisper.load_model') as mock_load_model: mock_model = Mock() mock_model.transcribe.return_value = { 'text': 'This is a sample video with some audio content.' } mock_load_model.return_value = mock_model # Process video result = await processor.process_video(metadata.video_id) assert result.status == ProcessingStatus.COMPLETED assert result.frames_extracted > 0 assert result.frames_extracted <= test_environment['config'].processing.max_frames_per_video assert result.transcript is not None # Verify frames were extracted frames_dir = storage.base_path / "processed" / metadata.video_id / "frames" assert frames_dir.exists() frame_files = list(frames_dir.glob("*.jpg")) assert len(frame_files) == result.frames_extracted # Verify database was updated stored_result = storage.get_processing_result(metadata.video_id) assert stored_result is not None assert stored_result.transcript == result.transcript @pytest.mark.asyncio async def test_video_analysis_with_llm(self, test_environment, sample_video_path): """Test video analysis with LLM integration.""" storage = test_environment['storage'] processor = test_environment['processor'] llm_client = test_environment['llm_client'] # Check if Ollama is available if not await llm_client.is_available(): pytest.skip("Ollama is not running. Start it with 'ollama serve'") # Store and process video metadata = await storage.store_video(str(sample_video_path)) # Mock the frame analysis to avoid requiring LLaVA model from src.storage.schemas import FrameAnalysis with patch.object(processor, '_analyze_frames_placeholder') as mock_analyze: mock_analyze.return_value = [ FrameAnalysis( frame_number=0, timestamp=0.0, frame_path='frame_0.jpg', description='Animated characters in a forest scene', objects_detected=['trees', 'characters'], confidence=0.9 ), FrameAnalysis( frame_number=1, timestamp=1.0, frame_path='frame_1.jpg', description='Characters walking through the forest', objects_detected=['trees', 'path', 'characters'], confidence=0.85 ) ] # Process video result = await processor.process_video(metadata.video_id) assert result.status == ProcessingStatus.COMPLETED # Test Q&A functionality question = "What is happening in this video?" # Mock LLM response with patch.object(llm_client, 'answer_video_question') as mock_answer: mock_answer.return_value = "The video shows animated characters in a forest scene." 
answer = await llm_client.answer_video_question( question, "Frame descriptions show forest scenes with characters" ) assert answer is not None assert "forest" in answer.lower() @pytest.mark.asyncio async def test_search_functionality(self, test_environment): """Test video search functionality.""" storage = test_environment['storage'] # Create test videos with different content test_videos = [ ("video1", "A cat playing with a ball in the living room"), ("video2", "Dogs running in the park"), ("video3", "A cat and dog sleeping together") ] for video_id, transcript in test_videos: # Create minimal video metadata from src.storage.schemas import VideoMetadata metadata = VideoMetadata( video_id=video_id, original_path=f'/test/{video_id}.mp4', filename=f'{video_id}.mp4', location='test_location', recording_timestamp=datetime.now(), duration=10.0, fps=30.0, width=640, height=480, codec='h264', size_bytes=1000000 ) storage._store_video_metadata(metadata) # Store transcript storage.store_transcript(video_id, transcript) # Search for "cat" results = storage.search_videos("cat") assert len(results) == 2 video_ids = [r[0] for r in results] assert "video1" in video_ids assert "video3" in video_ids # Search for "park" results = storage.search_videos("park") assert len(results) == 1 assert results[0][0] == "video2" @pytest.mark.asyncio async def test_error_handling(self, test_environment): """Test error handling in video processing.""" storage = test_environment['storage'] processor = test_environment['processor'] # Try to process non-existent video with pytest.raises(ValueError): await processor.process_video("non_existent_id") # Create video metadata without actual file from src.storage.schemas import VideoMetadata fake_metadata = VideoMetadata( video_id='vid_fake123', original_path='/fake/video.mp4', filename='fake.mp4', location='test_location', recording_timestamp=datetime.now(), duration=10.0, fps=30.0, width=640, height=480, codec='h264', size_bytes=1000000 ) storage._store_video_metadata(fake_metadata) # Try to process - should fail and update status with pytest.raises(FileNotFoundError): await processor.process_video('vid_fake123') # Check status was updated with storage._get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT status, error_message FROM videos WHERE video_id = ?", ('vid_fake123',)) row = cursor.fetchone() assert row['status'] == ProcessingStatus.FAILED.value assert row['error_message'] is not None @pytest.mark.asyncio async def test_concurrent_processing(self, test_environment, sample_video_path): """Test processing multiple videos concurrently.""" storage = test_environment['storage'] processor = test_environment['processor'] # Limit concurrent processing for testing test_environment['config'].performance.max_concurrent_videos = 2 # Store multiple copies of the video video_ids = [] for i in range(3): metadata = await storage.store_video(str(sample_video_path)) video_ids.append(metadata.video_id) # Mock whisper with patch('whisper.load_model') as mock_load_model: mock_model = Mock() mock_model.transcribe.return_value = {'text': 'Test transcript'} mock_load_model.return_value = mock_model # Process videos concurrently tasks = [processor.process_video(vid) for vid in video_ids] results = await asyncio.gather(*tasks, return_exceptions=True) # Check all completed successfully for result in results: if isinstance(result, Exception): pytest.fail(f"Processing failed: {result}") assert result.status == ProcessingStatus.COMPLETED @pytest.mark.asyncio async def 
test_storage_cleanup(self, test_environment): """Test storage cleanup functionality.""" storage = test_environment['storage'] # Create old video from datetime import datetime, timedelta old_date = datetime.now() - timedelta(days=35) with storage._get_db() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO videos ( video_id, original_path, filename, location, recording_timestamp, duration, fps, width, height, codec, size_bytes, created_at, status ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( 'vid_old123', '/old/video.mp4', 'old.mp4', 'test_location', old_date.isoformat(), 10.0, 30.0, 640, 480, 'h264', 1000000, old_date.isoformat(), ProcessingStatus.COMPLETED.value )) conn.commit() # Create processed files old_dir = storage.base_path / "processed" / "vid_old123" old_dir.mkdir(parents=True) (old_dir / "test.txt").write_text("old data") # Run cleanup storage.cleanup_old_files(days=30) # Verify cleaned up assert not old_dir.exists()
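The suite expects a sample video under video_data/ and, for the LLM test, a running Ollama instance; the relevant tests skip themselves when those prerequisites are missing. A minimal sketch for invoking the suite programmatically follows, assuming pytest and pytest-asyncio are installed; the file path is an assumption about the repository layout, not confirmed by the source.

# run_integration_tests.py - illustrative helper, not part of the repository
import sys
import pytest

if __name__ == "__main__":
    # -v shows per-test output, so skips for a missing sample video or
    # an unavailable Ollama server are visible in the report.
    # The path below is assumed; adjust it to where this test file lives.
    sys.exit(pytest.main(["-v", "tests/integration/test_video_processing_flow.py"]))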
