Skip to main content
Glama

basic-memory

test_sync_service.py66.4 kB
"""Test general sync behavior.""" import asyncio import os from datetime import datetime, timezone from pathlib import Path from textwrap import dedent import pytest from basic_memory.config import ProjectConfig, BasicMemoryConfig from basic_memory.models import Entity from basic_memory.repository import EntityRepository from basic_memory.schemas.search import SearchQuery from basic_memory.services import EntityService, FileService from basic_memory.services.search_service import SearchService from basic_memory.sync.sync_service import SyncService async def create_test_file(path: Path, content: str = "test content") -> None: """Create a test file with given content.""" path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content) async def touch_file(path: Path) -> None: """Touch a file to update its mtime (for watermark testing).""" import time # Read and rewrite to update mtime content = path.read_text() time.sleep(0.5) # Ensure mtime changes and is newer than watermark (500ms) path.write_text(content) async def force_full_scan(sync_service: SyncService) -> None: """Force next sync to do a full scan by clearing watermark (for testing moves/deletions).""" if sync_service.entity_repository.project_id is not None: project = await sync_service.project_repository.find_by_id( sync_service.entity_repository.project_id ) if project: await sync_service.project_repository.update( project.id, { "last_scan_timestamp": None, "last_file_count": None, }, ) @pytest.mark.asyncio async def test_forward_reference_resolution( sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService, ): """Test that forward references get resolved when target file is created.""" project_dir = project_config.home # First create a file with a forward reference source_content = """ --- type: knowledge --- # Source Document ## Relations - depends_on [[target-doc]] - depends_on [[target-doc]] # duplicate """ await create_test_file(project_dir / "source.md", source_content) # Initial sync - should create forward reference await sync_service.sync(project_config.home) # Verify forward reference source = await entity_service.get_by_permalink("source") assert len(source.relations) == 1 assert source.relations[0].to_id is None assert source.relations[0].to_name == "target-doc" # Now create the target file target_content = """ --- type: knowledge --- # Target Doc Target content """ target_file = project_dir / "target_doc.md" await create_test_file(target_file, target_content) # Force full scan to ensure the new file is detected # Incremental scans have timing precision issues with watermarks on some filesystems await force_full_scan(sync_service) # Sync again - should resolve the reference await sync_service.sync(project_config.home) # Verify reference is now resolved source = await entity_service.get_by_permalink("source") target = await entity_service.get_by_permalink("target-doc") assert len(source.relations) == 1 assert source.relations[0].to_id == target.id assert source.relations[0].to_name == target.title @pytest.mark.asyncio async def test_sync( sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService ): """Test basic knowledge sync functionality.""" # Create test files project_dir = project_config.home # New entity with relation new_content = """ --- type: knowledge permalink: concept/test-concept created: 2023-01-01 modified: 2023-01-01 --- # Test Concept A test concept. ## Observations - [design] Core feature ## Relations - depends_on [[concept/other]] """ await create_test_file(project_dir / "concept/test_concept.md", new_content) # Create related entity in DB that will be deleted # because file was not found other = Entity( permalink="concept/other", title="Other", entity_type="test", file_path="concept/other.md", checksum="12345678", content_type="text/markdown", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) await entity_service.repository.add(other) # Run sync await sync_service.sync(project_config.home) # Verify results entities = await entity_service.repository.find_all() assert len(entities) == 1 # Find new entity test_concept = next(e for e in entities if e.permalink == "concept/test-concept") assert test_concept.entity_type == "knowledge" # Verify relation was created # with forward link entity = await entity_service.get_by_permalink(test_concept.permalink) relations = entity.relations assert len(relations) == 1, "Expected 1 relation for entity" assert relations[0].to_name == "concept/other" @pytest.mark.asyncio async def test_sync_hidden_file( sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService ): """Test basic knowledge sync functionality.""" # Create test files project_dir = project_config.home # hidden file await create_test_file(project_dir / "concept/.hidden.md", "hidden") # Run sync await sync_service.sync(project_config.home) # Verify results entities = await entity_service.repository.find_all() assert len(entities) == 0 @pytest.mark.asyncio async def test_sync_entity_with_nonexistent_relations( sync_service: SyncService, project_config: ProjectConfig ): """Test syncing an entity that references nonexistent entities.""" project_dir = project_config.home # Create entity that references entities we haven't created yet content = """ --- type: knowledge permalink: concept/depends-on-future created: 2024-01-01 modified: 2024-01-01 --- # Test Dependencies ## Observations - [design] Testing future dependencies ## Relations - depends_on [[concept/not_created_yet]] - uses [[concept/also_future]] """ await create_test_file(project_dir / "concept/depends_on_future.md", content) # Sync await sync_service.sync(project_config.home) # Verify entity created but no relations entity = await sync_service.entity_service.repository.get_by_permalink( "concept/depends-on-future" ) assert entity is not None assert len(entity.relations) == 2 assert entity.relations[0].to_name == "concept/not_created_yet" assert entity.relations[1].to_name == "concept/also_future" @pytest.mark.asyncio async def test_sync_entity_circular_relations( sync_service: SyncService, project_config: ProjectConfig ): """Test syncing entities with circular dependencies.""" project_dir = project_config.home # Create entity A that depends on B content_a = """ --- type: knowledge permalink: concept/entity-a created: 2024-01-01 modified: 2024-01-01 --- # Entity A ## Observations - First entity in circular reference ## Relations - depends_on [[concept/entity-b]] """ await create_test_file(project_dir / "concept/entity_a.md", content_a) # Create entity B that depends on A content_b = """ --- type: knowledge permalink: concept/entity-b created: 2024-01-01 modified: 2024-01-01 --- # Entity B ## Observations - Second entity in circular reference ## Relations - depends_on [[concept/entity-a]] """ await create_test_file(project_dir / "concept/entity_b.md", content_b) # Sync await sync_service.sync(project_config.home) # Verify both entities and their relations entity_a = await sync_service.entity_service.repository.get_by_permalink("concept/entity-a") entity_b = await sync_service.entity_service.repository.get_by_permalink("concept/entity-b") # outgoing relations assert len(entity_a.outgoing_relations) == 1 assert len(entity_b.outgoing_relations) == 1 # incoming relations assert len(entity_a.incoming_relations) == 1 assert len(entity_b.incoming_relations) == 1 # all relations assert len(entity_a.relations) == 2 assert len(entity_b.relations) == 2 # Verify circular reference works a_relation = entity_a.outgoing_relations[0] assert a_relation.to_id == entity_b.id b_relation = entity_b.outgoing_relations[0] assert b_relation.to_id == entity_a.id @pytest.mark.asyncio async def test_sync_entity_duplicate_relations( sync_service: SyncService, project_config: ProjectConfig ): """Test handling of duplicate relations in an entity.""" project_dir = project_config.home # Create target entity first target_content = """ --- type: knowledge permalink: concept/target created: 2024-01-01 modified: 2024-01-01 --- # Target Entity ## Observations - something to observe """ await create_test_file(project_dir / "concept/target.md", target_content) # Create entity with duplicate relations content = """ --- type: knowledge permalink: concept/duplicate-relations created: 2024-01-01 modified: 2024-01-01 --- # Test Duplicates ## Observations - this has a lot of relations ## Relations - depends_on [[concept/target]] - depends_on [[concept/target]] # Duplicate - uses [[concept/target]] # Different relation type - uses [[concept/target]] # Duplicate of different type """ await create_test_file(project_dir / "concept/duplicate_relations.md", content) # Sync await sync_service.sync(project_config.home) # Verify duplicates are handled entity = await sync_service.entity_service.repository.get_by_permalink( "concept/duplicate-relations" ) # Count relations by type relation_counts = {} for rel in entity.relations: relation_counts[rel.relation_type] = relation_counts.get(rel.relation_type, 0) + 1 # Should only have one of each type assert relation_counts["depends_on"] == 1 assert relation_counts["uses"] == 1 @pytest.mark.asyncio async def test_sync_entity_with_random_categories( sync_service: SyncService, project_config: ProjectConfig ): """Test handling of random observation categories.""" project_dir = project_config.home content = """ --- type: knowledge permalink: concept/invalid-category created: 2024-01-01 modified: 2024-01-01 --- # Test Categories ## Observations - [random category] This is fine - [ a space category] Should default to note - This one is not an observation, should be ignored - [design] This is valid """ await create_test_file(project_dir / "concept/invalid_category.md", content) # Sync await sync_service.sync(project_config.home) # Verify observations entity = await sync_service.entity_service.repository.get_by_permalink( "concept/invalid-category" ) assert len(entity.observations) == 3 categories = [obs.category for obs in entity.observations] # Invalid categories should be converted to default assert "random category" in categories # Valid categories preserved assert "a space category" in categories assert "design" in categories @pytest.mark.skip("sometimes fails") @pytest.mark.asyncio async def test_sync_entity_with_order_dependent_relations( sync_service: SyncService, project_config: ProjectConfig ): """Test that order of entity syncing doesn't affect relation creation.""" project_dir = project_config.home # Create several interrelated entities entities = { "a": """ --- type: knowledge permalink: concept/entity-a created: 2024-01-01 modified: 2024-01-01 --- # Entity A ## Observations - depends on b - depends on c ## Relations - depends_on [[concept/entity-b]] - depends_on [[concept/entity-c]] """, "b": """ --- type: knowledge permalink: concept/entity-b created: 2024-01-01 modified: 2024-01-01 --- # Entity B ## Observations - depends on c ## Relations - depends_on [[concept/entity-c]] """, "c": """ --- type: knowledge permalink: concept/entity-c created: 2024-01-01 modified: 2024-01-01 --- # Entity C ## Observations - depends on a ## Relations - depends_on [[concept/entity-a]] """, } # Create files in different orders and verify results are the same for name, content in entities.items(): await create_test_file(project_dir / f"concept/entity_{name}.md", content) # Sync await sync_service.sync(project_config.home) # Verify all relations are created correctly regardless of order entity_a = await sync_service.entity_service.repository.get_by_permalink("concept/entity-a") entity_b = await sync_service.entity_service.repository.get_by_permalink("concept/entity-b") entity_c = await sync_service.entity_service.repository.get_by_permalink("concept/entity-c") # Verify outgoing relations by checking actual targets a_outgoing_targets = {rel.to_id for rel in entity_a.outgoing_relations} assert entity_b.id in a_outgoing_targets, ( f"A should depend on B. A's targets: {a_outgoing_targets}, B's ID: {entity_b.id}" ) assert entity_c.id in a_outgoing_targets, ( f"A should depend on C. A's targets: {a_outgoing_targets}, C's ID: {entity_c.id}" ) assert len(entity_a.outgoing_relations) == 2, "A should have exactly 2 outgoing relations" b_outgoing_targets = {rel.to_id for rel in entity_b.outgoing_relations} assert entity_c.id in b_outgoing_targets, "B should depend on C" assert len(entity_b.outgoing_relations) == 1, "B should have exactly 1 outgoing relation" c_outgoing_targets = {rel.to_id for rel in entity_c.outgoing_relations} assert entity_a.id in c_outgoing_targets, "C should depend on A" assert len(entity_c.outgoing_relations) == 1, "C should have exactly 1 outgoing relation" # Verify incoming relations by checking actual sources a_incoming_sources = {rel.from_id for rel in entity_a.incoming_relations} assert entity_c.id in a_incoming_sources, "A should have incoming relation from C" b_incoming_sources = {rel.from_id for rel in entity_b.incoming_relations} assert entity_a.id in b_incoming_sources, "B should have incoming relation from A" c_incoming_sources = {rel.from_id for rel in entity_c.incoming_relations} assert entity_a.id in c_incoming_sources, "C should have incoming relation from A" assert entity_b.id in c_incoming_sources, "C should have incoming relation from B" @pytest.mark.asyncio async def test_sync_empty_directories(sync_service: SyncService, project_config: ProjectConfig): """Test syncing empty directories.""" await sync_service.sync(project_config.home) # Should not raise exceptions for empty dirs assert project_config.home.exists() @pytest.mark.skip("flaky on Windows due to filesystem timing precision") @pytest.mark.asyncio async def test_sync_file_modified_during_sync( sync_service: SyncService, project_config: ProjectConfig ): """Test handling of files that change during sync process.""" # Create initial files doc_path = project_config.home / "changing.md" await create_test_file( doc_path, """ --- type: knowledge id: changing created: 2024-01-01 modified: 2024-01-01 --- # Knowledge File ## Observations - This is a test """, ) # Setup async modification during sync async def modify_file(): await asyncio.sleep(0.1) # Small delay to ensure sync has started doc_path.write_text("Modified during sync") # Run sync and modification concurrently await asyncio.gather(sync_service.sync(project_config.home), modify_file()) # Verify final state doc = await sync_service.entity_service.repository.get_by_permalink("changing") assert doc is not None # if we failed in the middle of a sync, the next one should fix it. if doc.checksum is None: await sync_service.sync(project_config.home) doc = await sync_service.entity_service.repository.get_by_permalink("changing") assert doc.checksum is not None @pytest.mark.asyncio async def test_permalink_formatting( sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService ): """Test that permalinks are properly formatted during sync.""" # Test cases with different filename formats test_files = { # filename -> expected permalink "my_awesome_feature.md": "my-awesome-feature", "MIXED_CASE_NAME.md": "mixed-case-name", "spaces and_underscores.md": "spaces-and-underscores", "design/model_refactor.md": "design/model-refactor", "test/multiple_word_directory/feature_name.md": "test/multiple-word-directory/feature-name", } # Create test files content: str = """ --- type: knowledge created: 2024-01-01 modified: 2024-01-01 --- # Test File Testing permalink generation. """ for filename, _ in test_files.items(): await create_test_file(project_config.home / filename, content) # Run sync once after all files are created await sync_service.sync(project_config.home) # Verify permalinks entities = await entity_service.repository.find_all() for filename, expected_permalink in test_files.items(): # Find entity for this file entity = next(e for e in entities if e.file_path == filename) assert entity.permalink == expected_permalink, ( f"File {filename} should have permalink {expected_permalink}" ) @pytest.mark.asyncio async def test_handle_entity_deletion( test_graph, sync_service: SyncService, entity_repository: EntityRepository, search_service: SearchService, ): """Test deletion of entity cleans up search index.""" root_entity = test_graph["root"] # Delete the entity await sync_service.handle_delete(root_entity.file_path) # Verify entity is gone from db assert await entity_repository.get_by_permalink(root_entity.permalink) is None # Verify entity is gone from search index entity_results = await search_service.search(SearchQuery(text=root_entity.title)) assert len(entity_results) == 0 obs_results = await search_service.search(SearchQuery(text="Root note 1")) assert len(obs_results) == 0 rel_results = await search_service.search(SearchQuery(text="connects_to")) assert len(rel_results) == 0 @pytest.mark.asyncio async def test_sync_preserves_timestamps( sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService, ): """Test that sync preserves file timestamps and frontmatter dates.""" project_dir = project_config.home # Create a file with explicit frontmatter dates frontmatter_content = """ --- type: knowledge --- # Explicit Dates Testing frontmatter dates """ await create_test_file(project_dir / "explicit_dates.md", frontmatter_content) # Create a file without dates (will use file timestamps) file_dates_content = """ --- type: knowledge --- # File Dates Testing file timestamps """ file_path = project_dir / "file_dates3.md" await create_test_file(file_path, file_dates_content) # Run sync await sync_service.sync(project_config.home) # Check explicit frontmatter dates explicit_entity = await entity_service.get_by_permalink("explicit-dates") assert explicit_entity.created_at is not None assert explicit_entity.updated_at is not None # Check file timestamps file_entity = await entity_service.get_by_permalink("file-dates3") file_stats = file_path.stat() # Compare using epoch timestamps to handle timezone differences correctly # This ensures we're comparing the actual points in time, not display representations entity_created_epoch = file_entity.created_at.timestamp() entity_updated_epoch = file_entity.updated_at.timestamp() # Allow 2s difference on Windows due to filesystem timing precision tolerance = 2 if os.name == "nt" else 1 assert abs(entity_created_epoch - file_stats.st_ctime) < tolerance assert abs(entity_updated_epoch - file_stats.st_mtime) < tolerance # Allow tolerance difference @pytest.mark.asyncio async def test_sync_updates_timestamps_on_file_modification( sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService, ): """Test that sync updates entity timestamps when files are modified. This test specifically validates that when an existing file is modified and re-synced, the updated_at timestamp in the database reflects the file's actual modification time, not the database operation time. This is critical for accurate temporal ordering in search and recent_activity queries. """ project_dir = project_config.home # Create initial file initial_content = """ --- type: knowledge --- # Test File Initial content for timestamp test """ file_path = project_dir / "timestamp_test.md" await create_test_file(file_path, initial_content) # Initial sync await sync_service.sync(project_config.home) # Get initial entity and timestamps entity_before = await entity_service.get_by_permalink("timestamp-test") initial_updated_at = entity_before.updated_at # Modify the file content and update mtime to be newer than watermark modified_content = """ --- type: knowledge --- # Test File Modified content for timestamp test ## Observations - [test] This was modified """ file_path.write_text(modified_content) # Touch file to ensure mtime is newer than watermark # This uses our helper which sleeps 500ms and rewrites to guarantee mtime change await touch_file(file_path) # Get the file's modification time after our changes file_stats_after_modification = file_path.stat() # Force full scan to ensure the modified file is detected # (incremental scans have timing precision issues with watermarks on some filesystems) await force_full_scan(sync_service) # Re-sync the modified file await sync_service.sync(project_config.home) # Get entity after re-sync entity_after = await entity_service.get_by_permalink("timestamp-test") # Verify that updated_at changed assert entity_after.updated_at != initial_updated_at, ( "updated_at should change when file is modified" ) # Verify that updated_at matches the file's modification time, not db operation time entity_updated_epoch = entity_after.updated_at.timestamp() file_mtime = file_stats_after_modification.st_mtime # Allow 2s difference on Windows due to filesystem timing precision tolerance = 2 if os.name == "nt" else 1 assert abs(entity_updated_epoch - file_mtime) < tolerance, ( f"Entity updated_at ({entity_after.updated_at}) should match file mtime " f"({datetime.fromtimestamp(file_mtime)}) within {tolerance}s tolerance" ) # Verify the content was actually updated assert len(entity_after.observations) == 1 assert entity_after.observations[0].content == "This was modified" @pytest.mark.asyncio async def test_file_move_updates_search_index( sync_service: SyncService, project_config: ProjectConfig, search_service: SearchService, ): """Test that moving a file updates its path in the search index.""" project_dir = project_config.home # Create initial file content = """ --- type: knowledge --- # Test Move Content for move test """ old_path = project_dir / "old" / "test_move.md" old_path.parent.mkdir(parents=True) await create_test_file(old_path, content) # Initial sync await sync_service.sync(project_config.home) # Move the file new_path = project_dir / "new" / "moved_file.md" new_path.parent.mkdir(parents=True) old_path.rename(new_path) # Force full scan to detect the move # (rename doesn't update mtime, so incremental scan won't find it) await force_full_scan(sync_service) # Second sync should detect the move await sync_service.sync(project_config.home) # Check search index has updated path results = await search_service.search(SearchQuery(text="Content for move test")) assert len(results) == 1 assert results[0].file_path == new_path.relative_to(project_dir).as_posix() @pytest.mark.asyncio async def test_sync_null_checksum_cleanup( sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService, ): """Test handling of entities with null checksums from incomplete syncs.""" # Create entity with null checksum (simulating incomplete sync) entity = Entity( permalink="concept/incomplete", title="Incomplete", entity_type="test", file_path="concept/incomplete.md", checksum=None, # Null checksum content_type="text/markdown", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) await entity_service.repository.add(entity) # Create corresponding file content = """ --- type: knowledge id: concept/incomplete created: 2024-01-01 modified: 2024-01-01 --- # Incomplete Entity ## Observations - Testing cleanup """ await create_test_file(project_config.home / "concept/incomplete.md", content) # Run sync await sync_service.sync(project_config.home) # Verify entity was properly synced updated = await entity_service.get_by_permalink("concept/incomplete") assert updated.checksum is not None @pytest.mark.asyncio async def test_sync_permalink_resolved( sync_service: SyncService, project_config: ProjectConfig, file_service: FileService, app_config ): """Test that we resolve duplicate permalinks on sync .""" project_dir = project_config.home # Create initial file content = """ --- type: knowledge --- # Test Move Content for move test """ old_path = project_dir / "old" / "test_move.md" old_path.parent.mkdir(parents=True) await create_test_file(old_path, content) # Initial sync await sync_service.sync(project_config.home) # Move the file new_path = project_dir / "new" / "moved_file.md" new_path.parent.mkdir(parents=True) old_path.rename(new_path) # Force full scan to detect the move # (rename doesn't update mtime, so incremental scan won't find it) await force_full_scan(sync_service) # Sync again await sync_service.sync(project_config.home) file_content, _ = await file_service.read_file(new_path) assert "permalink: new/moved-file" in file_content # Create another that has the same permalink content = """ --- type: knowledge permalink: new/moved-file --- # Test Move Content for move test """ old_path = project_dir / "old" / "test_move.md" old_path.parent.mkdir(parents=True, exist_ok=True) await create_test_file(old_path, content) # Force full scan to detect the new file # (file just created may not be newer than watermark due to timing precision) await force_full_scan(sync_service) # Sync new file await sync_service.sync(project_config.home) # assert permalink is unique file_content, _ = await file_service.read_file(old_path) assert "permalink: new/moved-file-1" in file_content @pytest.mark.asyncio async def test_sync_permalink_resolved_on_update( sync_service: SyncService, project_config: ProjectConfig, file_service: FileService, ): """Test that sync resolves permalink conflicts on update.""" project_dir = project_config.home one_file = project_dir / "one.md" two_file = project_dir / "two.md" await create_test_file( one_file, content=dedent( """ --- permalink: one --- test content """ ), ) await create_test_file( two_file, content=dedent( """ --- permalink: two --- test content """ ), ) # Run sync await sync_service.sync(project_config.home) # Check permalinks file_one_content, _ = await file_service.read_file(one_file) assert "permalink: one" in file_one_content file_two_content, _ = await file_service.read_file(two_file) assert "permalink: two" in file_two_content # update the second file with a duplicate permalink updated_content = """ --- title: two.md type: note permalink: one tags: [] --- test content """ two_file.write_text(updated_content) # Force full scan to detect the modified file # (file just modified may not be newer than watermark due to timing precision) await force_full_scan(sync_service) # Run sync await sync_service.sync(project_config.home) # Check permalinks file_two_content, _ = await file_service.read_file(two_file) assert "permalink: two" in file_two_content # new content with duplicate permalink new_content = """ --- title: new.md type: note permalink: one tags: [] --- test content """ new_file = project_dir / "new.md" await create_test_file(new_file, new_content) # Force full scan to detect the new file # (file just created may not be newer than watermark due to timing precision) await force_full_scan(sync_service) # Run another time await sync_service.sync(project_config.home) # Should have deduplicated permalink new_file_content, _ = await file_service.read_file(new_file) assert "permalink: one-1" in new_file_content @pytest.mark.asyncio async def test_sync_permalink_not_created_if_no_frontmatter( sync_service: SyncService, project_config: ProjectConfig, file_service: FileService, ): """Test that sync resolves permalink conflicts on update.""" project_dir = project_config.home file = project_dir / "one.md" await create_test_file(file) # Run sync await sync_service.sync(project_config.home) # Check permalink not created file_content, _ = await file_service.read_file(file) assert "permalink:" not in file_content @pytest.fixture def test_config_update_permamlinks_on_move(app_config) -> BasicMemoryConfig: """Test configuration using in-memory DB.""" app_config.update_permalinks_on_move = True return app_config @pytest.mark.asyncio async def test_sync_permalink_updated_on_move( test_config_update_permamlinks_on_move: BasicMemoryConfig, project_config: ProjectConfig, sync_service: SyncService, file_service: FileService, ): """Test that we update a permalink on a file move if set in config .""" project_dir = project_config.home # Create initial file content = dedent( """ --- type: knowledge --- # Test Move Content for move test """ ) old_path = project_dir / "old" / "test_move.md" old_path.parent.mkdir(parents=True) await create_test_file(old_path, content) # Initial sync await sync_service.sync(project_config.home) # verify permalink old_content, _ = await file_service.read_file(old_path) assert "permalink: old/test-move" in old_content # Move the file new_path = project_dir / "new" / "moved_file.md" new_path.parent.mkdir(parents=True) old_path.rename(new_path) # Force full scan to detect the move # (rename doesn't update mtime, so incremental scan won't find it) await force_full_scan(sync_service) # Sync again await sync_service.sync(project_config.home) file_content, _ = await file_service.read_file(new_path) assert "permalink: new/moved-file" in file_content @pytest.mark.asyncio async def test_sync_non_markdown_files(sync_service, project_config, test_files): """Test syncing non-markdown files.""" report = await sync_service.sync(project_config.home) assert report.total == 2 # Check files were detected assert test_files["pdf"].name in [f for f in report.new] assert test_files["image"].name in [f for f in report.new] # Verify entities were created pdf_entity = await sync_service.entity_repository.get_by_file_path(str(test_files["pdf"].name)) assert pdf_entity is not None, "PDF entity should have been created" assert pdf_entity.content_type == "application/pdf" image_entity = await sync_service.entity_repository.get_by_file_path( str(test_files["image"].name) ) assert image_entity.content_type == "image/png" @pytest.mark.asyncio async def test_sync_non_markdown_files_modified( sync_service, project_config, test_files, file_service ): """Test syncing non-markdown files.""" report = await sync_service.sync(project_config.home) assert report.total == 2 # Check files were detected assert test_files["pdf"].name in [f for f in report.new] assert test_files["image"].name in [f for f in report.new] test_files["pdf"].write_text("New content") test_files["image"].write_text("New content") # Force full scan to detect the modified files # (files just modified may not be newer than watermark due to timing precision) await force_full_scan(sync_service) report = await sync_service.sync(project_config.home) assert len(report.modified) == 2 pdf_file_content, pdf_checksum = await file_service.read_file(test_files["pdf"].name) image_file_content, img_checksum = await file_service.read_file(test_files["image"].name) pdf_entity = await sync_service.entity_repository.get_by_file_path(str(test_files["pdf"].name)) image_entity = await sync_service.entity_repository.get_by_file_path( str(test_files["image"].name) ) assert pdf_entity.checksum == pdf_checksum assert image_entity.checksum == img_checksum @pytest.mark.asyncio async def test_sync_non_markdown_files_move(sync_service, project_config, test_files): """Test syncing non-markdown files updates permalink""" report = await sync_service.sync(project_config.home) assert report.total == 2 # Check files were detected assert test_files["pdf"].name in [f for f in report.new] assert test_files["image"].name in [f for f in report.new] test_files["pdf"].rename(project_config.home / "moved_pdf.pdf") # Force full scan to detect the move # (rename doesn't update mtime, so incremental scan won't find it) await force_full_scan(sync_service) report2 = await sync_service.sync(project_config.home) assert len(report2.moves) == 1 # Verify entity is updated pdf_entity = await sync_service.entity_repository.get_by_file_path("moved_pdf.pdf") assert pdf_entity is not None assert pdf_entity.permalink is None @pytest.mark.asyncio async def test_sync_non_markdown_files_deleted(sync_service, project_config, test_files): """Test syncing non-markdown files updates permalink""" report = await sync_service.sync(project_config.home) assert report.total == 2 # Check files were detected assert test_files["pdf"].name in [f for f in report.new] assert test_files["image"].name in [f for f in report.new] test_files["pdf"].unlink() report2 = await sync_service.sync(project_config.home) assert len(report2.deleted) == 1 # Verify entity is deleted pdf_entity = await sync_service.entity_repository.get_by_file_path("moved_pdf.pdf") assert pdf_entity is None @pytest.mark.asyncio async def test_sync_non_markdown_files_move_with_delete( sync_service, project_config, test_files, file_service ): """Test syncing non-markdown files handles file deletes and renames during sync""" # Create initial files await create_test_file(project_config.home / "doc.pdf", "content1") await create_test_file(project_config.home / "other/doc-1.pdf", "content2") # Initial sync await sync_service.sync(project_config.home) # First move/delete the original file to make way for the move (project_config.home / "doc.pdf").unlink() (project_config.home / "other/doc-1.pdf").rename(project_config.home / "doc.pdf") # Sync again await sync_service.sync(project_config.home) # Verify the changes moved_entity = await sync_service.entity_repository.get_by_file_path("doc.pdf") assert moved_entity is not None assert moved_entity.permalink is None file_content, _ = await file_service.read_file("doc.pdf") assert "content2" in file_content @pytest.mark.asyncio async def test_sync_relation_to_non_markdown_file( sync_service: SyncService, project_config: ProjectConfig, file_service: FileService, test_files ): """Test that sync resolves permalink conflicts on update.""" project_dir = project_config.home content = f""" --- title: a note type: note tags: [] --- - relates_to [[{test_files["pdf"].name}]] """ note_file = project_dir / "note.md" await create_test_file(note_file, content) # Run sync await sync_service.sync(project_config.home) # Check permalinks file_one_content, _ = await file_service.read_file(note_file) assert ( f"""--- title: a note type: note tags: [] permalink: note --- - relates_to [[{test_files["pdf"].name}]] """.strip() == file_one_content ) @pytest.mark.asyncio async def test_sync_regular_file_race_condition_handling( sync_service: SyncService, project_config: ProjectConfig ): """Test that sync_regular_file handles race condition with IntegrityError (lines 380-401).""" from unittest.mock import patch from sqlalchemy.exc import IntegrityError from datetime import datetime, timezone # Create a test file test_file = project_config.home / "test_race.md" test_content = """ --- type: knowledge --- # Test Race Condition This is a test file for race condition handling. """ await create_test_file(test_file, test_content) # Mock the entity_repository.add to raise IntegrityError on first call original_add = sync_service.entity_repository.add call_count = 0 async def mock_add(*args, **kwargs): nonlocal call_count call_count += 1 if call_count == 1: # Simulate race condition - another process created the entity raise IntegrityError("UNIQUE constraint failed: entity.file_path", None, None) # pyright: ignore [reportArgumentType] else: return await original_add(*args, **kwargs) # Mock get_by_file_path to return an existing entity (simulating the race condition result) async def mock_get_by_file_path(file_path): from basic_memory.models import Entity return Entity( id=1, title="Test Race Condition", entity_type="knowledge", file_path=str(file_path), permalink="test-race-condition", content_type="text/markdown", checksum="old_checksum", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) # Mock update to return the updated entity async def mock_update(entity_id, updates): from basic_memory.models import Entity return Entity( id=entity_id, title="Test Race Condition", entity_type="knowledge", file_path=updates["file_path"], permalink="test-race-condition", content_type="text/markdown", checksum=updates["checksum"], created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) with ( patch.object(sync_service.entity_repository, "add", side_effect=mock_add), patch.object( sync_service.entity_repository, "get_by_file_path", side_effect=mock_get_by_file_path ) as mock_get, patch.object( sync_service.entity_repository, "update", side_effect=mock_update ) as mock_update_call, ): # Call sync_regular_file entity, checksum = await sync_service.sync_regular_file( str(test_file.relative_to(project_config.home)), new=True ) # Verify it handled the race condition gracefully assert entity is not None assert entity.title == "Test Race Condition" assert entity.file_path == str(test_file.relative_to(project_config.home)) # Verify that get_by_file_path and update were called as fallback assert mock_get.call_count >= 1 # May be called multiple times mock_update_call.assert_called_once() @pytest.mark.asyncio async def test_sync_regular_file_integrity_error_reraise( sync_service: SyncService, project_config: ProjectConfig ): """Test that sync_regular_file re-raises IntegrityError for non-race-condition cases.""" from unittest.mock import patch from sqlalchemy.exc import IntegrityError # Create a test file test_file = project_config.home / "test_integrity.md" test_content = """ --- type: knowledge --- # Test Integrity Error This is a test file for integrity error handling. """ await create_test_file(test_file, test_content) # Mock the entity_repository.add to raise a different IntegrityError (not file_path constraint) async def mock_add(*args, **kwargs): # Simulate a different constraint violation raise IntegrityError("UNIQUE constraint failed: entity.some_other_field", None, None) # pyright: ignore [reportArgumentType] with patch.object(sync_service.entity_repository, "add", side_effect=mock_add): # Should re-raise the IntegrityError since it's not a file_path constraint with pytest.raises( IntegrityError, match="UNIQUE constraint failed: entity.some_other_field" ): await sync_service.sync_regular_file( str(test_file.relative_to(project_config.home)), new=True ) @pytest.mark.asyncio async def test_sync_regular_file_race_condition_entity_not_found( sync_service: SyncService, project_config: ProjectConfig ): """Test handling when entity is not found after IntegrityError (pragma: no cover case).""" from unittest.mock import patch from sqlalchemy.exc import IntegrityError # Create a test file test_file = project_config.home / "test_not_found.md" test_content = """ --- type: knowledge --- # Test Not Found This is a test file for entity not found after constraint violation. """ await create_test_file(test_file, test_content) # Mock the entity_repository.add to raise IntegrityError async def mock_add(*args, **kwargs): raise IntegrityError("UNIQUE constraint failed: entity.file_path", None, None) # pyright: ignore [reportArgumentType] # Mock get_by_file_path to return None (entity not found) async def mock_get_by_file_path(file_path): return None with ( patch.object(sync_service.entity_repository, "add", side_effect=mock_add), patch.object( sync_service.entity_repository, "get_by_file_path", side_effect=mock_get_by_file_path ), ): # Should raise ValueError when entity is not found after constraint violation with pytest.raises(ValueError, match="Entity not found after constraint violation"): await sync_service.sync_regular_file( str(test_file.relative_to(project_config.home)), new=True ) @pytest.mark.asyncio async def test_sync_regular_file_race_condition_update_failed( sync_service: SyncService, project_config: ProjectConfig ): """Test handling when update fails after IntegrityError (pragma: no cover case).""" from unittest.mock import patch from sqlalchemy.exc import IntegrityError from datetime import datetime, timezone # Create a test file test_file = project_config.home / "test_update_fail.md" test_content = """ --- type: knowledge --- # Test Update Fail This is a test file for update failure after constraint violation. """ await create_test_file(test_file, test_content) # Mock the entity_repository.add to raise IntegrityError async def mock_add(*args, **kwargs): raise IntegrityError("UNIQUE constraint failed: entity.file_path", None, None) # pyright: ignore [reportArgumentType] # Mock get_by_file_path to return an existing entity async def mock_get_by_file_path(file_path): from basic_memory.models import Entity return Entity( id=1, title="Test Update Fail", entity_type="knowledge", file_path=str(file_path), permalink="test-update-fail", content_type="text/markdown", checksum="old_checksum", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) # Mock update to return None (failure) async def mock_update(entity_id, updates): return None with ( patch.object(sync_service.entity_repository, "add", side_effect=mock_add), patch.object( sync_service.entity_repository, "get_by_file_path", side_effect=mock_get_by_file_path ), patch.object(sync_service.entity_repository, "update", side_effect=mock_update), ): # Should raise ValueError when update fails with pytest.raises(ValueError, match="Failed to update entity with ID"): await sync_service.sync_regular_file( str(test_file.relative_to(project_config.home)), new=True ) @pytest.mark.asyncio async def test_circuit_breaker_skips_after_three_failures( sync_service: SyncService, project_config: ProjectConfig ): """Test that circuit breaker skips file after 3 consecutive failures.""" from unittest.mock import patch project_dir = project_config.home test_file = project_dir / "failing_file.md" # Create a file with malformed content that will fail to parse await create_test_file(test_file, "invalid markdown content") # Mock sync_markdown_file to always fail async def mock_sync_markdown_file(*args, **kwargs): raise ValueError("Simulated sync failure") with patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file): # First sync - should fail and record (1/3) report1 = await sync_service.sync(project_dir) assert len(report1.skipped_files) == 0 # Not skipped yet # Touch file to trigger incremental scan await touch_file(test_file) # Force full scan to ensure file is detected # (touch may not update mtime sufficiently on all filesystems) await force_full_scan(sync_service) # Second sync - should fail and record (2/3) report2 = await sync_service.sync(project_dir) assert len(report2.skipped_files) == 0 # Still not skipped # Touch file to trigger incremental scan await touch_file(test_file) # Force full scan to ensure file is detected # (touch may not update mtime sufficiently on all filesystems) await force_full_scan(sync_service) # Third sync - should fail, record (3/3), and be added to skipped list report3 = await sync_service.sync(project_dir) assert len(report3.skipped_files) == 1 assert report3.skipped_files[0].path == "failing_file.md" assert report3.skipped_files[0].failure_count == 3 assert "Simulated sync failure" in report3.skipped_files[0].reason # Touch file to trigger incremental scan await touch_file(test_file) # Force full scan to ensure file is detected # (touch may not update mtime sufficiently on all filesystems) await force_full_scan(sync_service) # Fourth sync - should be skipped immediately without attempting report4 = await sync_service.sync(project_dir) assert len(report4.skipped_files) == 1 # Still skipped @pytest.mark.asyncio async def test_circuit_breaker_resets_on_file_change( sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService ): """Test that circuit breaker resets when file content changes.""" from unittest.mock import patch project_dir = project_config.home test_file = project_dir / "changing_file.md" # Create initial failing content await create_test_file(test_file, "initial bad content") # Mock sync_markdown_file to fail call_count = 0 async def mock_sync_markdown_file(*args, **kwargs): nonlocal call_count call_count += 1 raise ValueError("Simulated sync failure") with patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file): # Fail 3 times to hit circuit breaker threshold await sync_service.sync(project_dir) # Fail 1 await touch_file(test_file) # Touch to trigger incremental scan # Force full scan to ensure file is detected # (touch may not update mtime sufficiently on all filesystems) await force_full_scan(sync_service) await sync_service.sync(project_dir) # Fail 2 await touch_file(test_file) # Touch to trigger incremental scan # Force full scan to ensure file is detected # (touch may not update mtime sufficiently on all filesystems) await force_full_scan(sync_service) report3 = await sync_service.sync(project_dir) # Fail 3 - now skipped assert len(report3.skipped_files) == 1 # Now change the file content valid_content = dedent( """ --- title: Fixed Content type: knowledge --- # Fixed Content This should work now. """ ).strip() await create_test_file(test_file, valid_content) # Force full scan to detect the modified file # (file just modified may not be newer than watermark due to timing precision) await force_full_scan(sync_service) # Circuit breaker should reset and allow retry report = await sync_service.sync(project_dir) assert len(report.skipped_files) == 0 # Should not be skipped anymore # Verify entity was created successfully entity = await entity_service.get_by_permalink("changing-file") assert entity is not None assert entity.title == "Fixed Content" @pytest.mark.asyncio async def test_circuit_breaker_clears_on_success( sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService ): """Test that circuit breaker clears failure history after successful sync.""" from unittest.mock import patch project_dir = project_config.home test_file = project_dir / "sometimes_failing.md" valid_content = dedent( """ --- title: Test File type: knowledge --- # Test File Test content """ ).strip() await create_test_file(test_file, valid_content) # Mock to fail twice, then succeed call_count = 0 original_sync_markdown_file = sync_service.sync_markdown_file async def mock_sync_markdown_file(path, new): nonlocal call_count call_count += 1 if call_count <= 2: raise ValueError("Temporary failure") # On third call, use the real implementation return await original_sync_markdown_file(path, new) # Patch and fail twice with patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file): await sync_service.sync(project_dir) # Fail 1 await touch_file(test_file) # Touch to trigger incremental scan # Force full scan to ensure file is detected # (touch may not update mtime sufficiently on all filesystems) await force_full_scan(sync_service) await sync_service.sync(project_dir) # Fail 2 await touch_file(test_file) # Touch to trigger incremental scan # Force full scan to ensure file is detected # (touch may not update mtime sufficiently on all filesystems) await force_full_scan(sync_service) await sync_service.sync(project_dir) # Succeed # Verify failure history was cleared assert "sometimes_failing.md" not in sync_service._file_failures # Verify entity was created entity = await entity_service.get_by_permalink("sometimes-failing") assert entity is not None @pytest.mark.asyncio async def test_circuit_breaker_tracks_multiple_files( sync_service: SyncService, project_config: ProjectConfig ): """Test that circuit breaker tracks multiple failing files independently.""" from unittest.mock import patch project_dir = project_config.home # Create multiple files with valid markdown await create_test_file( project_dir / "file1.md", """ --- type: knowledge --- # File 1 Content 1 """, ) await create_test_file( project_dir / "file2.md", """ --- type: knowledge --- # File 2 Content 2 """, ) await create_test_file( project_dir / "file3.md", """ --- type: knowledge --- # File 3 Content 3 """, ) # Mock to make file1 and file2 fail, but file3 succeed original_sync_markdown_file = sync_service.sync_markdown_file async def mock_sync_markdown_file(path, new): if "file1.md" in path or "file2.md" in path: raise ValueError(f"Failure for {path}") # file3 succeeds - use real implementation return await original_sync_markdown_file(path, new) with patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file): # Fail 3 times for file1 and file2 (file3 succeeds each time) await sync_service.sync(project_dir) # Fail count: file1=1, file2=1 await touch_file(project_dir / "file1.md") # Touch to trigger incremental scan await touch_file(project_dir / "file2.md") # Touch to trigger incremental scan await sync_service.sync(project_dir) # Fail count: file1=2, file2=2 await touch_file(project_dir / "file1.md") # Touch to trigger incremental scan await touch_file(project_dir / "file2.md") # Touch to trigger incremental scan report3 = await sync_service.sync(project_dir) # Fail count: file1=3, file2=3, now skipped # Both files should be skipped on third sync assert len(report3.skipped_files) == 2 skipped_paths = {f.path for f in report3.skipped_files} assert "file1.md" in skipped_paths assert "file2.md" in skipped_paths # Verify file3 is not in failures dict assert "file3.md" not in sync_service._file_failures @pytest.mark.asyncio async def test_circuit_breaker_handles_checksum_computation_failure( sync_service: SyncService, project_config: ProjectConfig ): """Test circuit breaker behavior when checksum computation fails.""" from unittest.mock import patch project_dir = project_config.home test_file = project_dir / "checksum_fail.md" await create_test_file(test_file, "content") # Mock sync_markdown_file to fail async def mock_sync_markdown_file(*args, **kwargs): raise ValueError("Sync failure") # Mock checksum computation to fail only during _record_failure (not during scan) original_compute_checksum = sync_service.file_service.compute_checksum call_count = 0 async def mock_compute_checksum(path): nonlocal call_count call_count += 1 # First call is during scan - let it succeed if call_count == 1: return await original_compute_checksum(path) # Second call is during _record_failure - make it fail raise IOError("Cannot read file") with ( patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file), patch.object( sync_service.file_service, "compute_checksum", side_effect=mock_compute_checksum, ), ): # Should still record failure even if checksum fails await sync_service.sync(project_dir) # Check that failure was recorded with empty checksum assert "checksum_fail.md" in sync_service._file_failures failure_info = sync_service._file_failures["checksum_fail.md"] assert failure_info.count == 1 assert failure_info.last_checksum == "" # Empty when checksum fails @pytest.mark.asyncio async def test_sync_fatal_error_terminates_sync_immediately( sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService ): """Test that SyncFatalError terminates sync immediately without circuit breaker retry. This tests the fix for issue #188 where project deletion during sync should terminate immediately rather than retrying each file 3 times. """ from unittest.mock import patch from basic_memory.services.exceptions import SyncFatalError project_dir = project_config.home # Create multiple test files await create_test_file( project_dir / "file1.md", dedent( """ --- type: knowledge --- # File 1 Content 1 """ ), ) await create_test_file( project_dir / "file2.md", dedent( """ --- type: knowledge --- # File 2 Content 2 """ ), ) await create_test_file( project_dir / "file3.md", dedent( """ --- type: knowledge --- # File 3 Content 3 """ ), ) # Mock entity_service.create_entity_from_markdown to raise SyncFatalError on first file # This simulates project being deleted during sync async def mock_create_entity_from_markdown(*args, **kwargs): raise SyncFatalError( "Cannot sync file 'file1.md': project_id=99999 does not exist in database. " "The project may have been deleted. This sync will be terminated." ) with patch.object( entity_service, "create_entity_from_markdown", side_effect=mock_create_entity_from_markdown ): # Sync should raise SyncFatalError and terminate immediately with pytest.raises(SyncFatalError, match="project_id=99999 does not exist"): await sync_service.sync(project_dir) # Verify that circuit breaker did NOT record this as a file-level failure # (SyncFatalError should bypass circuit breaker and re-raise immediately) assert "file1.md" not in sync_service._file_failures # Verify that no other files were attempted (sync terminated on first error) # If circuit breaker was used, we'd see file1 in failures # If sync continued, we'd see attempts for file2 and file3 @pytest.mark.asyncio async def test_scan_directory_basic(sync_service: SyncService, project_config: ProjectConfig): """Test basic streaming directory scan functionality.""" project_dir = project_config.home # Create test files in different directories await create_test_file(project_dir / "root.md", "root content") await create_test_file(project_dir / "subdir/file1.md", "file 1 content") await create_test_file(project_dir / "subdir/file2.md", "file 2 content") await create_test_file(project_dir / "subdir/nested/file3.md", "file 3 content") # Collect results from streaming iterator results = [] async for file_path, stat_info in sync_service.scan_directory(project_dir): rel_path = Path(file_path).relative_to(project_dir).as_posix() results.append((rel_path, stat_info)) # Verify all files were found file_paths = {rel_path for rel_path, _ in results} assert "root.md" in file_paths assert "subdir/file1.md" in file_paths assert "subdir/file2.md" in file_paths assert "subdir/nested/file3.md" in file_paths assert len(file_paths) == 4 # Verify stat info is present for each file for rel_path, stat_info in results: assert stat_info is not None assert stat_info.st_size > 0 # Files have content assert stat_info.st_mtime > 0 # Have modification time @pytest.mark.asyncio async def test_scan_directory_respects_ignore_patterns( sync_service: SyncService, project_config: ProjectConfig ): """Test that streaming scan respects .gitignore patterns.""" project_dir = project_config.home # Create .gitignore file in project (will be used along with .bmignore) (project_dir / ".gitignore").write_text("*.ignored\n.hidden/\n") # Reload ignore patterns using project's .gitignore from basic_memory.ignore_utils import load_gitignore_patterns sync_service._ignore_patterns = load_gitignore_patterns(project_dir) # Create test files - some should be ignored await create_test_file(project_dir / "included.md", "included") await create_test_file(project_dir / "excluded.ignored", "excluded") await create_test_file(project_dir / ".hidden/secret.md", "secret") await create_test_file(project_dir / "subdir/file.md", "file") # Collect results results = [] async for file_path, stat_info in sync_service.scan_directory(project_dir): rel_path = Path(file_path).relative_to(project_dir).as_posix() results.append(rel_path) # Verify ignored files were not returned assert "included.md" in results assert "subdir/file.md" in results assert "excluded.ignored" not in results assert ".hidden/secret.md" not in results assert ".bmignore" not in results # .bmignore itself should be ignored @pytest.mark.asyncio async def test_scan_directory_cached_stat_info( sync_service: SyncService, project_config: ProjectConfig ): """Test that streaming scan provides cached stat info (no redundant stat calls).""" project_dir = project_config.home # Create test file test_file = project_dir / "test.md" await create_test_file(test_file, "test content") # Get stat info from streaming scan async for file_path, stat_info in sync_service.scan_directory(project_dir): if Path(file_path).name == "test.md": # Get independent stat for comparison independent_stat = test_file.stat() # Verify stat info matches (cached stat should be accurate) assert stat_info.st_size == independent_stat.st_size assert abs(stat_info.st_mtime - independent_stat.st_mtime) < 1 # Allow 1s tolerance assert abs(stat_info.st_ctime - independent_stat.st_ctime) < 1 break @pytest.mark.asyncio async def test_scan_directory_empty_directory( sync_service: SyncService, project_config: ProjectConfig ): """Test streaming scan on empty directory (ignoring hidden files).""" project_dir = project_config.home # Directory exists but has no user files (may have .basic-memory config dir) assert project_dir.exists() # Don't create any user files - just scan empty directory # Scan should yield no results (hidden files are ignored by default) results = [] async for file_path, stat_info in sync_service.scan_directory(project_dir): results.append(file_path) # Should find no files (config dirs are hidden and ignored) assert len(results) == 0 @pytest.mark.asyncio async def test_scan_directory_handles_permission_error( sync_service: SyncService, project_config: ProjectConfig ): """Test that streaming scan handles permission errors gracefully.""" import sys # Skip on Windows - permission handling is different if sys.platform == "win32": pytest.skip("Permission tests not reliable on Windows") project_dir = project_config.home # Create accessible file await create_test_file(project_dir / "accessible.md", "accessible") # Create restricted directory restricted_dir = project_dir / "restricted" restricted_dir.mkdir() await create_test_file(restricted_dir / "secret.md", "secret") # Remove read permission from restricted directory restricted_dir.chmod(0o000) try: # Scan should handle permission error and continue results = [] async for file_path, stat_info in sync_service.scan_directory(project_dir): rel_path = Path(file_path).relative_to(project_dir).as_posix() results.append(rel_path) # Should have found accessible file but not restricted one assert "accessible.md" in results assert "restricted/secret.md" not in results finally: # Restore permissions for cleanup restricted_dir.chmod(0o755) @pytest.mark.asyncio async def test_scan_directory_non_markdown_files( sync_service: SyncService, project_config: ProjectConfig ): """Test that streaming scan finds all file types, not just markdown.""" project_dir = project_config.home # Create various file types await create_test_file(project_dir / "doc.md", "markdown") (project_dir / "image.png").write_bytes(b"PNG content") (project_dir / "data.json").write_text('{"key": "value"}') (project_dir / "script.py").write_text("print('hello')") # Collect results results = [] async for file_path, stat_info in sync_service.scan_directory(project_dir): rel_path = Path(file_path).relative_to(project_dir).as_posix() results.append(rel_path) # All files should be found assert "doc.md" in results assert "image.png" in results assert "data.json" in results assert "script.py" in results @pytest.mark.asyncio async def test_file_service_checksum_correctness( sync_service: SyncService, project_config: ProjectConfig ): """Test that FileService computes correct checksums.""" import hashlib project_dir = project_config.home # Test small markdown file small_content = "Test content for checksum validation" * 10 small_file = project_dir / "small.md" await create_test_file(small_file, small_content) rel_path = small_file.relative_to(project_dir).as_posix() checksum = await sync_service.file_service.compute_checksum(rel_path) # Verify checksum is correct expected = hashlib.sha256(small_content.encode("utf-8")).hexdigest() assert checksum == expected assert len(checksum) == 64 # SHA256 hex digest length

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/basicmachines-co/basic-memory'

If you have feedback or need assistance with the MCP directory API, please join our Discord server