test_step3_ingestion.py
#!/usr/bin/env python3
"""
Step 3 Tests: Complete Markdown Ingestion Pipeline

Test the three main ingestion tools:
- process_markdown_file: Single file processing
- batch_process_markdown_files: Multi-file processing
- batch_process_directory: Directory processing
"""
import os
import pytest
import tempfile
import shutil
from unittest.mock import Mock, AsyncMock

from src.tool_handlers import ToolHandlers
from src.memory_manager import MemoryManager
from src.markdown_processor import MarkdownProcessor
from src.mcp_server import MCPServer


class TestStep3Ingestion:
    """Test Step 3: Complete Markdown Ingestion Pipeline"""

    @pytest.fixture(autouse=True)
    async def setup(self):
        """Set up test environment"""
        # Create temp directory for test files
        self.temp_dir = tempfile.mkdtemp()
        self.test_files = []

        # Mock dependencies
        self.mock_memory_manager = Mock(spec=MemoryManager)
        self.mock_markdown_processor = Mock(spec=MarkdownProcessor)

        # Configure mock memory manager
        self.mock_memory_manager.add_file_metadata = AsyncMock(
            return_value=True
        )
        self.mock_memory_manager.get_file_metadata = AsyncMock(
            return_value=None
        )
        self.mock_memory_manager.check_file_processed = AsyncMock(
            return_value=False
        )
        self.mock_memory_manager.validate_and_deduplicate = AsyncMock(
            return_value={
                'success': True,
                'processed': 5,
                'duplicates_removed': 1,
                'near_misses': 2
            }
        )

        # Configure mock markdown processor
        self.mock_markdown_processor.analyze_content = AsyncMock(
            return_value={'memory_type': 'learned', 'confidence': 0.8}
        )
        self.mock_markdown_processor.optimize_content = AsyncMock(
            return_value="Optimized content"
        )
        self.mock_markdown_processor.chunk_content = AsyncMock(
            return_value=["chunk1", "chunk2", "chunk3"]
        )
        self.mock_memory_manager.store_memory = AsyncMock(
            return_value={'success': True, 'stored': 3}
        )

        # Create tool handlers with mocked dependencies
        self.tool_handlers = ToolHandlers(
            self.mock_memory_manager,
            self.mock_markdown_processor
        )

        yield

        # Cleanup
        shutil.rmtree(self.temp_dir)
    def create_test_file(
        self, name: str,
        content: str = "# Test Content\nThis is test content."
    ) -> str:
        """Create a test markdown file"""
        file_path = os.path.join(self.temp_dir, name)
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(content)
        self.test_files.append(file_path)
        return file_path

    def create_test_directory_structure(self) -> str:
        """Create nested directory with markdown files"""
        # Create subdirectory
        sub_dir = os.path.join(self.temp_dir, "subdir")
        os.makedirs(sub_dir)

        # Create files in root and subdirectory
        self.create_test_file("root_file.md", "# Root File\nRoot content")
        self.create_test_file(
            os.path.join("subdir", "sub_file.md"),
            "# Sub File\nSub content"
        )

        # Create non-markdown file (should be ignored)
        non_md_path = os.path.join(self.temp_dir, "text_file.txt")
        with open(non_md_path, 'w') as f:
            f.write("Not markdown")

        return self.temp_dir

    # Test 1: Single File Processing
    async def test_process_markdown_file_success(self):
        """Test successful single file processing"""
        file_path = self.create_test_file("test.md")

        result = await self.tool_handlers.handle_process_markdown_file({
            'path': file_path,
            'memory_type': 'learned',
            'auto_suggest': False
        })

        assert result['success'] is True
        assert result['file'] == file_path
        assert result['memory_type'] == 'learned'
        assert 'chunks_stored' in result
        assert 'deduplication_stats' in result

        # Verify calls to dependencies
        self.mock_memory_manager.add_file_metadata.assert_called_once()
        self.mock_markdown_processor.optimize_content.assert_called_once()
        self.mock_markdown_processor.chunk_content.assert_called_once()
        self.mock_memory_manager.store_memory.assert_called_once()
        self.mock_memory_manager.validate_and_deduplicate.assert_called_once()

    async def test_process_markdown_file_auto_suggest(self):
        """Test file processing with auto memory type suggestion"""
        file_path = self.create_test_file("test.md")

        result = await self.tool_handlers.handle_process_markdown_file({
            'path': file_path,
            'auto_suggest': True
        })

        assert result['success'] is True
        assert result['memory_type'] == 'learned'  # From mock analyzer

        # Verify analysis was called
        self.mock_markdown_processor.analyze_content.assert_called_once()

    async def test_process_markdown_file_already_processed(self):
        """Test handling of already processed files"""
        file_path = self.create_test_file("test.md")

        # Mock file as already processed
        self.mock_memory_manager.check_file_processed.return_value = True
        self.mock_memory_manager.get_file_metadata.return_value = {
            'processed_at': '2024-01-01T00:00:00',
            'memory_type': 'global'
        }

        result = await self.tool_handlers.handle_process_markdown_file({
            'path': file_path,
            'memory_type': 'learned'
        })

        assert result['success'] is True
        assert 'already_processed' in result['message']
        assert result['memory_type'] == 'global'  # From existing metadata

    async def test_process_markdown_file_not_found(self):
        """Test handling of non-existent files"""
        result = await self.tool_handlers.handle_process_markdown_file({
            'path': '/nonexistent/file.md',
            'memory_type': 'learned'
        })

        assert result['success'] is False
        assert 'not found' in result['error'].lower()

    async def test_process_markdown_file_processing_error(self):
        """Test handling of processing errors"""
        file_path = self.create_test_file("test.md")

        # Mock processing error
        self.mock_markdown_processor.optimize_content.side_effect = Exception(
            "Processing failed"
        )

        result = await self.tool_handlers.handle_process_markdown_file({
            'path': file_path,
            'memory_type': 'learned'
        })

        assert result['success'] is False
        assert 'processing failed' in result['error'].lower()

    # Test 2: Batch File Processing
    async def test_batch_process_markdown_files_success(self):
        """Test successful batch file processing"""
        file1 = self.create_test_file("file1.md", "# File 1\nContent 1")
        file2 = self.create_test_file("file2.md", "# File 2\nContent 2")

        result = await self.tool_handlers.handle_batch_process_markdown_files({
            'file_assignments': [
                {'path': file1, 'memory_type': 'global'},
                {'path': file2, 'memory_type': 'learned'}
            ],
            'default_memory_type': 'agent'
        })

        assert result['success'] is True
        assert result['total_files'] == 2
        assert result['processed_files'] == 2
        assert result['failed_files'] == 0
        assert len(result['results']) == 2

        # Check individual file results
        file_results = {r['file']: r for r in result['results']}
        assert file_results[file1]['memory_type'] == 'global'
        assert file_results[file2]['memory_type'] == 'learned'

    async def test_batch_process_mixed_success_failure(self):
        """Test batch processing with some failures"""
        file1 = self.create_test_file("file1.md")
        file2_path = "/nonexistent/file2.md"

        result = await self.tool_handlers.handle_batch_process_markdown_files({
            'file_assignments': [
                {'path': file1, 'memory_type': 'global'},
                {'path': file2_path, 'memory_type': 'learned'}
            ]
        })

        assert result['success'] is True  # Overall success even with failures
        assert result['total_files'] == 2
        assert result['processed_files'] == 1
        assert result['failed_files'] == 1

        # Check results
        results = result['results']
        success_results = [r for r in results if r['success']]
        failed_results = [r for r in results if not r['success']]

        assert len(success_results) == 1
        assert len(failed_results) == 1
        assert success_results[0]['file'] == file1
        assert failed_results[0]['file'] == file2_path

    async def test_batch_process_with_default_memory_type(self):
        """Test batch processing using default memory type"""
        file1 = self.create_test_file("file1.md")
        file2 = self.create_test_file("file2.md")

        result = await self.tool_handlers.handle_batch_process_markdown_files({
            'file_assignments': [
                {'path': file1},  # No memory_type specified
                {'path': file2, 'memory_type': 'learned'}
            ],
            'default_memory_type': 'global'
        })

        assert result['success'] is True

        # Check that first file used default memory type
        file_results = {r['file']: r for r in result['results']}
        assert file_results[file1]['memory_type'] == 'global'  # Default
        assert file_results[file2]['memory_type'] == 'learned'  # Specified

    async def test_batch_process_empty_list(self):
        """Test batch processing with empty file list"""
        result = await self.tool_handlers.handle_batch_process_markdown_files({
            'file_assignments': []
        })

        assert result['success'] is True
        assert result['total_files'] == 0
        assert result['processed_files'] == 0

    # Test 3: Directory Processing
    async def test_batch_process_directory_success(self):
        """Test successful directory processing"""
        self.create_test_directory_structure()

        result = await self.tool_handlers.handle_batch_process_directory({
            'directory': self.temp_dir,
            'memory_type': 'global',
            'recursive': True
        })

        assert result['success'] is True
        assert result['directory'] == self.temp_dir
        assert result['files_found'] == 2  # Only .md files
        assert result['processed_files'] == 2
        assert result['failed_files'] == 0

        # Verify all markdown files were processed
        processed_files = [r['file'] for r in result['results']]
        assert any('root_file.md' in f for f in processed_files)
        assert any('sub_file.md' in f for f in processed_files)

    async def test_batch_process_directory_non_recursive(self):
        """Test directory processing without recursion"""
        self.create_test_directory_structure()

        result = await self.tool_handlers.handle_batch_process_directory({
            'directory': self.temp_dir,
            'memory_type': 'global',
            'recursive': False
        })

        assert result['success'] is True
        assert result['files_found'] == 1  # Only root level .md file

        # Verify only root file was processed
        processed_files = [r['file'] for r in result['results']]
        assert any('root_file.md' in f for f in processed_files)
        assert not any('sub_file.md' in f for f in processed_files)

    async def test_batch_process_directory_with_auto_suggest(self):
        """Test directory processing with memory type auto-suggestion"""
        self.create_test_directory_structure()

        result = await self.tool_handlers.handle_batch_process_directory({
            'directory': self.temp_dir,
            'recursive': True
            # No memory_type specified, should use auto-suggestion
        })

        assert result['success'] is True

        # Check that files used suggested memory type
        for file_result in result['results']:
            assert file_result['memory_type'] == 'learned'  # From mock

    async def test_batch_process_directory_not_found(self):
        """Test handling of non-existent directory"""
        result = await self.tool_handlers.handle_batch_process_directory({
            'directory': '/nonexistent/directory',
            'memory_type': 'global'
        })

        assert result['success'] is False
        assert 'not found' in result['error'].lower()

    async def test_batch_process_directory_no_markdown_files(self):
        """Test directory with no markdown files"""
        # Create directory with only non-markdown files
        empty_dir = os.path.join(self.temp_dir, "empty")
        os.makedirs(empty_dir)
        with open(os.path.join(empty_dir, "text.txt"), 'w') as f:
            f.write("Not markdown")

        result = await self.tool_handlers.handle_batch_process_directory({
            'directory': empty_dir,
            'memory_type': 'global'
        })

        assert result['success'] is True
        assert result['files_found'] == 0
        assert len(result['results']) == 0

    # Test 4: File Metadata Integration
    async def test_file_metadata_tracking(self):
        """Test file metadata is properly tracked"""
        file_path = self.create_test_file("test.md")

        # Mock metadata responses
        expected_metadata = {
            'path': file_path,
            'hash': 'mock_hash',
            'processed_at': '2024-01-01T00:00:00',
            'memory_type': 'learned',
            'chunks_stored': 3
        }
        self.mock_memory_manager.add_file_metadata.return_value = expected_metadata

        result = await self.tool_handlers.handle_process_markdown_file({
            'path': file_path,
            'memory_type': 'learned'
        })

        assert result['success'] is True

        # Verify metadata was added
        self.mock_memory_manager.add_file_metadata.assert_called_once()
        call_args = self.mock_memory_manager.add_file_metadata.call_args[1]
        assert call_args['path'] == file_path
        assert call_args['memory_type'] == 'learned'

    # Test 5: Progress Tracking and Reporting
    async def test_progress_tracking(self):
        """Test progress tracking in batch operations"""
        files = [
            self.create_test_file(f"file{i}.md", f"# File {i}\nContent {i}")
            for i in range(5)
        ]

        result = await self.tool_handlers.handle_batch_process_markdown_files({
            'file_assignments': [
                {'path': f, 'memory_type': 'global'} for f in files
            ]
        })

        assert result['success'] is True
        assert result['total_files'] == 5
        assert result['processed_files'] == 5
        assert result['failed_files'] == 0

        # Verify progress information
        assert 'processing_time' in result
        assert 'average_time_per_file' in result
        assert len(result['results']) == 5

    # Test 6: Error Recovery and Resilience
    async def test_error_recovery_in_batch(self):
        """Test error recovery in batch processing"""
        file1 = self.create_test_file("file1.md")
        file2 = self.create_test_file("file2.md")
        file3 = self.create_test_file("file3.md")

        # Mock an error for the middle file; other files return the fixture's
        # default successful store result (re-invoking the mock from inside
        # its own side_effect would not produce that value).
        def mock_store_with_error(*args, **kwargs):
            if 'file2.md' in str(args) or 'file2.md' in str(kwargs):
                raise Exception("Storage error")
            return {'success': True, 'stored': 3}

        self.mock_memory_manager.store_memory.side_effect = mock_store_with_error

        result = await self.tool_handlers.handle_batch_process_markdown_files({
            'file_assignments': [
                {'path': file1, 'memory_type': 'global'},
                {'path': file2, 'memory_type': 'global'},
                {'path': file3, 'memory_type': 'global'}
            ]
        })

        assert result['success'] is True  # Overall success despite one failure
        assert result['total_files'] == 3
        assert result['processed_files'] == 2
        assert result['failed_files'] == 1

        # Check specific results
        results = result['results']
        failed_result = next(r for r in results if not r['success'])
        assert 'file2.md' in failed_result['file']
        assert 'storage error' in failed_result['error'].lower()

    # Test 7: Integration with Deduplication
    async def test_deduplication_integration(self):
        """Test integration with deduplication system"""
        file_path = self.create_test_file("test.md")

        # Mock deduplication results
        dedup_stats = {
            'success': True,
            'processed': 10,
            'duplicates_removed': 2,
            'near_misses': 3,
            'collection': 'learned_memory'
        }
        self.mock_memory_manager.validate_and_deduplicate.return_value = dedup_stats

        result = await self.tool_handlers.handle_process_markdown_file({
            'path': file_path,
            'memory_type': 'learned'
        })

        assert result['success'] is True
        assert result['deduplication_stats'] == dedup_stats

        # Verify deduplication was called with correct collection
        self.mock_memory_manager.validate_and_deduplicate.assert_called_once()
        call_args = self.mock_memory_manager.validate_and_deduplicate.call_args[1]
        assert call_args['collection'] == 'learned_memory'

    # Test 8: MCP Server Integration
    async def test_mcp_server_tool_registration(self):
        """Test MCP server has ingestion tools registered"""
        mock_handlers = Mock(spec=ToolHandlers)
        server = MCPServer(mock_handlers)

        # Get available tools
        tools = [tool['name'] for tool in server.get_available_tools()]

        # Verify all Step 3 tools are registered
        expected_tools = [
            'process_markdown_file',
            'batch_process_markdown_files',
            'batch_process_directory'
        ]
        for tool_name in expected_tools:
            assert tool_name in tools

    async def test_mcp_tool_schemas(self):
        """Test MCP tool schemas are properly defined"""
        mock_handlers = Mock(spec=ToolHandlers)
        server = MCPServer(mock_handlers)

        tools = {tool['name']: tool for tool in server.get_available_tools()}

        # Test process_markdown_file schema
        pmf_tool = tools['process_markdown_file']
        assert pmf_tool['inputSchema']['type'] == 'object'
        assert 'path' in pmf_tool['inputSchema']['properties']
        assert 'memory_type' in pmf_tool['inputSchema']['properties']
        assert pmf_tool['inputSchema']['required'] == ['path']

        # Test batch_process_markdown_files schema
        bpmf_tool = tools['batch_process_markdown_files']
        assert 'file_assignments' in bpmf_tool['inputSchema']['properties']
        assert bpmf_tool['inputSchema']['required'] == ['file_assignments']

        # Test batch_process_directory schema
        bpd_tool = tools['batch_process_directory']
        assert 'directory' in bpd_tool['inputSchema']['properties']
        assert bpd_tool['inputSchema']['required'] == []

    # Test 9: Memory Type Handling
    async def test_memory_type_validation(self):
        """Test memory type validation and agent_id handling"""
        file_path = self.create_test_file("test.md")

        # Test agent memory type requires agent_id
        result = await self.tool_handlers.handle_process_markdown_file({
            'path': file_path,
            'memory_type': 'agent',
            'agent_id': 'test_agent'
        })

        assert result['success'] is True
        assert result['memory_type'] == 'agent'
        assert result['agent_id'] == 'test_agent'

    async def test_invalid_memory_type_fallback(self):
        """Test handling of invalid memory types"""
        file_path = self.create_test_file("test.md")

        # Mock analyzer for fallback suggestion
        self.mock_markdown_processor.analyze_content.return_value = {
            'memory_type': 'global',
            'confidence': 0.7
        }

        result = await self.tool_handlers.handle_process_markdown_file({
            'path': file_path,
            'memory_type': 'invalid_type',  # Invalid type
            'auto_suggest': True
        })

        assert result['success'] is True
        assert result['memory_type'] == 'global'  # Fallback from analyzer


if __name__ == '__main__':
    pytest.main([__file__, '-v'])
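
test_mcp_tool_schemas asserts only a handful of fields on each tool's input schema. As a reading aid, the shapes those assertions imply are sketched below; only the asserted keys and required lists come from the test, while the property types and everything else are assumptions, not the server's actual tool definitions.

# Input-schema shapes implied by test_mcp_tool_schemas (a sketch; only the
# asserted keys and `required` lists are taken from the test, the property
# types are assumptions).
IMPLIED_TOOL_SCHEMAS = {
    "process_markdown_file": {
        "type": "object",
        "properties": {
            "path": {"type": "string"},         # asserted present
            "memory_type": {"type": "string"},  # asserted present
        },
        "required": ["path"],                   # asserted exactly
    },
    "batch_process_markdown_files": {
        "type": "object",
        "properties": {
            "file_assignments": {"type": "array"},  # asserted present
        },
        "required": ["file_assignments"],           # asserted exactly
    },
    "batch_process_directory": {
        "type": "object",
        "properties": {
            "directory": {"type": "string"},  # asserted present
        },
        "required": [],                       # asserted exactly
    },
}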
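
The test methods and the autouse setup fixture are plain async def functions with no @pytest.mark.asyncio decorations, so the suite presumably relies on an async pytest plugin such as pytest-asyncio in auto mode. A minimal runner sketch under that assumption follows; the tests/ path is hypothetical.

# run_step3_tests.py -- minimal sketch, assuming pytest-asyncio is installed.
# asyncio_mode=auto lets the bare `async def` tests and the async autouse
# fixture run without per-test @pytest.mark.asyncio markers.
import sys

import pytest

if __name__ == "__main__":
    sys.exit(pytest.main([
        "tests/test_step3_ingestion.py",  # hypothetical location of this file
        "-v",
        "-o", "asyncio_mode=auto",
    ]))

Equivalently, asyncio_mode = auto can be set once in pytest.ini or pyproject.toml instead of passing -o on every run.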
