Skip to main content
Glama
test_file_system.py16.4 kB
"""Test file system repository for audio file operations.""" from pathlib import Path from unittest.mock import MagicMock, patch import pytest from mcp_server_whisper.exceptions import AudioFileNotFoundError from mcp_server_whisper.infrastructure.file_system import FileSystemRepository class TestFileSystemRepository: """Test suite for FileSystemRepository.""" @pytest.fixture def audio_dir(self, tmp_path: Path) -> Path: """Create temporary audio directory.""" audio_path = tmp_path / "audio" audio_path.mkdir() return audio_path @pytest.fixture def repo(self, audio_dir: Path) -> FileSystemRepository: """Create FileSystemRepository instance.""" return FileSystemRepository(audio_dir) @pytest.mark.anyio async def test_get_audio_file_support_mp3(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test getting file support for MP3 file.""" test_file = audio_dir / "test.mp3" test_file.write_bytes(b"fake audio") mock_audio = MagicMock() mock_audio.__len__ = MagicMock(return_value=60000) # 60 seconds in milliseconds with patch("mcp_server_whisper.infrastructure.file_system.AudioSegment.from_file", return_value=mock_audio): result = await repo.get_audio_file_support(test_file) assert result.file_name == "test.mp3" assert result.format == "mp3" assert result.size_bytes == 10 # len(b"fake audio") assert result.transcription_support == ["whisper-1", "gpt-4o-transcribe", "gpt-4o-mini-transcribe"] assert result.chat_support and "gpt-4o-audio-preview" in result.chat_support[0] # MP3 supports chat assert result.duration_seconds == 60.0 @pytest.mark.anyio async def test_get_audio_file_support_wav(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test getting file support for WAV file.""" test_file = audio_dir / "test.wav" test_file.write_bytes(b"wav data") mock_audio = MagicMock() mock_audio.__len__ = MagicMock(return_value=30000) # 30 seconds with patch("mcp_server_whisper.infrastructure.file_system.AudioSegment.from_file", return_value=mock_audio): result = await repo.get_audio_file_support(test_file) assert result.format == "wav" assert result.transcription_support is not None assert result.chat_support is not None # WAV supports chat assert result.duration_seconds == 30.0 @pytest.mark.anyio async def test_get_audio_file_support_unsupported_format(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test that unsupported formats have no model support.""" test_file = audio_dir / "test.txt" test_file.write_bytes(b"not audio") with patch( "mcp_server_whisper.infrastructure.file_system.AudioSegment.from_file", side_effect=Exception("Invalid") ): result = await repo.get_audio_file_support(test_file) assert result.transcription_support is None assert result.chat_support is None assert result.duration_seconds is None # Failed to load @pytest.mark.anyio async def test_get_audio_file_support_mp4_no_chat(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test that MP4 supports transcription but not chat.""" test_file = audio_dir / "video.mp4" test_file.write_bytes(b"mp4 data") with patch( "mcp_server_whisper.infrastructure.file_system.AudioSegment.from_file", side_effect=Exception("Skip duration"), ): result = await repo.get_audio_file_support(test_file) assert result.format == "mp4" assert result.transcription_support is not None assert result.chat_support is None # MP4 doesn't support chat assert result.duration_seconds is None @pytest.mark.anyio async def test_get_audio_file_support_duration_extraction_fails_gracefully( self, repo: FileSystemRepository, audio_dir: Path ) -> None: """Test that duration extraction failure doesn't crash the function.""" test_file = audio_dir / "test.mp3" test_file.write_bytes(b"data") with patch( "mcp_server_whisper.infrastructure.file_system.AudioSegment.from_file", side_effect=Exception("Load failed") ): result = await repo.get_audio_file_support(test_file) # Should still return result, just without duration assert result.file_name == "test.mp3" assert result.duration_seconds is None @pytest.mark.anyio async def test_get_latest_audio_file_finds_most_recent(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test finding the most recently modified audio file.""" # Create files with different mtimes old_file = audio_dir / "old.mp3" old_file.write_bytes(b"old") import time time.sleep(0.01) new_file = audio_dir / "new.mp3" new_file.write_bytes(b"new") with patch("mcp_server_whisper.infrastructure.file_system.AudioSegment.from_file"): result = await repo.get_latest_audio_file() assert result.file_name == "new.mp3" @pytest.mark.anyio async def test_get_latest_audio_file_no_files_raises_error(self, repo: FileSystemRepository) -> None: """Test that AudioFileNotFoundError is raised when no audio files exist.""" with pytest.raises(AudioFileNotFoundError, match="No supported audio files found"): await repo.get_latest_audio_file() @pytest.mark.anyio async def test_get_latest_audio_file_skips_unsupported_formats( self, repo: FileSystemRepository, audio_dir: Path ) -> None: """Test that only supported formats are considered.""" # Create unsupported file txt_file = audio_dir / "notes.txt" txt_file.write_bytes(b"text") # Should raise error because txt is not supported with pytest.raises(AudioFileNotFoundError): await repo.get_latest_audio_file() @pytest.mark.anyio async def test_get_latest_audio_file_skips_directories(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test that directories are skipped.""" # Create a directory with audio extension (edge case) fake_dir = audio_dir / "fake.mp3" fake_dir.mkdir() # Should not find any files with pytest.raises(AudioFileNotFoundError): await repo.get_latest_audio_file() @pytest.mark.anyio async def test_list_audio_files_returns_all_supported(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test listing all supported audio files.""" # Create various formats (audio_dir / "file1.mp3").write_bytes(b"mp3") (audio_dir / "file2.wav").write_bytes(b"wav") (audio_dir / "file3.mp4").write_bytes(b"mp4") (audio_dir / "unsupported.txt").write_bytes(b"txt") result = await repo.list_audio_files() assert len(result) == 3 filenames = {p.name for p in result} assert filenames == {"file1.mp3", "file2.wav", "file3.mp4"} @pytest.mark.anyio async def test_list_audio_files_with_regex_pattern(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test filtering by regex pattern.""" (audio_dir / "interview_1.mp3").write_bytes(b"data") (audio_dir / "interview_2.mp3").write_bytes(b"data") (audio_dir / "music.mp3").write_bytes(b"data") result = await repo.list_audio_files(pattern="interview") assert len(result) == 2 assert all("interview" in p.name for p in result) @pytest.mark.anyio async def test_list_audio_files_with_format_filter(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test filtering by specific format.""" (audio_dir / "file1.mp3").write_bytes(b"mp3") (audio_dir / "file2.wav").write_bytes(b"wav") (audio_dir / "file3.mp3").write_bytes(b"mp3") result = await repo.list_audio_files(format_filter="mp3") assert len(result) == 2 assert all(p.suffix == ".mp3" for p in result) @pytest.mark.anyio async def test_list_audio_files_with_min_size(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test filtering by minimum file size.""" (audio_dir / "small.mp3").write_bytes(b"a" * 100) (audio_dir / "large.mp3").write_bytes(b"a" * 2000) result = await repo.list_audio_files(min_size_bytes=1000) assert len(result) == 1 assert result[0].name == "large.mp3" @pytest.mark.anyio async def test_list_audio_files_with_max_size(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test filtering by maximum file size.""" (audio_dir / "small.mp3").write_bytes(b"a" * 100) (audio_dir / "large.mp3").write_bytes(b"a" * 2000) result = await repo.list_audio_files(max_size_bytes=500) assert len(result) == 1 assert result[0].name == "small.mp3" @pytest.mark.anyio async def test_list_audio_files_with_size_range(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test filtering by size range.""" (audio_dir / "tiny.mp3").write_bytes(b"a" * 50) (audio_dir / "medium.mp3").write_bytes(b"a" * 500) (audio_dir / "large.mp3").write_bytes(b"a" * 2000) result = await repo.list_audio_files(min_size_bytes=100, max_size_bytes=1000) assert len(result) == 1 assert result[0].name == "medium.mp3" @pytest.mark.anyio async def test_list_audio_files_skips_directories(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test that directories are skipped in listing.""" (audio_dir / "file.mp3").write_bytes(b"data") (audio_dir / "subdir").mkdir() result = await repo.list_audio_files() assert len(result) == 1 assert result[0].name == "file.mp3" @pytest.mark.anyio async def test_list_audio_files_empty_directory(self, repo: FileSystemRepository) -> None: """Test listing in empty directory returns empty list.""" result = await repo.list_audio_files() assert result == [] @pytest.mark.anyio async def test_read_audio_file_success(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test successfully reading audio file.""" test_file = audio_dir / "test.mp3" test_content = b"audio content here" test_file.write_bytes(test_content) result = await repo.read_audio_file(test_file) assert result == test_content @pytest.mark.anyio async def test_read_audio_file_not_found(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test error when reading non-existent file.""" missing_file = audio_dir / "missing.mp3" with pytest.raises(AudioFileNotFoundError, match="File not found"): await repo.read_audio_file(missing_file) @pytest.mark.anyio async def test_read_audio_file_is_directory(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test error when trying to read a directory.""" directory = audio_dir / "subdir" directory.mkdir() with pytest.raises(AudioFileNotFoundError, match="File not found"): await repo.read_audio_file(directory) @pytest.mark.anyio async def test_write_audio_file_success(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test successfully writing audio file.""" output_file = audio_dir / "output.mp3" test_content = b"generated audio" await repo.write_audio_file(output_file, test_content) assert output_file.exists() assert output_file.read_bytes() == test_content @pytest.mark.anyio async def test_write_audio_file_creates_parent_directory(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test that write creates parent directories if needed.""" nested_dir = audio_dir / "subdir" / "nested" output_file = nested_dir / "output.mp3" test_content = b"audio" await repo.write_audio_file(output_file, test_content) assert output_file.exists() assert output_file.read_bytes() == test_content assert nested_dir.exists() @pytest.mark.anyio async def test_get_file_size_success(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test getting file size.""" test_file = audio_dir / "test.mp3" test_content = b"a" * 1234 test_file.write_bytes(test_content) result = await repo.get_file_size(test_file) assert result == 1234 @pytest.mark.anyio async def test_get_file_size_not_found(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test error when file doesn't exist.""" missing_file = audio_dir / "missing.mp3" with pytest.raises(AudioFileNotFoundError, match="File not found"): await repo.get_file_size(missing_file) @pytest.mark.anyio async def test_list_audio_files_combined_filters(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test using multiple filters together.""" (audio_dir / "interview_small.mp3").write_bytes(b"a" * 100) (audio_dir / "interview_large.mp3").write_bytes(b"a" * 2000) (audio_dir / "music_large.mp3").write_bytes(b"a" * 2000) (audio_dir / "interview.wav").write_bytes(b"a" * 1500) result = await repo.list_audio_files( pattern="interview", min_size_bytes=500, max_size_bytes=2500, format_filter="mp3" ) assert len(result) == 1 assert result[0].name == "interview_large.mp3" @pytest.mark.anyio async def test_list_audio_files_case_insensitive_format(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test that format filter is case insensitive.""" (audio_dir / "test.MP3").write_bytes(b"data") (audio_dir / "test.wav").write_bytes(b"data") # Filter for "MP3" (uppercase) result = await repo.list_audio_files(format_filter="MP3") assert len(result) == 1 # File extension is lowercase .mp3, but filter is case insensitive assert result[0].suffix.lower() == ".mp3" @pytest.mark.anyio async def test_get_audio_file_support_flac_format(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test FLAC file support (transcription only).""" test_file = audio_dir / "audio.flac" test_file.write_bytes(b"flac data") with patch( "mcp_server_whisper.infrastructure.file_system.AudioSegment.from_file", side_effect=Exception("Skip") ): result = await repo.get_audio_file_support(test_file) assert result.format == "flac" assert result.transcription_support is not None assert result.chat_support is None # FLAC doesn't support chat @pytest.mark.anyio async def test_get_audio_file_support_preserves_mtime(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test that modification time is correctly captured.""" test_file = audio_dir / "test.mp3" test_file.write_bytes(b"data") # Get the actual mtime expected_mtime = test_file.stat().st_mtime with patch( "mcp_server_whisper.infrastructure.file_system.AudioSegment.from_file", side_effect=Exception("Skip") ): result = await repo.get_audio_file_support(test_file) assert result.modified_time == expected_mtime @pytest.mark.anyio async def test_list_audio_files_regex_pattern_full_path(self, repo: FileSystemRepository, audio_dir: Path) -> None: """Test that regex pattern matches against full path.""" (audio_dir / "test.mp3").write_bytes(b"data") (audio_dir / "audio.mp3").write_bytes(b"data") # Pattern should match full path string result = await repo.list_audio_files(pattern=r".*test\.mp3$") assert len(result) == 1 assert result[0].name == "test.mp3"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/arcaputo3/mcp-server-whisper'

If you have feedback or need assistance with the MCP directory API, please join our Discord server