test_sync_service.py•66.4 kB
"""Test general sync behavior."""
import asyncio
import os
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
import pytest
from basic_memory.config import ProjectConfig, BasicMemoryConfig
from basic_memory.models import Entity
from basic_memory.repository import EntityRepository
from basic_memory.schemas.search import SearchQuery
from basic_memory.services import EntityService, FileService
from basic_memory.services.search_service import SearchService
from basic_memory.sync.sync_service import SyncService
async def create_test_file(path: Path, content: str = "test content") -> None:
"""Create a test file with given content."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content)
async def touch_file(path: Path) -> None:
"""Touch a file to update its mtime (for watermark testing)."""
import time
# Read and rewrite to update mtime
content = path.read_text()
time.sleep(0.5) # Ensure mtime changes and is newer than watermark (500ms)
path.write_text(content)
async def force_full_scan(sync_service: SyncService) -> None:
"""Force next sync to do a full scan by clearing watermark (for testing moves/deletions)."""
if sync_service.entity_repository.project_id is not None:
project = await sync_service.project_repository.find_by_id(
sync_service.entity_repository.project_id
)
if project:
await sync_service.project_repository.update(
project.id,
{
"last_scan_timestamp": None,
"last_file_count": None,
},
)
@pytest.mark.asyncio
async def test_forward_reference_resolution(
sync_service: SyncService,
project_config: ProjectConfig,
entity_service: EntityService,
):
"""Test that forward references get resolved when target file is created."""
project_dir = project_config.home
# First create a file with a forward reference
source_content = """
---
type: knowledge
---
# Source Document
## Relations
- depends_on [[target-doc]]
- depends_on [[target-doc]] # duplicate
"""
await create_test_file(project_dir / "source.md", source_content)
# Initial sync - should create forward reference
await sync_service.sync(project_config.home)
# Verify forward reference
source = await entity_service.get_by_permalink("source")
assert len(source.relations) == 1
assert source.relations[0].to_id is None
assert source.relations[0].to_name == "target-doc"
# Now create the target file
target_content = """
---
type: knowledge
---
# Target Doc
Target content
"""
target_file = project_dir / "target_doc.md"
await create_test_file(target_file, target_content)
# Force full scan to ensure the new file is detected
# Incremental scans have timing precision issues with watermarks on some filesystems
await force_full_scan(sync_service)
# Sync again - should resolve the reference
await sync_service.sync(project_config.home)
# Verify reference is now resolved
source = await entity_service.get_by_permalink("source")
target = await entity_service.get_by_permalink("target-doc")
assert len(source.relations) == 1
assert source.relations[0].to_id == target.id
assert source.relations[0].to_name == target.title
@pytest.mark.asyncio
async def test_sync(
sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
"""Test basic knowledge sync functionality."""
# Create test files
project_dir = project_config.home
# New entity with relation
new_content = """
---
type: knowledge
permalink: concept/test-concept
created: 2023-01-01
modified: 2023-01-01
---
# Test Concept
A test concept.
## Observations
- [design] Core feature
## Relations
- depends_on [[concept/other]]
"""
await create_test_file(project_dir / "concept/test_concept.md", new_content)
# Create related entity in DB that will be deleted
# because file was not found
other = Entity(
permalink="concept/other",
title="Other",
entity_type="test",
file_path="concept/other.md",
checksum="12345678",
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
await entity_service.repository.add(other)
# Run sync
await sync_service.sync(project_config.home)
# Verify results
entities = await entity_service.repository.find_all()
assert len(entities) == 1
# Find new entity
test_concept = next(e for e in entities if e.permalink == "concept/test-concept")
assert test_concept.entity_type == "knowledge"
# Verify relation was created
# with forward link
entity = await entity_service.get_by_permalink(test_concept.permalink)
relations = entity.relations
assert len(relations) == 1, "Expected 1 relation for entity"
assert relations[0].to_name == "concept/other"
@pytest.mark.asyncio
async def test_sync_hidden_file(
sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
"""Test basic knowledge sync functionality."""
# Create test files
project_dir = project_config.home
# hidden file
await create_test_file(project_dir / "concept/.hidden.md", "hidden")
# Run sync
await sync_service.sync(project_config.home)
# Verify results
entities = await entity_service.repository.find_all()
assert len(entities) == 0
@pytest.mark.asyncio
async def test_sync_entity_with_nonexistent_relations(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test syncing an entity that references nonexistent entities."""
project_dir = project_config.home
# Create entity that references entities we haven't created yet
content = """
---
type: knowledge
permalink: concept/depends-on-future
created: 2024-01-01
modified: 2024-01-01
---
# Test Dependencies
## Observations
- [design] Testing future dependencies
## Relations
- depends_on [[concept/not_created_yet]]
- uses [[concept/also_future]]
"""
await create_test_file(project_dir / "concept/depends_on_future.md", content)
# Sync
await sync_service.sync(project_config.home)
# Verify entity created but no relations
entity = await sync_service.entity_service.repository.get_by_permalink(
"concept/depends-on-future"
)
assert entity is not None
assert len(entity.relations) == 2
assert entity.relations[0].to_name == "concept/not_created_yet"
assert entity.relations[1].to_name == "concept/also_future"
@pytest.mark.asyncio
async def test_sync_entity_circular_relations(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test syncing entities with circular dependencies."""
project_dir = project_config.home
# Create entity A that depends on B
content_a = """
---
type: knowledge
permalink: concept/entity-a
created: 2024-01-01
modified: 2024-01-01
---
# Entity A
## Observations
- First entity in circular reference
## Relations
- depends_on [[concept/entity-b]]
"""
await create_test_file(project_dir / "concept/entity_a.md", content_a)
# Create entity B that depends on A
content_b = """
---
type: knowledge
permalink: concept/entity-b
created: 2024-01-01
modified: 2024-01-01
---
# Entity B
## Observations
- Second entity in circular reference
## Relations
- depends_on [[concept/entity-a]]
"""
await create_test_file(project_dir / "concept/entity_b.md", content_b)
# Sync
await sync_service.sync(project_config.home)
# Verify both entities and their relations
entity_a = await sync_service.entity_service.repository.get_by_permalink("concept/entity-a")
entity_b = await sync_service.entity_service.repository.get_by_permalink("concept/entity-b")
# outgoing relations
assert len(entity_a.outgoing_relations) == 1
assert len(entity_b.outgoing_relations) == 1
# incoming relations
assert len(entity_a.incoming_relations) == 1
assert len(entity_b.incoming_relations) == 1
# all relations
assert len(entity_a.relations) == 2
assert len(entity_b.relations) == 2
# Verify circular reference works
a_relation = entity_a.outgoing_relations[0]
assert a_relation.to_id == entity_b.id
b_relation = entity_b.outgoing_relations[0]
assert b_relation.to_id == entity_a.id
@pytest.mark.asyncio
async def test_sync_entity_duplicate_relations(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test handling of duplicate relations in an entity."""
project_dir = project_config.home
# Create target entity first
target_content = """
---
type: knowledge
permalink: concept/target
created: 2024-01-01
modified: 2024-01-01
---
# Target Entity
## Observations
- something to observe
"""
await create_test_file(project_dir / "concept/target.md", target_content)
# Create entity with duplicate relations
content = """
---
type: knowledge
permalink: concept/duplicate-relations
created: 2024-01-01
modified: 2024-01-01
---
# Test Duplicates
## Observations
- this has a lot of relations
## Relations
- depends_on [[concept/target]]
- depends_on [[concept/target]] # Duplicate
- uses [[concept/target]] # Different relation type
- uses [[concept/target]] # Duplicate of different type
"""
await create_test_file(project_dir / "concept/duplicate_relations.md", content)
# Sync
await sync_service.sync(project_config.home)
# Verify duplicates are handled
entity = await sync_service.entity_service.repository.get_by_permalink(
"concept/duplicate-relations"
)
# Count relations by type
relation_counts = {}
for rel in entity.relations:
relation_counts[rel.relation_type] = relation_counts.get(rel.relation_type, 0) + 1
# Should only have one of each type
assert relation_counts["depends_on"] == 1
assert relation_counts["uses"] == 1
@pytest.mark.asyncio
async def test_sync_entity_with_random_categories(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test handling of random observation categories."""
project_dir = project_config.home
content = """
---
type: knowledge
permalink: concept/invalid-category
created: 2024-01-01
modified: 2024-01-01
---
# Test Categories
## Observations
- [random category] This is fine
- [ a space category] Should default to note
- This one is not an observation, should be ignored
- [design] This is valid
"""
await create_test_file(project_dir / "concept/invalid_category.md", content)
# Sync
await sync_service.sync(project_config.home)
# Verify observations
entity = await sync_service.entity_service.repository.get_by_permalink(
"concept/invalid-category"
)
assert len(entity.observations) == 3
categories = [obs.category for obs in entity.observations]
# Invalid categories should be converted to default
assert "random category" in categories
# Valid categories preserved
assert "a space category" in categories
assert "design" in categories
@pytest.mark.skip("sometimes fails")
@pytest.mark.asyncio
async def test_sync_entity_with_order_dependent_relations(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that order of entity syncing doesn't affect relation creation."""
project_dir = project_config.home
# Create several interrelated entities
entities = {
"a": """
---
type: knowledge
permalink: concept/entity-a
created: 2024-01-01
modified: 2024-01-01
---
# Entity A
## Observations
- depends on b
- depends on c
## Relations
- depends_on [[concept/entity-b]]
- depends_on [[concept/entity-c]]
""",
"b": """
---
type: knowledge
permalink: concept/entity-b
created: 2024-01-01
modified: 2024-01-01
---
# Entity B
## Observations
- depends on c
## Relations
- depends_on [[concept/entity-c]]
""",
"c": """
---
type: knowledge
permalink: concept/entity-c
created: 2024-01-01
modified: 2024-01-01
---
# Entity C
## Observations
- depends on a
## Relations
- depends_on [[concept/entity-a]]
""",
}
# Create files in different orders and verify results are the same
for name, content in entities.items():
await create_test_file(project_dir / f"concept/entity_{name}.md", content)
# Sync
await sync_service.sync(project_config.home)
# Verify all relations are created correctly regardless of order
entity_a = await sync_service.entity_service.repository.get_by_permalink("concept/entity-a")
entity_b = await sync_service.entity_service.repository.get_by_permalink("concept/entity-b")
entity_c = await sync_service.entity_service.repository.get_by_permalink("concept/entity-c")
# Verify outgoing relations by checking actual targets
a_outgoing_targets = {rel.to_id for rel in entity_a.outgoing_relations}
assert entity_b.id in a_outgoing_targets, (
f"A should depend on B. A's targets: {a_outgoing_targets}, B's ID: {entity_b.id}"
)
assert entity_c.id in a_outgoing_targets, (
f"A should depend on C. A's targets: {a_outgoing_targets}, C's ID: {entity_c.id}"
)
assert len(entity_a.outgoing_relations) == 2, "A should have exactly 2 outgoing relations"
b_outgoing_targets = {rel.to_id for rel in entity_b.outgoing_relations}
assert entity_c.id in b_outgoing_targets, "B should depend on C"
assert len(entity_b.outgoing_relations) == 1, "B should have exactly 1 outgoing relation"
c_outgoing_targets = {rel.to_id for rel in entity_c.outgoing_relations}
assert entity_a.id in c_outgoing_targets, "C should depend on A"
assert len(entity_c.outgoing_relations) == 1, "C should have exactly 1 outgoing relation"
# Verify incoming relations by checking actual sources
a_incoming_sources = {rel.from_id for rel in entity_a.incoming_relations}
assert entity_c.id in a_incoming_sources, "A should have incoming relation from C"
b_incoming_sources = {rel.from_id for rel in entity_b.incoming_relations}
assert entity_a.id in b_incoming_sources, "B should have incoming relation from A"
c_incoming_sources = {rel.from_id for rel in entity_c.incoming_relations}
assert entity_a.id in c_incoming_sources, "C should have incoming relation from A"
assert entity_b.id in c_incoming_sources, "C should have incoming relation from B"
@pytest.mark.asyncio
async def test_sync_empty_directories(sync_service: SyncService, project_config: ProjectConfig):
"""Test syncing empty directories."""
await sync_service.sync(project_config.home)
# Should not raise exceptions for empty dirs
assert project_config.home.exists()
@pytest.mark.skip("flaky on Windows due to filesystem timing precision")
@pytest.mark.asyncio
async def test_sync_file_modified_during_sync(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test handling of files that change during sync process."""
# Create initial files
doc_path = project_config.home / "changing.md"
await create_test_file(
doc_path,
"""
---
type: knowledge
id: changing
created: 2024-01-01
modified: 2024-01-01
---
# Knowledge File
## Observations
- This is a test
""",
)
# Setup async modification during sync
async def modify_file():
await asyncio.sleep(0.1) # Small delay to ensure sync has started
doc_path.write_text("Modified during sync")
# Run sync and modification concurrently
await asyncio.gather(sync_service.sync(project_config.home), modify_file())
# Verify final state
doc = await sync_service.entity_service.repository.get_by_permalink("changing")
assert doc is not None
# if we failed in the middle of a sync, the next one should fix it.
if doc.checksum is None:
await sync_service.sync(project_config.home)
doc = await sync_service.entity_service.repository.get_by_permalink("changing")
assert doc.checksum is not None
@pytest.mark.asyncio
async def test_permalink_formatting(
sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
"""Test that permalinks are properly formatted during sync."""
# Test cases with different filename formats
test_files = {
# filename -> expected permalink
"my_awesome_feature.md": "my-awesome-feature",
"MIXED_CASE_NAME.md": "mixed-case-name",
"spaces and_underscores.md": "spaces-and-underscores",
"design/model_refactor.md": "design/model-refactor",
"test/multiple_word_directory/feature_name.md": "test/multiple-word-directory/feature-name",
}
# Create test files
content: str = """
---
type: knowledge
created: 2024-01-01
modified: 2024-01-01
---
# Test File
Testing permalink generation.
"""
for filename, _ in test_files.items():
await create_test_file(project_config.home / filename, content)
# Run sync once after all files are created
await sync_service.sync(project_config.home)
# Verify permalinks
entities = await entity_service.repository.find_all()
for filename, expected_permalink in test_files.items():
# Find entity for this file
entity = next(e for e in entities if e.file_path == filename)
assert entity.permalink == expected_permalink, (
f"File {filename} should have permalink {expected_permalink}"
)
@pytest.mark.asyncio
async def test_handle_entity_deletion(
test_graph,
sync_service: SyncService,
entity_repository: EntityRepository,
search_service: SearchService,
):
"""Test deletion of entity cleans up search index."""
root_entity = test_graph["root"]
# Delete the entity
await sync_service.handle_delete(root_entity.file_path)
# Verify entity is gone from db
assert await entity_repository.get_by_permalink(root_entity.permalink) is None
# Verify entity is gone from search index
entity_results = await search_service.search(SearchQuery(text=root_entity.title))
assert len(entity_results) == 0
obs_results = await search_service.search(SearchQuery(text="Root note 1"))
assert len(obs_results) == 0
rel_results = await search_service.search(SearchQuery(text="connects_to"))
assert len(rel_results) == 0
@pytest.mark.asyncio
async def test_sync_preserves_timestamps(
sync_service: SyncService,
project_config: ProjectConfig,
entity_service: EntityService,
):
"""Test that sync preserves file timestamps and frontmatter dates."""
project_dir = project_config.home
# Create a file with explicit frontmatter dates
frontmatter_content = """
---
type: knowledge
---
# Explicit Dates
Testing frontmatter dates
"""
await create_test_file(project_dir / "explicit_dates.md", frontmatter_content)
# Create a file without dates (will use file timestamps)
file_dates_content = """
---
type: knowledge
---
# File Dates
Testing file timestamps
"""
file_path = project_dir / "file_dates3.md"
await create_test_file(file_path, file_dates_content)
# Run sync
await sync_service.sync(project_config.home)
# Check explicit frontmatter dates
explicit_entity = await entity_service.get_by_permalink("explicit-dates")
assert explicit_entity.created_at is not None
assert explicit_entity.updated_at is not None
# Check file timestamps
file_entity = await entity_service.get_by_permalink("file-dates3")
file_stats = file_path.stat()
# Compare using epoch timestamps to handle timezone differences correctly
# This ensures we're comparing the actual points in time, not display representations
entity_created_epoch = file_entity.created_at.timestamp()
entity_updated_epoch = file_entity.updated_at.timestamp()
# Allow 2s difference on Windows due to filesystem timing precision
tolerance = 2 if os.name == "nt" else 1
assert abs(entity_created_epoch - file_stats.st_ctime) < tolerance
assert abs(entity_updated_epoch - file_stats.st_mtime) < tolerance # Allow tolerance difference
@pytest.mark.asyncio
async def test_sync_updates_timestamps_on_file_modification(
sync_service: SyncService,
project_config: ProjectConfig,
entity_service: EntityService,
):
"""Test that sync updates entity timestamps when files are modified.
This test specifically validates that when an existing file is modified and re-synced,
the updated_at timestamp in the database reflects the file's actual modification time,
not the database operation time. This is critical for accurate temporal ordering in
search and recent_activity queries.
"""
project_dir = project_config.home
# Create initial file
initial_content = """
---
type: knowledge
---
# Test File
Initial content for timestamp test
"""
file_path = project_dir / "timestamp_test.md"
await create_test_file(file_path, initial_content)
# Initial sync
await sync_service.sync(project_config.home)
# Get initial entity and timestamps
entity_before = await entity_service.get_by_permalink("timestamp-test")
initial_updated_at = entity_before.updated_at
# Modify the file content and update mtime to be newer than watermark
modified_content = """
---
type: knowledge
---
# Test File
Modified content for timestamp test
## Observations
- [test] This was modified
"""
file_path.write_text(modified_content)
# Touch file to ensure mtime is newer than watermark
# This uses our helper which sleeps 500ms and rewrites to guarantee mtime change
await touch_file(file_path)
# Get the file's modification time after our changes
file_stats_after_modification = file_path.stat()
# Force full scan to ensure the modified file is detected
# (incremental scans have timing precision issues with watermarks on some filesystems)
await force_full_scan(sync_service)
# Re-sync the modified file
await sync_service.sync(project_config.home)
# Get entity after re-sync
entity_after = await entity_service.get_by_permalink("timestamp-test")
# Verify that updated_at changed
assert entity_after.updated_at != initial_updated_at, (
"updated_at should change when file is modified"
)
# Verify that updated_at matches the file's modification time, not db operation time
entity_updated_epoch = entity_after.updated_at.timestamp()
file_mtime = file_stats_after_modification.st_mtime
# Allow 2s difference on Windows due to filesystem timing precision
tolerance = 2 if os.name == "nt" else 1
assert abs(entity_updated_epoch - file_mtime) < tolerance, (
f"Entity updated_at ({entity_after.updated_at}) should match file mtime "
f"({datetime.fromtimestamp(file_mtime)}) within {tolerance}s tolerance"
)
# Verify the content was actually updated
assert len(entity_after.observations) == 1
assert entity_after.observations[0].content == "This was modified"
@pytest.mark.asyncio
async def test_file_move_updates_search_index(
sync_service: SyncService,
project_config: ProjectConfig,
search_service: SearchService,
):
"""Test that moving a file updates its path in the search index."""
project_dir = project_config.home
# Create initial file
content = """
---
type: knowledge
---
# Test Move
Content for move test
"""
old_path = project_dir / "old" / "test_move.md"
old_path.parent.mkdir(parents=True)
await create_test_file(old_path, content)
# Initial sync
await sync_service.sync(project_config.home)
# Move the file
new_path = project_dir / "new" / "moved_file.md"
new_path.parent.mkdir(parents=True)
old_path.rename(new_path)
# Force full scan to detect the move
# (rename doesn't update mtime, so incremental scan won't find it)
await force_full_scan(sync_service)
# Second sync should detect the move
await sync_service.sync(project_config.home)
# Check search index has updated path
results = await search_service.search(SearchQuery(text="Content for move test"))
assert len(results) == 1
assert results[0].file_path == new_path.relative_to(project_dir).as_posix()
@pytest.mark.asyncio
async def test_sync_null_checksum_cleanup(
sync_service: SyncService,
project_config: ProjectConfig,
entity_service: EntityService,
):
"""Test handling of entities with null checksums from incomplete syncs."""
# Create entity with null checksum (simulating incomplete sync)
entity = Entity(
permalink="concept/incomplete",
title="Incomplete",
entity_type="test",
file_path="concept/incomplete.md",
checksum=None, # Null checksum
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
await entity_service.repository.add(entity)
# Create corresponding file
content = """
---
type: knowledge
id: concept/incomplete
created: 2024-01-01
modified: 2024-01-01
---
# Incomplete Entity
## Observations
- Testing cleanup
"""
await create_test_file(project_config.home / "concept/incomplete.md", content)
# Run sync
await sync_service.sync(project_config.home)
# Verify entity was properly synced
updated = await entity_service.get_by_permalink("concept/incomplete")
assert updated.checksum is not None
@pytest.mark.asyncio
async def test_sync_permalink_resolved(
sync_service: SyncService, project_config: ProjectConfig, file_service: FileService, app_config
):
"""Test that we resolve duplicate permalinks on sync ."""
project_dir = project_config.home
# Create initial file
content = """
---
type: knowledge
---
# Test Move
Content for move test
"""
old_path = project_dir / "old" / "test_move.md"
old_path.parent.mkdir(parents=True)
await create_test_file(old_path, content)
# Initial sync
await sync_service.sync(project_config.home)
# Move the file
new_path = project_dir / "new" / "moved_file.md"
new_path.parent.mkdir(parents=True)
old_path.rename(new_path)
# Force full scan to detect the move
# (rename doesn't update mtime, so incremental scan won't find it)
await force_full_scan(sync_service)
# Sync again
await sync_service.sync(project_config.home)
file_content, _ = await file_service.read_file(new_path)
assert "permalink: new/moved-file" in file_content
# Create another that has the same permalink
content = """
---
type: knowledge
permalink: new/moved-file
---
# Test Move
Content for move test
"""
old_path = project_dir / "old" / "test_move.md"
old_path.parent.mkdir(parents=True, exist_ok=True)
await create_test_file(old_path, content)
# Force full scan to detect the new file
# (file just created may not be newer than watermark due to timing precision)
await force_full_scan(sync_service)
# Sync new file
await sync_service.sync(project_config.home)
# assert permalink is unique
file_content, _ = await file_service.read_file(old_path)
assert "permalink: new/moved-file-1" in file_content
@pytest.mark.asyncio
async def test_sync_permalink_resolved_on_update(
sync_service: SyncService,
project_config: ProjectConfig,
file_service: FileService,
):
"""Test that sync resolves permalink conflicts on update."""
project_dir = project_config.home
one_file = project_dir / "one.md"
two_file = project_dir / "two.md"
await create_test_file(
one_file,
content=dedent(
"""
---
permalink: one
---
test content
"""
),
)
await create_test_file(
two_file,
content=dedent(
"""
---
permalink: two
---
test content
"""
),
)
# Run sync
await sync_service.sync(project_config.home)
# Check permalinks
file_one_content, _ = await file_service.read_file(one_file)
assert "permalink: one" in file_one_content
file_two_content, _ = await file_service.read_file(two_file)
assert "permalink: two" in file_two_content
# update the second file with a duplicate permalink
updated_content = """
---
title: two.md
type: note
permalink: one
tags: []
---
test content
"""
two_file.write_text(updated_content)
# Force full scan to detect the modified file
# (file just modified may not be newer than watermark due to timing precision)
await force_full_scan(sync_service)
# Run sync
await sync_service.sync(project_config.home)
# Check permalinks
file_two_content, _ = await file_service.read_file(two_file)
assert "permalink: two" in file_two_content
# new content with duplicate permalink
new_content = """
---
title: new.md
type: note
permalink: one
tags: []
---
test content
"""
new_file = project_dir / "new.md"
await create_test_file(new_file, new_content)
# Force full scan to detect the new file
# (file just created may not be newer than watermark due to timing precision)
await force_full_scan(sync_service)
# Run another time
await sync_service.sync(project_config.home)
# Should have deduplicated permalink
new_file_content, _ = await file_service.read_file(new_file)
assert "permalink: one-1" in new_file_content
@pytest.mark.asyncio
async def test_sync_permalink_not_created_if_no_frontmatter(
sync_service: SyncService,
project_config: ProjectConfig,
file_service: FileService,
):
"""Test that sync resolves permalink conflicts on update."""
project_dir = project_config.home
file = project_dir / "one.md"
await create_test_file(file)
# Run sync
await sync_service.sync(project_config.home)
# Check permalink not created
file_content, _ = await file_service.read_file(file)
assert "permalink:" not in file_content
@pytest.fixture
def test_config_update_permamlinks_on_move(app_config) -> BasicMemoryConfig:
"""Test configuration using in-memory DB."""
app_config.update_permalinks_on_move = True
return app_config
@pytest.mark.asyncio
async def test_sync_permalink_updated_on_move(
test_config_update_permamlinks_on_move: BasicMemoryConfig,
project_config: ProjectConfig,
sync_service: SyncService,
file_service: FileService,
):
"""Test that we update a permalink on a file move if set in config ."""
project_dir = project_config.home
# Create initial file
content = dedent(
"""
---
type: knowledge
---
# Test Move
Content for move test
"""
)
old_path = project_dir / "old" / "test_move.md"
old_path.parent.mkdir(parents=True)
await create_test_file(old_path, content)
# Initial sync
await sync_service.sync(project_config.home)
# verify permalink
old_content, _ = await file_service.read_file(old_path)
assert "permalink: old/test-move" in old_content
# Move the file
new_path = project_dir / "new" / "moved_file.md"
new_path.parent.mkdir(parents=True)
old_path.rename(new_path)
# Force full scan to detect the move
# (rename doesn't update mtime, so incremental scan won't find it)
await force_full_scan(sync_service)
# Sync again
await sync_service.sync(project_config.home)
file_content, _ = await file_service.read_file(new_path)
assert "permalink: new/moved-file" in file_content
@pytest.mark.asyncio
async def test_sync_non_markdown_files(sync_service, project_config, test_files):
"""Test syncing non-markdown files."""
report = await sync_service.sync(project_config.home)
assert report.total == 2
# Check files were detected
assert test_files["pdf"].name in [f for f in report.new]
assert test_files["image"].name in [f for f in report.new]
# Verify entities were created
pdf_entity = await sync_service.entity_repository.get_by_file_path(str(test_files["pdf"].name))
assert pdf_entity is not None, "PDF entity should have been created"
assert pdf_entity.content_type == "application/pdf"
image_entity = await sync_service.entity_repository.get_by_file_path(
str(test_files["image"].name)
)
assert image_entity.content_type == "image/png"
@pytest.mark.asyncio
async def test_sync_non_markdown_files_modified(
sync_service, project_config, test_files, file_service
):
"""Test syncing non-markdown files."""
report = await sync_service.sync(project_config.home)
assert report.total == 2
# Check files were detected
assert test_files["pdf"].name in [f for f in report.new]
assert test_files["image"].name in [f for f in report.new]
test_files["pdf"].write_text("New content")
test_files["image"].write_text("New content")
# Force full scan to detect the modified files
# (files just modified may not be newer than watermark due to timing precision)
await force_full_scan(sync_service)
report = await sync_service.sync(project_config.home)
assert len(report.modified) == 2
pdf_file_content, pdf_checksum = await file_service.read_file(test_files["pdf"].name)
image_file_content, img_checksum = await file_service.read_file(test_files["image"].name)
pdf_entity = await sync_service.entity_repository.get_by_file_path(str(test_files["pdf"].name))
image_entity = await sync_service.entity_repository.get_by_file_path(
str(test_files["image"].name)
)
assert pdf_entity.checksum == pdf_checksum
assert image_entity.checksum == img_checksum
@pytest.mark.asyncio
async def test_sync_non_markdown_files_move(sync_service, project_config, test_files):
"""Test syncing non-markdown files updates permalink"""
report = await sync_service.sync(project_config.home)
assert report.total == 2
# Check files were detected
assert test_files["pdf"].name in [f for f in report.new]
assert test_files["image"].name in [f for f in report.new]
test_files["pdf"].rename(project_config.home / "moved_pdf.pdf")
# Force full scan to detect the move
# (rename doesn't update mtime, so incremental scan won't find it)
await force_full_scan(sync_service)
report2 = await sync_service.sync(project_config.home)
assert len(report2.moves) == 1
# Verify entity is updated
pdf_entity = await sync_service.entity_repository.get_by_file_path("moved_pdf.pdf")
assert pdf_entity is not None
assert pdf_entity.permalink is None
@pytest.mark.asyncio
async def test_sync_non_markdown_files_deleted(sync_service, project_config, test_files):
"""Test syncing non-markdown files updates permalink"""
report = await sync_service.sync(project_config.home)
assert report.total == 2
# Check files were detected
assert test_files["pdf"].name in [f for f in report.new]
assert test_files["image"].name in [f for f in report.new]
test_files["pdf"].unlink()
report2 = await sync_service.sync(project_config.home)
assert len(report2.deleted) == 1
# Verify entity is deleted
pdf_entity = await sync_service.entity_repository.get_by_file_path("moved_pdf.pdf")
assert pdf_entity is None
@pytest.mark.asyncio
async def test_sync_non_markdown_files_move_with_delete(
sync_service, project_config, test_files, file_service
):
"""Test syncing non-markdown files handles file deletes and renames during sync"""
# Create initial files
await create_test_file(project_config.home / "doc.pdf", "content1")
await create_test_file(project_config.home / "other/doc-1.pdf", "content2")
# Initial sync
await sync_service.sync(project_config.home)
# First move/delete the original file to make way for the move
(project_config.home / "doc.pdf").unlink()
(project_config.home / "other/doc-1.pdf").rename(project_config.home / "doc.pdf")
# Sync again
await sync_service.sync(project_config.home)
# Verify the changes
moved_entity = await sync_service.entity_repository.get_by_file_path("doc.pdf")
assert moved_entity is not None
assert moved_entity.permalink is None
file_content, _ = await file_service.read_file("doc.pdf")
assert "content2" in file_content
@pytest.mark.asyncio
async def test_sync_relation_to_non_markdown_file(
sync_service: SyncService, project_config: ProjectConfig, file_service: FileService, test_files
):
"""Test that sync resolves permalink conflicts on update."""
project_dir = project_config.home
content = f"""
---
title: a note
type: note
tags: []
---
- relates_to [[{test_files["pdf"].name}]]
"""
note_file = project_dir / "note.md"
await create_test_file(note_file, content)
# Run sync
await sync_service.sync(project_config.home)
# Check permalinks
file_one_content, _ = await file_service.read_file(note_file)
assert (
f"""---
title: a note
type: note
tags: []
permalink: note
---
- relates_to [[{test_files["pdf"].name}]]
""".strip()
== file_one_content
)
@pytest.mark.asyncio
async def test_sync_regular_file_race_condition_handling(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that sync_regular_file handles race condition with IntegrityError (lines 380-401)."""
from unittest.mock import patch
from sqlalchemy.exc import IntegrityError
from datetime import datetime, timezone
# Create a test file
test_file = project_config.home / "test_race.md"
test_content = """
---
type: knowledge
---
# Test Race Condition
This is a test file for race condition handling.
"""
await create_test_file(test_file, test_content)
# Mock the entity_repository.add to raise IntegrityError on first call
original_add = sync_service.entity_repository.add
call_count = 0
async def mock_add(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
# Simulate race condition - another process created the entity
raise IntegrityError("UNIQUE constraint failed: entity.file_path", None, None) # pyright: ignore [reportArgumentType]
else:
return await original_add(*args, **kwargs)
# Mock get_by_file_path to return an existing entity (simulating the race condition result)
async def mock_get_by_file_path(file_path):
from basic_memory.models import Entity
return Entity(
id=1,
title="Test Race Condition",
entity_type="knowledge",
file_path=str(file_path),
permalink="test-race-condition",
content_type="text/markdown",
checksum="old_checksum",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
# Mock update to return the updated entity
async def mock_update(entity_id, updates):
from basic_memory.models import Entity
return Entity(
id=entity_id,
title="Test Race Condition",
entity_type="knowledge",
file_path=updates["file_path"],
permalink="test-race-condition",
content_type="text/markdown",
checksum=updates["checksum"],
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
with (
patch.object(sync_service.entity_repository, "add", side_effect=mock_add),
patch.object(
sync_service.entity_repository, "get_by_file_path", side_effect=mock_get_by_file_path
) as mock_get,
patch.object(
sync_service.entity_repository, "update", side_effect=mock_update
) as mock_update_call,
):
# Call sync_regular_file
entity, checksum = await sync_service.sync_regular_file(
str(test_file.relative_to(project_config.home)), new=True
)
# Verify it handled the race condition gracefully
assert entity is not None
assert entity.title == "Test Race Condition"
assert entity.file_path == str(test_file.relative_to(project_config.home))
# Verify that get_by_file_path and update were called as fallback
assert mock_get.call_count >= 1 # May be called multiple times
mock_update_call.assert_called_once()
@pytest.mark.asyncio
async def test_sync_regular_file_integrity_error_reraise(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that sync_regular_file re-raises IntegrityError for non-race-condition cases."""
from unittest.mock import patch
from sqlalchemy.exc import IntegrityError
# Create a test file
test_file = project_config.home / "test_integrity.md"
test_content = """
---
type: knowledge
---
# Test Integrity Error
This is a test file for integrity error handling.
"""
await create_test_file(test_file, test_content)
# Mock the entity_repository.add to raise a different IntegrityError (not file_path constraint)
async def mock_add(*args, **kwargs):
# Simulate a different constraint violation
raise IntegrityError("UNIQUE constraint failed: entity.some_other_field", None, None) # pyright: ignore [reportArgumentType]
with patch.object(sync_service.entity_repository, "add", side_effect=mock_add):
# Should re-raise the IntegrityError since it's not a file_path constraint
with pytest.raises(
IntegrityError, match="UNIQUE constraint failed: entity.some_other_field"
):
await sync_service.sync_regular_file(
str(test_file.relative_to(project_config.home)), new=True
)
@pytest.mark.asyncio
async def test_sync_regular_file_race_condition_entity_not_found(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test handling when entity is not found after IntegrityError (pragma: no cover case)."""
from unittest.mock import patch
from sqlalchemy.exc import IntegrityError
# Create a test file
test_file = project_config.home / "test_not_found.md"
test_content = """
---
type: knowledge
---
# Test Not Found
This is a test file for entity not found after constraint violation.
"""
await create_test_file(test_file, test_content)
# Mock the entity_repository.add to raise IntegrityError
async def mock_add(*args, **kwargs):
raise IntegrityError("UNIQUE constraint failed: entity.file_path", None, None) # pyright: ignore [reportArgumentType]
# Mock get_by_file_path to return None (entity not found)
async def mock_get_by_file_path(file_path):
return None
with (
patch.object(sync_service.entity_repository, "add", side_effect=mock_add),
patch.object(
sync_service.entity_repository, "get_by_file_path", side_effect=mock_get_by_file_path
),
):
# Should raise ValueError when entity is not found after constraint violation
with pytest.raises(ValueError, match="Entity not found after constraint violation"):
await sync_service.sync_regular_file(
str(test_file.relative_to(project_config.home)), new=True
)
@pytest.mark.asyncio
async def test_sync_regular_file_race_condition_update_failed(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test handling when update fails after IntegrityError (pragma: no cover case)."""
from unittest.mock import patch
from sqlalchemy.exc import IntegrityError
from datetime import datetime, timezone
# Create a test file
test_file = project_config.home / "test_update_fail.md"
test_content = """
---
type: knowledge
---
# Test Update Fail
This is a test file for update failure after constraint violation.
"""
await create_test_file(test_file, test_content)
# Mock the entity_repository.add to raise IntegrityError
async def mock_add(*args, **kwargs):
raise IntegrityError("UNIQUE constraint failed: entity.file_path", None, None) # pyright: ignore [reportArgumentType]
# Mock get_by_file_path to return an existing entity
async def mock_get_by_file_path(file_path):
from basic_memory.models import Entity
return Entity(
id=1,
title="Test Update Fail",
entity_type="knowledge",
file_path=str(file_path),
permalink="test-update-fail",
content_type="text/markdown",
checksum="old_checksum",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
# Mock update to return None (failure)
async def mock_update(entity_id, updates):
return None
with (
patch.object(sync_service.entity_repository, "add", side_effect=mock_add),
patch.object(
sync_service.entity_repository, "get_by_file_path", side_effect=mock_get_by_file_path
),
patch.object(sync_service.entity_repository, "update", side_effect=mock_update),
):
# Should raise ValueError when update fails
with pytest.raises(ValueError, match="Failed to update entity with ID"):
await sync_service.sync_regular_file(
str(test_file.relative_to(project_config.home)), new=True
)
@pytest.mark.asyncio
async def test_circuit_breaker_skips_after_three_failures(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that circuit breaker skips file after 3 consecutive failures."""
from unittest.mock import patch
project_dir = project_config.home
test_file = project_dir / "failing_file.md"
# Create a file with malformed content that will fail to parse
await create_test_file(test_file, "invalid markdown content")
# Mock sync_markdown_file to always fail
async def mock_sync_markdown_file(*args, **kwargs):
raise ValueError("Simulated sync failure")
with patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file):
# First sync - should fail and record (1/3)
report1 = await sync_service.sync(project_dir)
assert len(report1.skipped_files) == 0 # Not skipped yet
# Touch file to trigger incremental scan
await touch_file(test_file)
# Force full scan to ensure file is detected
# (touch may not update mtime sufficiently on all filesystems)
await force_full_scan(sync_service)
# Second sync - should fail and record (2/3)
report2 = await sync_service.sync(project_dir)
assert len(report2.skipped_files) == 0 # Still not skipped
# Touch file to trigger incremental scan
await touch_file(test_file)
# Force full scan to ensure file is detected
# (touch may not update mtime sufficiently on all filesystems)
await force_full_scan(sync_service)
# Third sync - should fail, record (3/3), and be added to skipped list
report3 = await sync_service.sync(project_dir)
assert len(report3.skipped_files) == 1
assert report3.skipped_files[0].path == "failing_file.md"
assert report3.skipped_files[0].failure_count == 3
assert "Simulated sync failure" in report3.skipped_files[0].reason
# Touch file to trigger incremental scan
await touch_file(test_file)
# Force full scan to ensure file is detected
# (touch may not update mtime sufficiently on all filesystems)
await force_full_scan(sync_service)
# Fourth sync - should be skipped immediately without attempting
report4 = await sync_service.sync(project_dir)
assert len(report4.skipped_files) == 1 # Still skipped
@pytest.mark.asyncio
async def test_circuit_breaker_resets_on_file_change(
sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
"""Test that circuit breaker resets when file content changes."""
from unittest.mock import patch
project_dir = project_config.home
test_file = project_dir / "changing_file.md"
# Create initial failing content
await create_test_file(test_file, "initial bad content")
# Mock sync_markdown_file to fail
call_count = 0
async def mock_sync_markdown_file(*args, **kwargs):
nonlocal call_count
call_count += 1
raise ValueError("Simulated sync failure")
with patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file):
# Fail 3 times to hit circuit breaker threshold
await sync_service.sync(project_dir) # Fail 1
await touch_file(test_file) # Touch to trigger incremental scan
# Force full scan to ensure file is detected
# (touch may not update mtime sufficiently on all filesystems)
await force_full_scan(sync_service)
await sync_service.sync(project_dir) # Fail 2
await touch_file(test_file) # Touch to trigger incremental scan
# Force full scan to ensure file is detected
# (touch may not update mtime sufficiently on all filesystems)
await force_full_scan(sync_service)
report3 = await sync_service.sync(project_dir) # Fail 3 - now skipped
assert len(report3.skipped_files) == 1
# Now change the file content
valid_content = dedent(
"""
---
title: Fixed Content
type: knowledge
---
# Fixed Content
This should work now.
"""
).strip()
await create_test_file(test_file, valid_content)
# Force full scan to detect the modified file
# (file just modified may not be newer than watermark due to timing precision)
await force_full_scan(sync_service)
# Circuit breaker should reset and allow retry
report = await sync_service.sync(project_dir)
assert len(report.skipped_files) == 0 # Should not be skipped anymore
# Verify entity was created successfully
entity = await entity_service.get_by_permalink("changing-file")
assert entity is not None
assert entity.title == "Fixed Content"
@pytest.mark.asyncio
async def test_circuit_breaker_clears_on_success(
sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
"""Test that circuit breaker clears failure history after successful sync."""
from unittest.mock import patch
project_dir = project_config.home
test_file = project_dir / "sometimes_failing.md"
valid_content = dedent(
"""
---
title: Test File
type: knowledge
---
# Test File
Test content
"""
).strip()
await create_test_file(test_file, valid_content)
# Mock to fail twice, then succeed
call_count = 0
original_sync_markdown_file = sync_service.sync_markdown_file
async def mock_sync_markdown_file(path, new):
nonlocal call_count
call_count += 1
if call_count <= 2:
raise ValueError("Temporary failure")
# On third call, use the real implementation
return await original_sync_markdown_file(path, new)
# Patch and fail twice
with patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file):
await sync_service.sync(project_dir) # Fail 1
await touch_file(test_file) # Touch to trigger incremental scan
# Force full scan to ensure file is detected
# (touch may not update mtime sufficiently on all filesystems)
await force_full_scan(sync_service)
await sync_service.sync(project_dir) # Fail 2
await touch_file(test_file) # Touch to trigger incremental scan
# Force full scan to ensure file is detected
# (touch may not update mtime sufficiently on all filesystems)
await force_full_scan(sync_service)
await sync_service.sync(project_dir) # Succeed
# Verify failure history was cleared
assert "sometimes_failing.md" not in sync_service._file_failures
# Verify entity was created
entity = await entity_service.get_by_permalink("sometimes-failing")
assert entity is not None
@pytest.mark.asyncio
async def test_circuit_breaker_tracks_multiple_files(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that circuit breaker tracks multiple failing files independently."""
from unittest.mock import patch
project_dir = project_config.home
# Create multiple files with valid markdown
await create_test_file(
project_dir / "file1.md",
"""
---
type: knowledge
---
# File 1
Content 1
""",
)
await create_test_file(
project_dir / "file2.md",
"""
---
type: knowledge
---
# File 2
Content 2
""",
)
await create_test_file(
project_dir / "file3.md",
"""
---
type: knowledge
---
# File 3
Content 3
""",
)
# Mock to make file1 and file2 fail, but file3 succeed
original_sync_markdown_file = sync_service.sync_markdown_file
async def mock_sync_markdown_file(path, new):
if "file1.md" in path or "file2.md" in path:
raise ValueError(f"Failure for {path}")
# file3 succeeds - use real implementation
return await original_sync_markdown_file(path, new)
with patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file):
# Fail 3 times for file1 and file2 (file3 succeeds each time)
await sync_service.sync(project_dir) # Fail count: file1=1, file2=1
await touch_file(project_dir / "file1.md") # Touch to trigger incremental scan
await touch_file(project_dir / "file2.md") # Touch to trigger incremental scan
await sync_service.sync(project_dir) # Fail count: file1=2, file2=2
await touch_file(project_dir / "file1.md") # Touch to trigger incremental scan
await touch_file(project_dir / "file2.md") # Touch to trigger incremental scan
report3 = await sync_service.sync(project_dir) # Fail count: file1=3, file2=3, now skipped
# Both files should be skipped on third sync
assert len(report3.skipped_files) == 2
skipped_paths = {f.path for f in report3.skipped_files}
assert "file1.md" in skipped_paths
assert "file2.md" in skipped_paths
# Verify file3 is not in failures dict
assert "file3.md" not in sync_service._file_failures
@pytest.mark.asyncio
async def test_circuit_breaker_handles_checksum_computation_failure(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test circuit breaker behavior when checksum computation fails."""
from unittest.mock import patch
project_dir = project_config.home
test_file = project_dir / "checksum_fail.md"
await create_test_file(test_file, "content")
# Mock sync_markdown_file to fail
async def mock_sync_markdown_file(*args, **kwargs):
raise ValueError("Sync failure")
# Mock checksum computation to fail only during _record_failure (not during scan)
original_compute_checksum = sync_service.file_service.compute_checksum
call_count = 0
async def mock_compute_checksum(path):
nonlocal call_count
call_count += 1
# First call is during scan - let it succeed
if call_count == 1:
return await original_compute_checksum(path)
# Second call is during _record_failure - make it fail
raise IOError("Cannot read file")
with (
patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file),
patch.object(
sync_service.file_service,
"compute_checksum",
side_effect=mock_compute_checksum,
),
):
# Should still record failure even if checksum fails
await sync_service.sync(project_dir)
# Check that failure was recorded with empty checksum
assert "checksum_fail.md" in sync_service._file_failures
failure_info = sync_service._file_failures["checksum_fail.md"]
assert failure_info.count == 1
assert failure_info.last_checksum == "" # Empty when checksum fails
@pytest.mark.asyncio
async def test_sync_fatal_error_terminates_sync_immediately(
sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
"""Test that SyncFatalError terminates sync immediately without circuit breaker retry.
This tests the fix for issue #188 where project deletion during sync should
terminate immediately rather than retrying each file 3 times.
"""
from unittest.mock import patch
from basic_memory.services.exceptions import SyncFatalError
project_dir = project_config.home
# Create multiple test files
await create_test_file(
project_dir / "file1.md",
dedent(
"""
---
type: knowledge
---
# File 1
Content 1
"""
),
)
await create_test_file(
project_dir / "file2.md",
dedent(
"""
---
type: knowledge
---
# File 2
Content 2
"""
),
)
await create_test_file(
project_dir / "file3.md",
dedent(
"""
---
type: knowledge
---
# File 3
Content 3
"""
),
)
# Mock entity_service.create_entity_from_markdown to raise SyncFatalError on first file
# This simulates project being deleted during sync
async def mock_create_entity_from_markdown(*args, **kwargs):
raise SyncFatalError(
"Cannot sync file 'file1.md': project_id=99999 does not exist in database. "
"The project may have been deleted. This sync will be terminated."
)
with patch.object(
entity_service, "create_entity_from_markdown", side_effect=mock_create_entity_from_markdown
):
# Sync should raise SyncFatalError and terminate immediately
with pytest.raises(SyncFatalError, match="project_id=99999 does not exist"):
await sync_service.sync(project_dir)
# Verify that circuit breaker did NOT record this as a file-level failure
# (SyncFatalError should bypass circuit breaker and re-raise immediately)
assert "file1.md" not in sync_service._file_failures
# Verify that no other files were attempted (sync terminated on first error)
# If circuit breaker was used, we'd see file1 in failures
# If sync continued, we'd see attempts for file2 and file3
@pytest.mark.asyncio
async def test_scan_directory_basic(sync_service: SyncService, project_config: ProjectConfig):
"""Test basic streaming directory scan functionality."""
project_dir = project_config.home
# Create test files in different directories
await create_test_file(project_dir / "root.md", "root content")
await create_test_file(project_dir / "subdir/file1.md", "file 1 content")
await create_test_file(project_dir / "subdir/file2.md", "file 2 content")
await create_test_file(project_dir / "subdir/nested/file3.md", "file 3 content")
# Collect results from streaming iterator
results = []
async for file_path, stat_info in sync_service.scan_directory(project_dir):
rel_path = Path(file_path).relative_to(project_dir).as_posix()
results.append((rel_path, stat_info))
# Verify all files were found
file_paths = {rel_path for rel_path, _ in results}
assert "root.md" in file_paths
assert "subdir/file1.md" in file_paths
assert "subdir/file2.md" in file_paths
assert "subdir/nested/file3.md" in file_paths
assert len(file_paths) == 4
# Verify stat info is present for each file
for rel_path, stat_info in results:
assert stat_info is not None
assert stat_info.st_size > 0 # Files have content
assert stat_info.st_mtime > 0 # Have modification time
@pytest.mark.asyncio
async def test_scan_directory_respects_ignore_patterns(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that streaming scan respects .gitignore patterns."""
project_dir = project_config.home
# Create .gitignore file in project (will be used along with .bmignore)
(project_dir / ".gitignore").write_text("*.ignored\n.hidden/\n")
# Reload ignore patterns using project's .gitignore
from basic_memory.ignore_utils import load_gitignore_patterns
sync_service._ignore_patterns = load_gitignore_patterns(project_dir)
# Create test files - some should be ignored
await create_test_file(project_dir / "included.md", "included")
await create_test_file(project_dir / "excluded.ignored", "excluded")
await create_test_file(project_dir / ".hidden/secret.md", "secret")
await create_test_file(project_dir / "subdir/file.md", "file")
# Collect results
results = []
async for file_path, stat_info in sync_service.scan_directory(project_dir):
rel_path = Path(file_path).relative_to(project_dir).as_posix()
results.append(rel_path)
# Verify ignored files were not returned
assert "included.md" in results
assert "subdir/file.md" in results
assert "excluded.ignored" not in results
assert ".hidden/secret.md" not in results
assert ".bmignore" not in results # .bmignore itself should be ignored
@pytest.mark.asyncio
async def test_scan_directory_cached_stat_info(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that streaming scan provides cached stat info (no redundant stat calls)."""
project_dir = project_config.home
# Create test file
test_file = project_dir / "test.md"
await create_test_file(test_file, "test content")
# Get stat info from streaming scan
async for file_path, stat_info in sync_service.scan_directory(project_dir):
if Path(file_path).name == "test.md":
# Get independent stat for comparison
independent_stat = test_file.stat()
# Verify stat info matches (cached stat should be accurate)
assert stat_info.st_size == independent_stat.st_size
assert abs(stat_info.st_mtime - independent_stat.st_mtime) < 1 # Allow 1s tolerance
assert abs(stat_info.st_ctime - independent_stat.st_ctime) < 1
break
@pytest.mark.asyncio
async def test_scan_directory_empty_directory(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test streaming scan on empty directory (ignoring hidden files)."""
project_dir = project_config.home
# Directory exists but has no user files (may have .basic-memory config dir)
assert project_dir.exists()
# Don't create any user files - just scan empty directory
# Scan should yield no results (hidden files are ignored by default)
results = []
async for file_path, stat_info in sync_service.scan_directory(project_dir):
results.append(file_path)
# Should find no files (config dirs are hidden and ignored)
assert len(results) == 0
@pytest.mark.asyncio
async def test_scan_directory_handles_permission_error(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that streaming scan handles permission errors gracefully."""
import sys
# Skip on Windows - permission handling is different
if sys.platform == "win32":
pytest.skip("Permission tests not reliable on Windows")
project_dir = project_config.home
# Create accessible file
await create_test_file(project_dir / "accessible.md", "accessible")
# Create restricted directory
restricted_dir = project_dir / "restricted"
restricted_dir.mkdir()
await create_test_file(restricted_dir / "secret.md", "secret")
# Remove read permission from restricted directory
restricted_dir.chmod(0o000)
try:
# Scan should handle permission error and continue
results = []
async for file_path, stat_info in sync_service.scan_directory(project_dir):
rel_path = Path(file_path).relative_to(project_dir).as_posix()
results.append(rel_path)
# Should have found accessible file but not restricted one
assert "accessible.md" in results
assert "restricted/secret.md" not in results
finally:
# Restore permissions for cleanup
restricted_dir.chmod(0o755)
@pytest.mark.asyncio
async def test_scan_directory_non_markdown_files(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that streaming scan finds all file types, not just markdown."""
project_dir = project_config.home
# Create various file types
await create_test_file(project_dir / "doc.md", "markdown")
(project_dir / "image.png").write_bytes(b"PNG content")
(project_dir / "data.json").write_text('{"key": "value"}')
(project_dir / "script.py").write_text("print('hello')")
# Collect results
results = []
async for file_path, stat_info in sync_service.scan_directory(project_dir):
rel_path = Path(file_path).relative_to(project_dir).as_posix()
results.append(rel_path)
# All files should be found
assert "doc.md" in results
assert "image.png" in results
assert "data.json" in results
assert "script.py" in results
@pytest.mark.asyncio
async def test_file_service_checksum_correctness(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that FileService computes correct checksums."""
import hashlib
project_dir = project_config.home
# Test small markdown file
small_content = "Test content for checksum validation" * 10
small_file = project_dir / "small.md"
await create_test_file(small_file, small_content)
rel_path = small_file.relative_to(project_dir).as_posix()
checksum = await sync_service.file_service.compute_checksum(rel_path)
# Verify checksum is correct
expected = hashlib.sha256(small_content.encode("utf-8")).hexdigest()
assert checksum == expected
assert len(checksum) == 64 # SHA256 hex digest length